mirror of
https://github.com/digital-asset/daml.git
synced 2024-11-10 10:46:11 +03:00
Use OpenTelemetry metrics for the ledger-api-bench-tool [PLEN-100] (#16240)
This commit is contained in:
parent
eb0c90a173
commit
496e6d57ee
@ -65,16 +65,19 @@ da_scala_library(
|
||||
"//libs-scala/resources",
|
||||
"//libs-scala/resources-akka",
|
||||
"//libs-scala/resources-grpc",
|
||||
"//libs-scala/scala-utils",
|
||||
"//libs-scala/timer-utils",
|
||||
"//ledger/test-common:benchtool-tests-%s.scala" % "1.15", # TODO: make the LF version configurable
|
||||
"//ledger/test-common:dar-files-%s-lib" % "1.15", # TODO: make the LF version configurable
|
||||
"//observability/metrics",
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
"@maven//:com_google_code_findbugs_jsr305",
|
||||
"@maven//:io_grpc_grpc_api",
|
||||
"@maven//:io_grpc_grpc_core",
|
||||
"@maven//:io_grpc_grpc_netty",
|
||||
"@maven//:io_netty_netty_handler",
|
||||
"@maven//:io_opentelemetry_opentelemetry_api",
|
||||
"@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
|
||||
"@maven//:io_opentelemetry_opentelemetry_sdk_common",
|
||||
"@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
|
||||
"@maven//:org_slf4j_slf4j_api",
|
||||
],
|
||||
)
|
||||
@ -104,7 +107,7 @@ da_scala_library(
|
||||
"//ledger/participant-integration-api",
|
||||
"//ledger/sandbox-on-x",
|
||||
"//ledger/sandbox-on-x:sandbox-on-x-test-lib",
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
"//observability/metrics",
|
||||
"@maven//:org_slf4j_slf4j_api",
|
||||
],
|
||||
)
|
||||
|
@ -4,7 +4,6 @@
|
||||
package com.daml.ledger.api.benchtool
|
||||
|
||||
import akka.actor.typed.{ActorSystem, SpawnProtocol}
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig
|
||||
import com.daml.ledger.api.benchtool.metrics.{BenchmarkResult, MetricsSet, StreamMetrics}
|
||||
import com.daml.ledger.api.benchtool.services.LedgerApiServices
|
||||
@ -15,6 +14,7 @@ import com.daml.ledger.api.v1.transaction_service.{
|
||||
GetTransactionTreesResponse,
|
||||
GetTransactionsResponse,
|
||||
}
|
||||
import com.daml.metrics.api.MetricHandle.MetricsFactory
|
||||
import com.daml.timer.Delayed
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
@ -28,7 +28,7 @@ object Benchmark {
|
||||
streamConfigs: List[StreamConfig],
|
||||
reportingPeriod: FiniteDuration,
|
||||
apiServices: LedgerApiServices,
|
||||
metricRegistry: MetricRegistry,
|
||||
metricsFactory: MetricsFactory,
|
||||
system: ActorSystem[SpawnProtocol.Command],
|
||||
)(implicit ec: ExecutionContext): Future[Either[String, Unit]] =
|
||||
Future
|
||||
@ -44,7 +44,7 @@ object Benchmark {
|
||||
logger = logger,
|
||||
exposedMetrics = Some(
|
||||
MetricsSet
|
||||
.transactionExposedMetrics(streamConfig.name, metricRegistry, reportingPeriod)
|
||||
.transactionExposedMetrics(streamConfig.name, metricsFactory)
|
||||
),
|
||||
itemCountingFunction = MetricsSet.countFlatTransactionsEvents,
|
||||
maxItemCount = streamConfig.maxItemCount,
|
||||
@ -65,8 +65,7 @@ object Benchmark {
|
||||
exposedMetrics = Some(
|
||||
MetricsSet.transactionTreesExposedMetrics(
|
||||
streamConfig.name,
|
||||
metricRegistry,
|
||||
reportingPeriod,
|
||||
metricsFactory,
|
||||
)
|
||||
),
|
||||
itemCountingFunction = MetricsSet.countTreeTransactionsEvents,
|
||||
@ -88,12 +87,10 @@ object Benchmark {
|
||||
exposedMetrics = Some(
|
||||
MetricsSet.activeContractsExposedMetrics(
|
||||
streamConfig.name,
|
||||
metricRegistry,
|
||||
reportingPeriod,
|
||||
metricsFactory,
|
||||
)
|
||||
),
|
||||
itemCountingFunction =
|
||||
(response) => MetricsSet.countActiveContracts(response).toLong,
|
||||
itemCountingFunction = response => MetricsSet.countActiveContracts(response).toLong,
|
||||
maxItemCount = streamConfig.maxItemCount,
|
||||
)(system, ec)
|
||||
_ = streamConfig.timeoutO
|
||||
@ -111,9 +108,9 @@ object Benchmark {
|
||||
logger = logger,
|
||||
exposedMetrics = Some(
|
||||
MetricsSet
|
||||
.completionsExposedMetrics(streamConfig.name, metricRegistry, reportingPeriod)
|
||||
.completionsExposedMetrics(streamConfig.name, metricsFactory)
|
||||
),
|
||||
itemCountingFunction = (response) => MetricsSet.countCompletions(response).toLong,
|
||||
itemCountingFunction = response => MetricsSet.countCompletions(response).toLong,
|
||||
maxItemCount = streamConfig.maxItemCount,
|
||||
)(system, ec)
|
||||
_ = streamConfig.timeoutO
|
||||
|
@ -6,7 +6,6 @@ package com.daml.ledger.api.benchtool
|
||||
import java.util.concurrent._
|
||||
|
||||
import akka.actor.typed.{ActorSystem, SpawnProtocol}
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.api.benchtool.config.WorkflowConfig.{
|
||||
FibonacciSubmissionConfig,
|
||||
FooSubmissionConfig,
|
||||
@ -25,9 +24,12 @@ import com.daml.ledger.api.benchtool.submission.foo.RandomPartySelecting
|
||||
import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner
|
||||
import com.daml.ledger.api.tls.TlsConfiguration
|
||||
import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
|
||||
import com.daml.metrics.api.MetricHandle.MetricsFactory
|
||||
import com.daml.metrics.api.opentelemetry.OpenTelemetryMetricsFactory
|
||||
import com.daml.platform.localstore.api.UserManagementStore
|
||||
import io.grpc.Channel
|
||||
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
|
||||
import io.opentelemetry.api.metrics.MeterProvider
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.concurrent.duration._
|
||||
@ -95,21 +97,24 @@ class LedgerApiBenchTool(
|
||||
(
|
||||
String => LedgerApiServices,
|
||||
ActorSystem[SpawnProtocol.Command],
|
||||
MetricRegistry,
|
||||
MeterProvider,
|
||||
)
|
||||
] = for {
|
||||
servicesForUserId <- apiServicesOwner(config, authorizationHelper)
|
||||
system <- TypedActorSystemResourceOwner.owner()
|
||||
metricRegistry <- new MetricRegistryOwner(
|
||||
meterProvider <- new MetricRegistryOwner(
|
||||
reporter = config.metricsReporter,
|
||||
reportingInterval = config.reportingPeriod,
|
||||
logger = logger,
|
||||
)
|
||||
} yield (servicesForUserId, system, metricRegistry)
|
||||
} yield (servicesForUserId, system, meterProvider)
|
||||
|
||||
resources.use { case (servicesForUserId, actorSystem, metricRegistry) =>
|
||||
resources.use { case (servicesForUserId, actorSystem, meterProvider) =>
|
||||
val adminServices = servicesForUserId(UserManagementStore.DefaultParticipantAdminUserId)
|
||||
val regularUserServices = servicesForUserId(names.benchtoolUserId)
|
||||
val metricsFactory = new OpenTelemetryMetricsFactory(
|
||||
meterProvider.meterBuilder("ledger-api-bench-tool").build()
|
||||
)
|
||||
|
||||
val partyAllocating = new PartyAllocating(
|
||||
names = names,
|
||||
@ -138,7 +143,7 @@ class LedgerApiBenchTool(
|
||||
regularUserServices = regularUserServices,
|
||||
adminServices = adminServices,
|
||||
submissionConfig = submissionConfig,
|
||||
metricRegistry = metricRegistry,
|
||||
metricsFactory = metricsFactory,
|
||||
partyAllocating = partyAllocating,
|
||||
)
|
||||
.map(_ -> BenchtoolTestsPackageInfo.StaticDefault)
|
||||
@ -169,7 +174,7 @@ class LedgerApiBenchTool(
|
||||
regularUserServices = regularUserServices,
|
||||
adminServices = adminServices,
|
||||
submissionConfigO = config.workflow.submission,
|
||||
metricRegistry = metricRegistry,
|
||||
metricsFactory = metricsFactory,
|
||||
allocatedParties = allocatedParties,
|
||||
actorSystem = actorSystem,
|
||||
maxLatencyObjectiveMillis = config.maxLatencyObjectiveMillis,
|
||||
@ -188,7 +193,7 @@ class LedgerApiBenchTool(
|
||||
benchmarkStreams(
|
||||
regularUserServices = regularUserServices,
|
||||
streamConfigs = updatedStreamConfigs,
|
||||
metricRegistry = metricRegistry,
|
||||
metricsFactory = metricsFactory,
|
||||
actorSystem = actorSystem,
|
||||
)
|
||||
}
|
||||
@ -225,7 +230,7 @@ class LedgerApiBenchTool(
|
||||
private def benchmarkStreams(
|
||||
regularUserServices: LedgerApiServices,
|
||||
streamConfigs: List[WorkflowConfig.StreamConfig],
|
||||
metricRegistry: MetricRegistry,
|
||||
metricsFactory: MetricsFactory,
|
||||
actorSystem: ActorSystem[SpawnProtocol.Command],
|
||||
)(implicit ec: ExecutionContext): Future[Either[String, Unit]] =
|
||||
if (streamConfigs.isEmpty) {
|
||||
@ -237,7 +242,7 @@ class LedgerApiBenchTool(
|
||||
streamConfigs = streamConfigs,
|
||||
reportingPeriod = config.reportingPeriod,
|
||||
apiServices = regularUserServices,
|
||||
metricRegistry = metricRegistry,
|
||||
metricsFactory = metricsFactory,
|
||||
system = actorSystem,
|
||||
)
|
||||
|
||||
@ -245,7 +250,7 @@ class LedgerApiBenchTool(
|
||||
regularUserServices: LedgerApiServices,
|
||||
adminServices: LedgerApiServices,
|
||||
submissionConfigO: Option[WorkflowConfig.SubmissionConfig],
|
||||
metricRegistry: MetricRegistry,
|
||||
metricsFactory: MetricsFactory,
|
||||
allocatedParties: AllocatedParties,
|
||||
actorSystem: ActorSystem[SpawnProtocol.Command],
|
||||
maxLatencyObjectiveMillis: Long,
|
||||
@ -275,7 +280,7 @@ class LedgerApiBenchTool(
|
||||
names = names,
|
||||
benchtoolUserServices = regularUserServices,
|
||||
adminServices = adminServices,
|
||||
metricRegistry = metricRegistry,
|
||||
metricsFactory = metricsFactory,
|
||||
metricsManager = metricsManager,
|
||||
waitForSubmission = true,
|
||||
partyAllocating = new PartyAllocating(
|
||||
@ -316,7 +321,7 @@ class LedgerApiBenchTool(
|
||||
regularUserServices: LedgerApiServices,
|
||||
adminServices: LedgerApiServices,
|
||||
submissionConfig: WorkflowConfig.SubmissionConfig,
|
||||
metricRegistry: MetricRegistry,
|
||||
metricsFactory: MetricsFactory,
|
||||
partyAllocating: PartyAllocating,
|
||||
)(implicit
|
||||
ec: ExecutionContext
|
||||
@ -326,7 +331,7 @@ class LedgerApiBenchTool(
|
||||
names = names,
|
||||
benchtoolUserServices = regularUserServices,
|
||||
adminServices = adminServices,
|
||||
metricRegistry = metricRegistry,
|
||||
metricsFactory = metricsFactory,
|
||||
metricsManager = NoOpMetricsManager(),
|
||||
waitForSubmission = submissionConfig.waitForSubmission,
|
||||
partyAllocating = partyAllocating,
|
||||
|
@ -3,16 +3,11 @@
|
||||
|
||||
package com.daml.ledger.api.benchtool.metrics
|
||||
|
||||
import java.time.{Clock, Duration}
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.time.Clock
|
||||
|
||||
import com.codahale.metrics.{MetricRegistry, SlidingTimeWindowArrayReservoir}
|
||||
import com.codahale.{metrics => codahale}
|
||||
import com.daml.ledger.api.benchtool.util.TimeUtil
|
||||
import com.daml.metrics.api.MetricHandle.{Counter, Gauge, Histogram}
|
||||
import com.daml.metrics.api.dropwizard.{DropwizardCounter, DropwizardGauge, DropwizardHistogram}
|
||||
import com.daml.metrics.api.{Gauges, MetricName, MetricsContext}
|
||||
import com.daml.scalautil.Statement.discard
|
||||
import com.daml.metrics.api.MetricHandle.{Counter, Gauge, Histogram, MetricsFactory}
|
||||
import com.daml.metrics.api.{MetricName, MetricsContext}
|
||||
import com.google.protobuf.timestamp.Timestamp
|
||||
|
||||
final class ExposedMetrics[T](
|
||||
@ -59,51 +54,38 @@ object ExposedMetrics {
|
||||
|
||||
def apply[T](
|
||||
streamName: String,
|
||||
registry: MetricRegistry,
|
||||
slidingTimeWindow: Duration,
|
||||
factory: MetricsFactory,
|
||||
countingFunction: T => Long,
|
||||
sizingFunction: T => Long,
|
||||
recordTimeFunction: Option[T => Seq[Timestamp]],
|
||||
clock: Clock = Clock.systemUTC(),
|
||||
): ExposedMetrics[T] = {
|
||||
val counterMetric = CounterMetric[T](
|
||||
counter = DropwizardCounter(
|
||||
Prefix :+ "count" :+ streamName,
|
||||
registry.counter(Prefix :+ "count" :+ streamName),
|
||||
counter = factory.counter(
|
||||
Prefix :+ "count" :+ streamName
|
||||
),
|
||||
countingFunction = countingFunction,
|
||||
)
|
||||
val bytesProcessedMetric = BytesProcessedMetric[T](
|
||||
bytesProcessed = DropwizardCounter(
|
||||
Prefix :+ "bytes_read" :+ streamName,
|
||||
registry.counter(Prefix :+ "bytes_read" :+ streamName),
|
||||
bytesProcessed = factory.counter(
|
||||
Prefix :+ "bytes_read" :+ streamName
|
||||
),
|
||||
sizingFunction = sizingFunction,
|
||||
)
|
||||
val delaysHistogram = new codahale.Histogram(
|
||||
new SlidingTimeWindowArrayReservoir(slidingTimeWindow.toNanos, TimeUnit.NANOSECONDS)
|
||||
)
|
||||
val delayMetric = recordTimeFunction.map { f =>
|
||||
DelayMetric[T](
|
||||
delays = DropwizardHistogram(
|
||||
Prefix :+ "delay" :+ streamName,
|
||||
registry.register(Prefix :+ "delay" :+ streamName, delaysHistogram),
|
||||
delays = factory.histogram(
|
||||
Prefix :+ "delay" :+ streamName
|
||||
),
|
||||
recordTimeFunction = f,
|
||||
)
|
||||
}
|
||||
val latestRecordTimeMetric = recordTimeFunction.map { f =>
|
||||
val name = Prefix :+ "latest_record_time" :+ streamName
|
||||
LatestRecordTimeMetric[T](
|
||||
latestRecordTime = DropwizardGauge(
|
||||
name,
|
||||
registry
|
||||
.register(
|
||||
name,
|
||||
new Gauges.VarGauge[Long](0L),
|
||||
),
|
||||
() => discard(registry.remove(name)),
|
||||
),
|
||||
latestRecordTime = factory.gauge(
|
||||
Prefix :+ "latest_record_time" :+ streamName,
|
||||
0L,
|
||||
)(MetricsContext.Empty),
|
||||
recordTimeFunction = f,
|
||||
)
|
||||
}
|
||||
|
@ -3,52 +3,50 @@
|
||||
|
||||
package com.daml.ledger.api.benchtool.metrics
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
|
||||
import com.daml.metrics.api.reporters.MetricsReporter
|
||||
import io.opentelemetry.api.metrics.MeterProvider
|
||||
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer
|
||||
import io.opentelemetry.sdk.metrics.SdkMeterProvider
|
||||
import io.opentelemetry.sdk.metrics.`export`.PeriodicMetricReader
|
||||
import org.slf4j.Logger
|
||||
|
||||
import scala.concurrent.duration.Duration
|
||||
import java.util.concurrent.TimeUnit
|
||||
import com.codahale.metrics.{MetricRegistry, ScheduledReporter, Slf4jReporter}
|
||||
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
|
||||
import com.daml.metrics.api.reporters.MetricsReporter
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
class MetricRegistryOwner(
|
||||
reporter: MetricsReporter,
|
||||
reportingInterval: Duration,
|
||||
logger: Logger,
|
||||
) extends ResourceOwner[MetricRegistry] {
|
||||
) extends ResourceOwner[MeterProvider] {
|
||||
override def acquire()(implicit
|
||||
context: ResourceContext
|
||||
): Resource[MetricRegistry] =
|
||||
for {
|
||||
registry <- ResourceOwner.forValue(() => new MetricRegistry).acquire()
|
||||
_ <- acquireSlfjReporter(registry)
|
||||
metricsReporter <- acquireMetricsReporter(registry)
|
||||
_ = metricsReporter.start(reportingInterval.toMillis, TimeUnit.MILLISECONDS)
|
||||
} yield registry
|
||||
): Resource[MeterProvider] =
|
||||
ResourceOwner.forCloseable(() => metricOwner).acquire()
|
||||
|
||||
private def acquireSlfjReporter(
|
||||
registry: MetricRegistry
|
||||
)(implicit context: ResourceContext): Resource[Slf4jReporter] =
|
||||
Resource {
|
||||
Future.successful(newSlf4jReporter(registry))
|
||||
} { reporter =>
|
||||
Future(reporter.report()) // Trigger a report to the SLF4J logger on shutdown.
|
||||
.andThen { case _ => reporter.close() } // Gracefully shut down
|
||||
private def metricOwner = {
|
||||
val loggingMetricReader = PeriodicMetricReader
|
||||
.builder(new Slf4jMetricExporter(logger))
|
||||
.setInterval(reportingInterval.toMillis, TimeUnit.MILLISECONDS)
|
||||
.newMetricReaderFactory()
|
||||
val meterProviderBuilder = SdkMeterProvider
|
||||
.builder()
|
||||
reporter match {
|
||||
case MetricsReporter.Console => meterProviderBuilder.registerMetricReader(loggingMetricReader)
|
||||
case MetricsReporter.Prometheus(address) =>
|
||||
meterProviderBuilder
|
||||
.registerMetricReader(loggingMetricReader)
|
||||
.registerMetricReader(
|
||||
PrometheusHttpServer
|
||||
.builder()
|
||||
.setHost(address.getHostString)
|
||||
.setPort(address.getPort)
|
||||
.newMetricReaderFactory()
|
||||
)
|
||||
case _ => throw new IllegalArgumentException(s"Metric reporter $reporter not supported.")
|
||||
}
|
||||
meterProviderBuilder.build()
|
||||
}
|
||||
|
||||
private def acquireMetricsReporter(registry: MetricRegistry)(implicit
|
||||
context: ResourceContext
|
||||
): Resource[ScheduledReporter] =
|
||||
ResourceOwner.forCloseable(() => reporter.register(registry)).acquire()
|
||||
|
||||
private def newSlf4jReporter(registry: MetricRegistry): Slf4jReporter =
|
||||
Slf4jReporter
|
||||
.forRegistry(registry)
|
||||
.convertRatesTo(TimeUnit.SECONDS)
|
||||
.convertDurationsTo(TimeUnit.MILLISECONDS)
|
||||
.withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
|
||||
.outputTo(logger)
|
||||
.build()
|
||||
}
|
||||
|
@ -3,18 +3,19 @@
|
||||
|
||||
package com.daml.ledger.api.benchtool.metrics
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import java.time.{Clock, Duration}
|
||||
|
||||
import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig._
|
||||
import com.daml.ledger.api.benchtool.metrics.metrics.TotalRuntimeMetric
|
||||
import com.daml.ledger.api.benchtool.metrics.metrics.TotalRuntimeMetric.MaxDurationObjective
|
||||
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
|
||||
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
|
||||
import com.daml.ledger.api.v1.transaction_service.{
|
||||
GetTransactionTreesResponse,
|
||||
GetTransactionsResponse,
|
||||
}
|
||||
import com.daml.metrics.api.MetricHandle.MetricsFactory
|
||||
import com.google.protobuf.timestamp.Timestamp
|
||||
import java.time.{Clock, Duration}
|
||||
import com.daml.ledger.api.benchtool.metrics.metrics.TotalRuntimeMetric
|
||||
import com.daml.ledger.api.benchtool.metrics.metrics.TotalRuntimeMetric.MaxDurationObjective
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
@ -24,7 +25,7 @@ object MetricsSet {
|
||||
configO: Option[TransactionObjectives]
|
||||
): List[Metric[GetTransactionsResponse]] =
|
||||
transactionMetrics[GetTransactionsResponse](
|
||||
countingFunction = (response => countFlatTransactionsEvents(response).toInt),
|
||||
countingFunction = response => countFlatTransactionsEvents(response).toInt,
|
||||
sizingFunction = _.serializedSize.toLong,
|
||||
recordTimeFunction = _.transactions.collect {
|
||||
case t if t.effectiveAt.isDefined => t.getEffectiveAt
|
||||
@ -34,13 +35,11 @@ object MetricsSet {
|
||||
|
||||
def transactionExposedMetrics(
|
||||
streamName: String,
|
||||
registry: MetricRegistry,
|
||||
slidingTimeWindow: FiniteDuration,
|
||||
metricsFactory: MetricsFactory,
|
||||
): ExposedMetrics[GetTransactionsResponse] =
|
||||
ExposedMetrics[GetTransactionsResponse](
|
||||
streamName = streamName,
|
||||
registry = registry,
|
||||
slidingTimeWindow = Duration.ofNanos(slidingTimeWindow.toNanos),
|
||||
factory = metricsFactory,
|
||||
countingFunction = countFlatTransactionsEvents,
|
||||
sizingFunction = _.serializedSize.toLong,
|
||||
recordTimeFunction = Some(_.transactions.collect {
|
||||
@ -52,7 +51,7 @@ object MetricsSet {
|
||||
configO: Option[TransactionObjectives]
|
||||
): List[Metric[GetTransactionTreesResponse]] =
|
||||
transactionMetrics[GetTransactionTreesResponse](
|
||||
countingFunction = (response => countTreeTransactionsEvents(response).toInt),
|
||||
countingFunction = response => countTreeTransactionsEvents(response).toInt,
|
||||
sizingFunction = _.serializedSize.toLong,
|
||||
recordTimeFunction = _.transactions.collect {
|
||||
case t if t.effectiveAt.isDefined => t.getEffectiveAt
|
||||
@ -62,13 +61,11 @@ object MetricsSet {
|
||||
|
||||
def transactionTreesExposedMetrics(
|
||||
streamName: String,
|
||||
registry: MetricRegistry,
|
||||
slidingTimeWindow: FiniteDuration,
|
||||
metricsFactory: MetricsFactory,
|
||||
): ExposedMetrics[GetTransactionTreesResponse] =
|
||||
ExposedMetrics[GetTransactionTreesResponse](
|
||||
streamName = streamName,
|
||||
registry = registry,
|
||||
slidingTimeWindow = Duration.ofNanos(slidingTimeWindow.toNanos),
|
||||
factory = metricsFactory,
|
||||
countingFunction = countTreeTransactionsEvents,
|
||||
sizingFunction = _.serializedSize.toLong,
|
||||
recordTimeFunction = Some(_.transactions.collect {
|
||||
@ -98,14 +95,12 @@ object MetricsSet {
|
||||
|
||||
def activeContractsExposedMetrics(
|
||||
streamName: String,
|
||||
registry: MetricRegistry,
|
||||
slidingTimeWindow: FiniteDuration,
|
||||
metricsFactory: MetricsFactory,
|
||||
): ExposedMetrics[GetActiveContractsResponse] =
|
||||
ExposedMetrics[GetActiveContractsResponse](
|
||||
streamName = streamName,
|
||||
registry = registry,
|
||||
slidingTimeWindow = Duration.ofNanos(slidingTimeWindow.toNanos),
|
||||
countingFunction = (response) => countActiveContracts(response).toLong,
|
||||
factory = metricsFactory,
|
||||
countingFunction = response => countActiveContracts(response).toLong,
|
||||
sizingFunction = _.serializedSize.toLong,
|
||||
recordTimeFunction = None,
|
||||
)
|
||||
@ -132,14 +127,12 @@ object MetricsSet {
|
||||
|
||||
def completionsExposedMetrics(
|
||||
streamName: String,
|
||||
registry: MetricRegistry,
|
||||
slidingTimeWindow: FiniteDuration,
|
||||
metricsFactory: MetricsFactory,
|
||||
): ExposedMetrics[CompletionStreamResponse] =
|
||||
ExposedMetrics[CompletionStreamResponse](
|
||||
streamName = streamName,
|
||||
registry = registry,
|
||||
slidingTimeWindow = Duration.ofNanos(slidingTimeWindow.toNanos),
|
||||
countingFunction = (response) => countCompletions(response).toLong,
|
||||
factory = metricsFactory,
|
||||
countingFunction = response => countCompletions(response).toLong,
|
||||
sizingFunction = _.serializedSize.toLong,
|
||||
recordTimeFunction = None,
|
||||
)
|
||||
|
@ -0,0 +1,29 @@
|
||||
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.benchtool.metrics
|
||||
|
||||
import java.util
|
||||
|
||||
import io.opentelemetry.sdk.common.CompletableResultCode
|
||||
import io.opentelemetry.sdk.metrics.`export`.MetricExporter
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData
|
||||
import org.slf4j.Logger
|
||||
|
||||
import scala.jdk.CollectionConverters.CollectionHasAsScala
|
||||
|
||||
class Slf4jMetricExporter(logger: Logger) extends MetricExporter {
|
||||
|
||||
override def `export`(
|
||||
metrics: util.Collection[MetricData]
|
||||
): CompletableResultCode = {
|
||||
logger.debug(s"Logging ${metrics.size()} metrics")
|
||||
metrics.asScala.foreach(metricData => logger.debug(s"metric: $metricData"))
|
||||
CompletableResultCode.ofSuccess()
|
||||
}
|
||||
|
||||
override def flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
|
||||
|
||||
override def shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
|
||||
|
||||
}
|
@ -6,7 +6,6 @@ package com.daml.ledger.api.benchtool.submission
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.scaladsl.{Sink, Source}
|
||||
import akka.stream.{Materializer, OverflowStrategy}
|
||||
import com.codahale.metrics.{MetricRegistry, Timer}
|
||||
import com.daml.ledger.api.benchtool.config.WorkflowConfig.SubmissionConfig
|
||||
import com.daml.ledger.api.benchtool.infrastructure.TestDars
|
||||
import com.daml.ledger.api.benchtool.metrics.LatencyMetric.LatencyNanos
|
||||
@ -16,6 +15,8 @@ import com.daml.ledger.api.v1.commands.{Command, Commands}
|
||||
import com.daml.ledger.client
|
||||
import com.daml.ledger.client.binding.Primitive
|
||||
import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
|
||||
import com.daml.metrics.api.MetricHandle.{MetricsFactory, Timer}
|
||||
import com.daml.metrics.api.MetricName
|
||||
import io.grpc.Status
|
||||
import org.slf4j.LoggerFactory
|
||||
import scalaz.syntax.tag._
|
||||
@ -30,7 +31,7 @@ case class CommandSubmitter(
|
||||
benchtoolUserServices: LedgerApiServices,
|
||||
adminServices: LedgerApiServices,
|
||||
partyAllocating: PartyAllocating,
|
||||
metricRegistry: MetricRegistry,
|
||||
metricsFactory: MetricsFactory,
|
||||
metricsManager: MetricsManager[LatencyNanos],
|
||||
waitForSubmission: Boolean,
|
||||
commandGenerationParallelism: Int = 8,
|
||||
@ -38,9 +39,9 @@ case class CommandSubmitter(
|
||||
) {
|
||||
private val logger = LoggerFactory.getLogger(getClass)
|
||||
private val submitLatencyTimer = if (waitForSubmission) {
|
||||
metricRegistry.timer("daml_submit_and_wait_latency")
|
||||
metricsFactory.timer(MetricName("daml_submit_and_wait_latency"))
|
||||
} else {
|
||||
metricRegistry.timer("daml_submit_latency")
|
||||
metricsFactory.timer(MetricName("daml_submit_latency"))
|
||||
}
|
||||
|
||||
def prepare(config: SubmissionConfig)(implicit
|
||||
@ -235,10 +236,12 @@ case class CommandSubmitter(
|
||||
private def timed[O](timer: Timer, metricsManager: MetricsManager[LatencyNanos])(
|
||||
f: => Future[O]
|
||||
)(implicit ec: ExecutionContext) = {
|
||||
val ctx = timer.time()
|
||||
val ctx = timer.startAsync()
|
||||
val startNanos = System.nanoTime()
|
||||
f.map(_.tap { _ =>
|
||||
val latencyNanos = ctx.stop()
|
||||
metricsManager.sendNewValue(latencyNanos)
|
||||
ctx.stop()
|
||||
val endNanos = System.nanoTime()
|
||||
metricsManager.sendNewValue(endNanos - startNanos)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,6 @@ package com.daml.ledger.api.benchtool
|
||||
|
||||
import java.io.File
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.bazeltools.BazelRunfiles.rlocation
|
||||
import com.daml.ledger.api.benchtool.config.WorkflowConfig
|
||||
import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager
|
||||
@ -20,6 +19,7 @@ import com.daml.ledger.api.benchtool.submission.{
|
||||
}
|
||||
import com.daml.ledger.test.BenchtoolTestDar
|
||||
import com.daml.lf.language.LanguageVersion
|
||||
import com.daml.metrics.api.noop.NoOpMetricsFactory
|
||||
import com.daml.platform.sandbox.fixture.SandboxFixture
|
||||
import org.scalatest.Suite
|
||||
|
||||
@ -50,7 +50,7 @@ trait BenchtoolSandboxFixture extends SandboxFixture {
|
||||
names = names,
|
||||
benchtoolUserServices = apiServices,
|
||||
adminServices = apiServices,
|
||||
metricRegistry = new MetricRegistry,
|
||||
metricsFactory = NoOpMetricsFactory,
|
||||
metricsManager = NoOpMetricsManager(),
|
||||
waitForSubmission = true,
|
||||
partyAllocating = new PartyAllocating(
|
||||
|
Loading…
Reference in New Issue
Block a user