diff --git a/ledger/ledger-api-bench-tool/BUILD.bazel b/ledger/ledger-api-bench-tool/BUILD.bazel index 0f4d06f3da..264883bb1c 100644 --- a/ledger/ledger-api-bench-tool/BUILD.bazel +++ b/ledger/ledger-api-bench-tool/BUILD.bazel @@ -65,16 +65,19 @@ da_scala_library( "//libs-scala/resources", "//libs-scala/resources-akka", "//libs-scala/resources-grpc", - "//libs-scala/scala-utils", "//libs-scala/timer-utils", "//ledger/test-common:benchtool-tests-%s.scala" % "1.15", # TODO: make the LF version configurable "//ledger/test-common:dar-files-%s-lib" % "1.15", # TODO: make the LF version configurable "//observability/metrics", - "@maven//:io_dropwizard_metrics_metrics_core", + "@maven//:com_google_code_findbugs_jsr305", "@maven//:io_grpc_grpc_api", "@maven//:io_grpc_grpc_core", "@maven//:io_grpc_grpc_netty", "@maven//:io_netty_netty_handler", + "@maven//:io_opentelemetry_opentelemetry_api", + "@maven//:io_opentelemetry_opentelemetry_exporter_prometheus", + "@maven//:io_opentelemetry_opentelemetry_sdk_common", + "@maven//:io_opentelemetry_opentelemetry_sdk_metrics", "@maven//:org_slf4j_slf4j_api", ], ) @@ -104,7 +107,7 @@ da_scala_library( "//ledger/participant-integration-api", "//ledger/sandbox-on-x", "//ledger/sandbox-on-x:sandbox-on-x-test-lib", - "@maven//:io_dropwizard_metrics_metrics_core", + "//observability/metrics", "@maven//:org_slf4j_slf4j_api", ], ) diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Benchmark.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Benchmark.scala index cda0ce771c..27a09832c1 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Benchmark.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Benchmark.scala @@ -4,7 +4,6 @@ package com.daml.ledger.api.benchtool import akka.actor.typed.{ActorSystem, SpawnProtocol} -import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig import com.daml.ledger.api.benchtool.metrics.{BenchmarkResult, MetricsSet, StreamMetrics} import com.daml.ledger.api.benchtool.services.LedgerApiServices @@ -15,6 +14,7 @@ import com.daml.ledger.api.v1.transaction_service.{ GetTransactionTreesResponse, GetTransactionsResponse, } +import com.daml.metrics.api.MetricHandle.MetricsFactory import com.daml.timer.Delayed import org.slf4j.LoggerFactory @@ -28,7 +28,7 @@ object Benchmark { streamConfigs: List[StreamConfig], reportingPeriod: FiniteDuration, apiServices: LedgerApiServices, - metricRegistry: MetricRegistry, + metricsFactory: MetricsFactory, system: ActorSystem[SpawnProtocol.Command], )(implicit ec: ExecutionContext): Future[Either[String, Unit]] = Future @@ -44,7 +44,7 @@ object Benchmark { logger = logger, exposedMetrics = Some( MetricsSet - .transactionExposedMetrics(streamConfig.name, metricRegistry, reportingPeriod) + .transactionExposedMetrics(streamConfig.name, metricsFactory) ), itemCountingFunction = MetricsSet.countFlatTransactionsEvents, maxItemCount = streamConfig.maxItemCount, @@ -65,8 +65,7 @@ object Benchmark { exposedMetrics = Some( MetricsSet.transactionTreesExposedMetrics( streamConfig.name, - metricRegistry, - reportingPeriod, + metricsFactory, ) ), itemCountingFunction = MetricsSet.countTreeTransactionsEvents, @@ -88,12 +87,10 @@ object Benchmark { exposedMetrics = Some( MetricsSet.activeContractsExposedMetrics( streamConfig.name, - metricRegistry, - reportingPeriod, + metricsFactory, ) ), - itemCountingFunction = - (response) => MetricsSet.countActiveContracts(response).toLong, + itemCountingFunction = response => MetricsSet.countActiveContracts(response).toLong, maxItemCount = streamConfig.maxItemCount, )(system, ec) _ = streamConfig.timeoutO @@ -111,9 +108,9 @@ object Benchmark { logger = logger, exposedMetrics = Some( MetricsSet - .completionsExposedMetrics(streamConfig.name, metricRegistry, reportingPeriod) + .completionsExposedMetrics(streamConfig.name, metricsFactory) ), - itemCountingFunction = (response) => MetricsSet.countCompletions(response).toLong, + itemCountingFunction = response => MetricsSet.countCompletions(response).toLong, maxItemCount = streamConfig.maxItemCount, )(system, ec) _ = streamConfig.timeoutO diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala index 9750b220e3..02746d5601 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala @@ -6,7 +6,6 @@ package com.daml.ledger.api.benchtool import java.util.concurrent._ import akka.actor.typed.{ActorSystem, SpawnProtocol} -import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.benchtool.config.WorkflowConfig.{ FibonacciSubmissionConfig, FooSubmissionConfig, @@ -25,9 +24,12 @@ import com.daml.ledger.api.benchtool.submission.foo.RandomPartySelecting import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner import com.daml.ledger.api.tls.TlsConfiguration import com.daml.ledger.resources.{ResourceContext, ResourceOwner} +import com.daml.metrics.api.MetricHandle.MetricsFactory +import com.daml.metrics.api.opentelemetry.OpenTelemetryMetricsFactory import com.daml.platform.localstore.api.UserManagementStore import io.grpc.Channel import io.grpc.netty.{NegotiationType, NettyChannelBuilder} +import io.opentelemetry.api.metrics.MeterProvider import org.slf4j.{Logger, LoggerFactory} import scala.concurrent.duration._ @@ -95,21 +97,24 @@ class LedgerApiBenchTool( ( String => LedgerApiServices, ActorSystem[SpawnProtocol.Command], - MetricRegistry, + MeterProvider, ) ] = for { servicesForUserId <- apiServicesOwner(config, authorizationHelper) system <- TypedActorSystemResourceOwner.owner() - metricRegistry <- new MetricRegistryOwner( + meterProvider <- new MetricRegistryOwner( reporter = config.metricsReporter, reportingInterval = config.reportingPeriod, logger = logger, ) - } yield (servicesForUserId, system, metricRegistry) + } yield (servicesForUserId, system, meterProvider) - resources.use { case (servicesForUserId, actorSystem, metricRegistry) => + resources.use { case (servicesForUserId, actorSystem, meterProvider) => val adminServices = servicesForUserId(UserManagementStore.DefaultParticipantAdminUserId) val regularUserServices = servicesForUserId(names.benchtoolUserId) + val metricsFactory = new OpenTelemetryMetricsFactory( + meterProvider.meterBuilder("ledger-api-bench-tool").build() + ) val partyAllocating = new PartyAllocating( names = names, @@ -138,7 +143,7 @@ class LedgerApiBenchTool( regularUserServices = regularUserServices, adminServices = adminServices, submissionConfig = submissionConfig, - metricRegistry = metricRegistry, + metricsFactory = metricsFactory, partyAllocating = partyAllocating, ) .map(_ -> BenchtoolTestsPackageInfo.StaticDefault) @@ -169,7 +174,7 @@ class LedgerApiBenchTool( regularUserServices = regularUserServices, adminServices = adminServices, submissionConfigO = config.workflow.submission, - metricRegistry = metricRegistry, + metricsFactory = metricsFactory, allocatedParties = allocatedParties, actorSystem = actorSystem, maxLatencyObjectiveMillis = config.maxLatencyObjectiveMillis, @@ -188,7 +193,7 @@ class LedgerApiBenchTool( benchmarkStreams( regularUserServices = regularUserServices, streamConfigs = updatedStreamConfigs, - metricRegistry = metricRegistry, + metricsFactory = metricsFactory, actorSystem = actorSystem, ) } @@ -225,7 +230,7 @@ class LedgerApiBenchTool( private def benchmarkStreams( regularUserServices: LedgerApiServices, streamConfigs: List[WorkflowConfig.StreamConfig], - metricRegistry: MetricRegistry, + metricsFactory: MetricsFactory, actorSystem: ActorSystem[SpawnProtocol.Command], )(implicit ec: ExecutionContext): Future[Either[String, Unit]] = if (streamConfigs.isEmpty) { @@ -237,7 +242,7 @@ class LedgerApiBenchTool( streamConfigs = streamConfigs, reportingPeriod = config.reportingPeriod, apiServices = regularUserServices, - metricRegistry = metricRegistry, + metricsFactory = metricsFactory, system = actorSystem, ) @@ -245,7 +250,7 @@ class LedgerApiBenchTool( regularUserServices: LedgerApiServices, adminServices: LedgerApiServices, submissionConfigO: Option[WorkflowConfig.SubmissionConfig], - metricRegistry: MetricRegistry, + metricsFactory: MetricsFactory, allocatedParties: AllocatedParties, actorSystem: ActorSystem[SpawnProtocol.Command], maxLatencyObjectiveMillis: Long, @@ -275,7 +280,7 @@ class LedgerApiBenchTool( names = names, benchtoolUserServices = regularUserServices, adminServices = adminServices, - metricRegistry = metricRegistry, + metricsFactory = metricsFactory, metricsManager = metricsManager, waitForSubmission = true, partyAllocating = new PartyAllocating( @@ -316,7 +321,7 @@ class LedgerApiBenchTool( regularUserServices: LedgerApiServices, adminServices: LedgerApiServices, submissionConfig: WorkflowConfig.SubmissionConfig, - metricRegistry: MetricRegistry, + metricsFactory: MetricsFactory, partyAllocating: PartyAllocating, )(implicit ec: ExecutionContext @@ -326,7 +331,7 @@ class LedgerApiBenchTool( names = names, benchtoolUserServices = regularUserServices, adminServices = adminServices, - metricRegistry = metricRegistry, + metricsFactory = metricsFactory, metricsManager = NoOpMetricsManager(), waitForSubmission = submissionConfig.waitForSubmission, partyAllocating = partyAllocating, diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/ExposedMetrics.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/ExposedMetrics.scala index 70e6c044b7..3983b9f8c5 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/ExposedMetrics.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/ExposedMetrics.scala @@ -3,16 +3,11 @@ package com.daml.ledger.api.benchtool.metrics -import java.time.{Clock, Duration} -import java.util.concurrent.TimeUnit +import java.time.Clock -import com.codahale.metrics.{MetricRegistry, SlidingTimeWindowArrayReservoir} -import com.codahale.{metrics => codahale} import com.daml.ledger.api.benchtool.util.TimeUtil -import com.daml.metrics.api.MetricHandle.{Counter, Gauge, Histogram} -import com.daml.metrics.api.dropwizard.{DropwizardCounter, DropwizardGauge, DropwizardHistogram} -import com.daml.metrics.api.{Gauges, MetricName, MetricsContext} -import com.daml.scalautil.Statement.discard +import com.daml.metrics.api.MetricHandle.{Counter, Gauge, Histogram, MetricsFactory} +import com.daml.metrics.api.{MetricName, MetricsContext} import com.google.protobuf.timestamp.Timestamp final class ExposedMetrics[T]( @@ -59,51 +54,38 @@ object ExposedMetrics { def apply[T]( streamName: String, - registry: MetricRegistry, - slidingTimeWindow: Duration, + factory: MetricsFactory, countingFunction: T => Long, sizingFunction: T => Long, recordTimeFunction: Option[T => Seq[Timestamp]], clock: Clock = Clock.systemUTC(), ): ExposedMetrics[T] = { val counterMetric = CounterMetric[T]( - counter = DropwizardCounter( - Prefix :+ "count" :+ streamName, - registry.counter(Prefix :+ "count" :+ streamName), + counter = factory.counter( + Prefix :+ "count" :+ streamName ), countingFunction = countingFunction, ) val bytesProcessedMetric = BytesProcessedMetric[T]( - bytesProcessed = DropwizardCounter( - Prefix :+ "bytes_read" :+ streamName, - registry.counter(Prefix :+ "bytes_read" :+ streamName), + bytesProcessed = factory.counter( + Prefix :+ "bytes_read" :+ streamName ), sizingFunction = sizingFunction, ) - val delaysHistogram = new codahale.Histogram( - new SlidingTimeWindowArrayReservoir(slidingTimeWindow.toNanos, TimeUnit.NANOSECONDS) - ) val delayMetric = recordTimeFunction.map { f => DelayMetric[T]( - delays = DropwizardHistogram( - Prefix :+ "delay" :+ streamName, - registry.register(Prefix :+ "delay" :+ streamName, delaysHistogram), + delays = factory.histogram( + Prefix :+ "delay" :+ streamName ), recordTimeFunction = f, ) } val latestRecordTimeMetric = recordTimeFunction.map { f => - val name = Prefix :+ "latest_record_time" :+ streamName LatestRecordTimeMetric[T]( - latestRecordTime = DropwizardGauge( - name, - registry - .register( - name, - new Gauges.VarGauge[Long](0L), - ), - () => discard(registry.remove(name)), - ), + latestRecordTime = factory.gauge( + Prefix :+ "latest_record_time" :+ streamName, + 0L, + )(MetricsContext.Empty), recordTimeFunction = f, ) } diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricRegistryOwner.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricRegistryOwner.scala index d40f606215..19a097ef17 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricRegistryOwner.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricRegistryOwner.scala @@ -3,52 +3,50 @@ package com.daml.ledger.api.benchtool.metrics +import java.util.concurrent.TimeUnit + +import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} +import com.daml.metrics.api.reporters.MetricsReporter +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.exporter.prometheus.PrometheusHttpServer +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.`export`.PeriodicMetricReader import org.slf4j.Logger import scala.concurrent.duration.Duration -import java.util.concurrent.TimeUnit -import com.codahale.metrics.{MetricRegistry, ScheduledReporter, Slf4jReporter} -import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} -import com.daml.metrics.api.reporters.MetricsReporter - -import scala.concurrent.Future class MetricRegistryOwner( reporter: MetricsReporter, reportingInterval: Duration, logger: Logger, -) extends ResourceOwner[MetricRegistry] { +) extends ResourceOwner[MeterProvider] { override def acquire()(implicit context: ResourceContext - ): Resource[MetricRegistry] = - for { - registry <- ResourceOwner.forValue(() => new MetricRegistry).acquire() - _ <- acquireSlfjReporter(registry) - metricsReporter <- acquireMetricsReporter(registry) - _ = metricsReporter.start(reportingInterval.toMillis, TimeUnit.MILLISECONDS) - } yield registry + ): Resource[MeterProvider] = + ResourceOwner.forCloseable(() => metricOwner).acquire() - private def acquireSlfjReporter( - registry: MetricRegistry - )(implicit context: ResourceContext): Resource[Slf4jReporter] = - Resource { - Future.successful(newSlf4jReporter(registry)) - } { reporter => - Future(reporter.report()) // Trigger a report to the SLF4J logger on shutdown. - .andThen { case _ => reporter.close() } // Gracefully shut down + private def metricOwner = { + val loggingMetricReader = PeriodicMetricReader + .builder(new Slf4jMetricExporter(logger)) + .setInterval(reportingInterval.toMillis, TimeUnit.MILLISECONDS) + .newMetricReaderFactory() + val meterProviderBuilder = SdkMeterProvider + .builder() + reporter match { + case MetricsReporter.Console => meterProviderBuilder.registerMetricReader(loggingMetricReader) + case MetricsReporter.Prometheus(address) => + meterProviderBuilder + .registerMetricReader(loggingMetricReader) + .registerMetricReader( + PrometheusHttpServer + .builder() + .setHost(address.getHostString) + .setPort(address.getPort) + .newMetricReaderFactory() + ) + case _ => throw new IllegalArgumentException(s"Metric reporter $reporter not supported.") } + meterProviderBuilder.build() + } - private def acquireMetricsReporter(registry: MetricRegistry)(implicit - context: ResourceContext - ): Resource[ScheduledReporter] = - ResourceOwner.forCloseable(() => reporter.register(registry)).acquire() - - private def newSlf4jReporter(registry: MetricRegistry): Slf4jReporter = - Slf4jReporter - .forRegistry(registry) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG) - .outputTo(logger) - .build() } diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsSet.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsSet.scala index 5ff3e5b997..d228ab24e8 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsSet.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsSet.scala @@ -3,18 +3,19 @@ package com.daml.ledger.api.benchtool.metrics -import com.codahale.metrics.MetricRegistry +import java.time.{Clock, Duration} + import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig._ +import com.daml.ledger.api.benchtool.metrics.metrics.TotalRuntimeMetric +import com.daml.ledger.api.benchtool.metrics.metrics.TotalRuntimeMetric.MaxDurationObjective import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse import com.daml.ledger.api.v1.transaction_service.{ GetTransactionTreesResponse, GetTransactionsResponse, } +import com.daml.metrics.api.MetricHandle.MetricsFactory import com.google.protobuf.timestamp.Timestamp -import java.time.{Clock, Duration} -import com.daml.ledger.api.benchtool.metrics.metrics.TotalRuntimeMetric -import com.daml.ledger.api.benchtool.metrics.metrics.TotalRuntimeMetric.MaxDurationObjective import scala.concurrent.duration.FiniteDuration @@ -24,7 +25,7 @@ object MetricsSet { configO: Option[TransactionObjectives] ): List[Metric[GetTransactionsResponse]] = transactionMetrics[GetTransactionsResponse]( - countingFunction = (response => countFlatTransactionsEvents(response).toInt), + countingFunction = response => countFlatTransactionsEvents(response).toInt, sizingFunction = _.serializedSize.toLong, recordTimeFunction = _.transactions.collect { case t if t.effectiveAt.isDefined => t.getEffectiveAt @@ -34,13 +35,11 @@ object MetricsSet { def transactionExposedMetrics( streamName: String, - registry: MetricRegistry, - slidingTimeWindow: FiniteDuration, + metricsFactory: MetricsFactory, ): ExposedMetrics[GetTransactionsResponse] = ExposedMetrics[GetTransactionsResponse]( streamName = streamName, - registry = registry, - slidingTimeWindow = Duration.ofNanos(slidingTimeWindow.toNanos), + factory = metricsFactory, countingFunction = countFlatTransactionsEvents, sizingFunction = _.serializedSize.toLong, recordTimeFunction = Some(_.transactions.collect { @@ -52,7 +51,7 @@ object MetricsSet { configO: Option[TransactionObjectives] ): List[Metric[GetTransactionTreesResponse]] = transactionMetrics[GetTransactionTreesResponse]( - countingFunction = (response => countTreeTransactionsEvents(response).toInt), + countingFunction = response => countTreeTransactionsEvents(response).toInt, sizingFunction = _.serializedSize.toLong, recordTimeFunction = _.transactions.collect { case t if t.effectiveAt.isDefined => t.getEffectiveAt @@ -62,13 +61,11 @@ object MetricsSet { def transactionTreesExposedMetrics( streamName: String, - registry: MetricRegistry, - slidingTimeWindow: FiniteDuration, + metricsFactory: MetricsFactory, ): ExposedMetrics[GetTransactionTreesResponse] = ExposedMetrics[GetTransactionTreesResponse]( streamName = streamName, - registry = registry, - slidingTimeWindow = Duration.ofNanos(slidingTimeWindow.toNanos), + factory = metricsFactory, countingFunction = countTreeTransactionsEvents, sizingFunction = _.serializedSize.toLong, recordTimeFunction = Some(_.transactions.collect { @@ -98,14 +95,12 @@ object MetricsSet { def activeContractsExposedMetrics( streamName: String, - registry: MetricRegistry, - slidingTimeWindow: FiniteDuration, + metricsFactory: MetricsFactory, ): ExposedMetrics[GetActiveContractsResponse] = ExposedMetrics[GetActiveContractsResponse]( streamName = streamName, - registry = registry, - slidingTimeWindow = Duration.ofNanos(slidingTimeWindow.toNanos), - countingFunction = (response) => countActiveContracts(response).toLong, + factory = metricsFactory, + countingFunction = response => countActiveContracts(response).toLong, sizingFunction = _.serializedSize.toLong, recordTimeFunction = None, ) @@ -132,14 +127,12 @@ object MetricsSet { def completionsExposedMetrics( streamName: String, - registry: MetricRegistry, - slidingTimeWindow: FiniteDuration, + metricsFactory: MetricsFactory, ): ExposedMetrics[CompletionStreamResponse] = ExposedMetrics[CompletionStreamResponse]( streamName = streamName, - registry = registry, - slidingTimeWindow = Duration.ofNanos(slidingTimeWindow.toNanos), - countingFunction = (response) => countCompletions(response).toLong, + factory = metricsFactory, + countingFunction = response => countCompletions(response).toLong, sizingFunction = _.serializedSize.toLong, recordTimeFunction = None, ) diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/Slf4jMetricExporter.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/Slf4jMetricExporter.scala new file mode 100644 index 0000000000..a050a2ff65 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/Slf4jMetricExporter.scala @@ -0,0 +1,29 @@ +// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.metrics + +import java.util + +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.`export`.MetricExporter +import io.opentelemetry.sdk.metrics.data.MetricData +import org.slf4j.Logger + +import scala.jdk.CollectionConverters.CollectionHasAsScala + +class Slf4jMetricExporter(logger: Logger) extends MetricExporter { + + override def `export`( + metrics: util.Collection[MetricData] + ): CompletableResultCode = { + logger.debug(s"Logging ${metrics.size()} metrics") + metrics.asScala.foreach(metricData => logger.debug(s"metric: $metricData")) + CompletableResultCode.ofSuccess() + } + + override def flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override def shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() + +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/CommandSubmitter.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/CommandSubmitter.scala index 4cf411d5a4..6759605380 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/CommandSubmitter.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/CommandSubmitter.scala @@ -6,7 +6,6 @@ package com.daml.ledger.api.benchtool.submission import akka.actor.ActorSystem import akka.stream.scaladsl.{Sink, Source} import akka.stream.{Materializer, OverflowStrategy} -import com.codahale.metrics.{MetricRegistry, Timer} import com.daml.ledger.api.benchtool.config.WorkflowConfig.SubmissionConfig import com.daml.ledger.api.benchtool.infrastructure.TestDars import com.daml.ledger.api.benchtool.metrics.LatencyMetric.LatencyNanos @@ -16,6 +15,8 @@ import com.daml.ledger.api.v1.commands.{Command, Commands} import com.daml.ledger.client import com.daml.ledger.client.binding.Primitive import com.daml.ledger.resources.{ResourceContext, ResourceOwner} +import com.daml.metrics.api.MetricHandle.{MetricsFactory, Timer} +import com.daml.metrics.api.MetricName import io.grpc.Status import org.slf4j.LoggerFactory import scalaz.syntax.tag._ @@ -30,7 +31,7 @@ case class CommandSubmitter( benchtoolUserServices: LedgerApiServices, adminServices: LedgerApiServices, partyAllocating: PartyAllocating, - metricRegistry: MetricRegistry, + metricsFactory: MetricsFactory, metricsManager: MetricsManager[LatencyNanos], waitForSubmission: Boolean, commandGenerationParallelism: Int = 8, @@ -38,9 +39,9 @@ case class CommandSubmitter( ) { private val logger = LoggerFactory.getLogger(getClass) private val submitLatencyTimer = if (waitForSubmission) { - metricRegistry.timer("daml_submit_and_wait_latency") + metricsFactory.timer(MetricName("daml_submit_and_wait_latency")) } else { - metricRegistry.timer("daml_submit_latency") + metricsFactory.timer(MetricName("daml_submit_latency")) } def prepare(config: SubmissionConfig)(implicit @@ -235,10 +236,12 @@ case class CommandSubmitter( private def timed[O](timer: Timer, metricsManager: MetricsManager[LatencyNanos])( f: => Future[O] )(implicit ec: ExecutionContext) = { - val ctx = timer.time() + val ctx = timer.startAsync() + val startNanos = System.nanoTime() f.map(_.tap { _ => - val latencyNanos = ctx.stop() - metricsManager.sendNewValue(latencyNanos) + ctx.stop() + val endNanos = System.nanoTime() + metricsManager.sendNewValue(endNanos - startNanos) }) } } diff --git a/ledger/ledger-api-bench-tool/src/test/lib/scala/com/daml/ledger/api/benchtool/BenchtoolSandboxFixture.scala b/ledger/ledger-api-bench-tool/src/test/lib/scala/com/daml/ledger/api/benchtool/BenchtoolSandboxFixture.scala index f1b3ff88bf..9432669b54 100644 --- a/ledger/ledger-api-bench-tool/src/test/lib/scala/com/daml/ledger/api/benchtool/BenchtoolSandboxFixture.scala +++ b/ledger/ledger-api-bench-tool/src/test/lib/scala/com/daml/ledger/api/benchtool/BenchtoolSandboxFixture.scala @@ -5,7 +5,6 @@ package com.daml.ledger.api.benchtool import java.io.File -import com.codahale.metrics.MetricRegistry import com.daml.bazeltools.BazelRunfiles.rlocation import com.daml.ledger.api.benchtool.config.WorkflowConfig import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager @@ -20,6 +19,7 @@ import com.daml.ledger.api.benchtool.submission.{ } import com.daml.ledger.test.BenchtoolTestDar import com.daml.lf.language.LanguageVersion +import com.daml.metrics.api.noop.NoOpMetricsFactory import com.daml.platform.sandbox.fixture.SandboxFixture import org.scalatest.Suite @@ -50,7 +50,7 @@ trait BenchtoolSandboxFixture extends SandboxFixture { names = names, benchtoolUserServices = apiServices, adminServices = apiServices, - metricRegistry = new MetricRegistry, + metricsFactory = NoOpMetricsFactory, metricsManager = NoOpMetricsManager(), waitForSubmission = true, partyAllocating = new PartyAllocating(