Add labels through the MetricsContext in the metrics API [PLEN-5] (#15349)

This commit is contained in:
Nicu Reut 2022-10-27 09:26:03 +02:00 committed by GitHub
parent 71fb7ea78c
commit e8c7e29ba2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 309 additions and 118 deletions

View File

@ -4,6 +4,7 @@
package com.daml.caching
import com.daml.metrics.CacheMetrics
import com.daml.metrics.api.MetricsContext
import com.github.benmanes.caffeine.cache.RemovalCause
import com.github.benmanes.caffeine.cache.stats.{CacheStats, StatsCounter}
@ -12,10 +13,10 @@ private[caching] final class DropwizardStatsCounter(
) extends StatsCounter {
override def recordHits(newHits: Int): Unit =
metrics.hitCount.inc(newHits.toLong)
metrics.hitCount.inc(newHits.toLong)(MetricsContext.Empty)
override def recordMisses(newMisses: Int): Unit =
metrics.missCount.inc(newMisses.toLong)
metrics.missCount.inc(newMisses.toLong)(MetricsContext.Empty)
override def recordLoadSuccess(loadTimeNanos: Long): Unit = ()
@ -23,7 +24,7 @@ private[caching] final class DropwizardStatsCounter(
override def recordEviction(weight: Int, cause: RemovalCause): Unit = {
metrics.evictionCount.inc()
metrics.evictionWeight.inc(weight.toLong)
metrics.evictionWeight.inc(weight.toLong)(MetricsContext.Empty)
}
override def snapshot(): CacheStats = CacheStats.empty

View File

@ -11,7 +11,7 @@ import com.codahale.{metrics => codahale}
import com.daml.ledger.api.benchtool.util.TimeUtil
import com.daml.metrics.api.MetricHandle.{Counter, Gauge, Histogram}
import com.daml.metrics.api.dropwizard.{DropwizardCounter, DropwizardGauge, DropwizardHistogram}
import com.daml.metrics.api.{Gauges, MetricName}
import com.daml.metrics.api.{Gauges, MetricName, MetricsContext}
import com.google.protobuf.timestamp.Timestamp
final class ExposedMetrics[T](
@ -22,8 +22,10 @@ final class ExposedMetrics[T](
clock: Clock,
) {
def onNext(elem: T): Unit = {
counterMetric.counter.inc(counterMetric.countingFunction(elem))
bytesProcessedMetric.bytesProcessed.inc(bytesProcessedMetric.sizingFunction(elem))
counterMetric.counter.inc(counterMetric.countingFunction(elem))(MetricsContext.Empty)
bytesProcessedMetric.bytesProcessed.inc(bytesProcessedMetric.sizingFunction(elem))(
MetricsContext.Empty
)
delayMetric.foreach { metric =>
val now = clock.instant()
metric
@ -94,7 +96,10 @@ object ExposedMetrics {
latestRecordTime = DropwizardGauge(
Prefix :+ "latest_record_time" :+ streamName,
registry
.register(Prefix :+ "latest_record_time" :+ streamName, new Gauges.VarGauge[Long](0L)),
.register(
Prefix :+ "latest_record_time" :+ streamName,
new Gauges.VarGauge[Long](0L)(MetricsContext.Empty),
),
),
recordTimeFunction = f,
)

View File

@ -8,6 +8,7 @@ import akka.stream.scaladsl.BidiFlow
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, BidiShape, Inlet, Outlet}
import com.daml.metrics.api.MetricHandle.Counter
import com.daml.metrics.api.MetricsContext
import org.slf4j.LoggerFactory
/** Enforces that at most [[maxInFlight]] items traverse the flow underneath this one.
@ -19,7 +20,7 @@ import org.slf4j.LoggerFactory
class MaxInFlight[I, O](maxInFlight: Int, capacityCounter: Counter, lengthCounter: Counter)
extends GraphStage[BidiShape[I, I, O, O]] {
capacityCounter.inc(maxInFlight.toLong)
capacityCounter.inc(maxInFlight.toLong)(MetricsContext.Empty)
private val logger = LoggerFactory.getLogger(MaxInFlight.getClass.getName)
@ -100,12 +101,12 @@ class MaxInFlight[I, O](maxInFlight: Int, capacityCounter: Counter, lengthCounte
}
override def onUpstreamFinish(): Unit = {
capacityCounter.dec(maxInFlight.toLong)
capacityCounter.dec(maxInFlight.toLong)(MetricsContext.Empty)
super.onUpstreamFinish()
}
override def onUpstreamFailure(ex: Throwable): Unit = {
capacityCounter.dec(maxInFlight.toLong)
capacityCounter.dec(maxInFlight.toLong)(MetricsContext.Empty)
fail(out2, ex)
completeStage()
}

View File

@ -7,7 +7,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import com.daml.metrics.api.MetricDoc.MetricQualification.Debug
import com.daml.metrics.api.MetricHandle.Counter
import com.daml.metrics.api.dropwizard.DropwizardFactory
import com.daml.metrics.api.{MetricDoc, MetricName}
import com.daml.metrics.api.{MetricDoc, MetricName, MetricsContext}
final class CacheMetrics(override val prefix: MetricName, override val registry: MetricRegistry)
extends DropwizardFactory {
@ -43,8 +43,8 @@ final class CacheMetrics(override val prefix: MetricName, override val registry:
val evictionWeight: Counter = counter(prefix :+ "evicted_weight")
def registerSizeGauge(sizeGauge: Gauge[Long]): Unit =
gaugeWithSupplier(prefix :+ "size", () => () => sizeGauge.getValue)
gaugeWithSupplier(prefix :+ "size", () => () => sizeGauge.getValue -> MetricsContext.Empty)
def registerWeightGauge(weightGauge: Gauge[Long]): Unit =
gaugeWithSupplier(prefix :+ "weight", () => () => weightGauge.getValue)
gaugeWithSupplier(prefix :+ "weight", () => () => weightGauge.getValue -> MetricsContext.Empty)
}

View File

@ -9,7 +9,7 @@ import com.codahale.metrics.MetricRegistry
import com.daml.metrics.api.MetricDoc.MetricQualification.Debug
import com.daml.metrics.api.MetricHandle.Gauge
import com.daml.metrics.api.dropwizard.{DropwizardFactory, DropwizardGauge}
import com.daml.metrics.api.{MetricDoc, MetricName}
import com.daml.metrics.api.{MetricDoc, MetricName, MetricsContext}
class IndexerMetrics(override val prefix: MetricName, override val registry: MetricRegistry)
extends DropwizardFactory {
@ -59,6 +59,7 @@ class IndexerMetrics(override val prefix: MetricName, override val registry: Met
gaugeWithSupplier(
prefix :+ "current_record_time_lag",
() => () => Instant.now().toEpochMilli - lastReceivedRecordTime.getValue,
() =>
() => (Instant.now().toEpochMilli - lastReceivedRecordTime.getValue) -> MetricsContext.Empty,
)
}

View File

@ -7,6 +7,7 @@ import akka.stream.scaladsl.{Flow, Source}
import akka.stream.{BoundedSourceQueue, Materializer, OverflowStrategy, QueueOfferResult}
import com.daml.metrics.api.MetricHandle.Timer.TimerHandle
import com.daml.metrics.api.MetricHandle.{Counter, Timer}
import com.daml.metrics.api.MetricsContext
import scala.util.chaining._
@ -22,7 +23,7 @@ object InstrumentedGraph {
override def complete(): Unit = {
delegate.complete()
capacityCounter.dec(bufferSize.toLong)
capacityCounter.dec(bufferSize.toLong)(MetricsContext.Empty)
}
override def size(): Int = bufferSize
@ -80,7 +81,7 @@ object InstrumentedGraph {
lengthCounter,
delayTimer,
)
capacityCounter.inc(bufferSize.toLong)
capacityCounter.inc(bufferSize.toLong)(MetricsContext.Empty)
source.mapMaterializedValue(_ => instrumentedQueue).map { case (timer, item) =>
timer.stop()

View File

@ -9,6 +9,7 @@ import akka.Done
import akka.stream.scaladsl.{Keep, Source}
import com.daml.concurrent
import com.daml.metrics.api.MetricHandle.{Counter, Meter, Timer}
import com.daml.metrics.api.MetricsContext.withEmptyMetricsContext
import scala.concurrent.{ExecutionContext, Future}
@ -17,11 +18,12 @@ object Timed {
def value[T](timer: Timer, value: => T): T =
timer.time(value)
def trackedValue[T](meter: Meter, value: => T): T = {
meter.mark(+1)
val result = value
meter.mark(-1)
result
def trackedValue[T](meter: Meter, value: => T): T = withEmptyMetricsContext {
implicit metricContext =>
meter.mark(+1)
val result = value
meter.mark(-1)
result
}
def timedAndTrackedValue[T](timer: Timer, meter: Meter, value: => T): T = {
@ -36,13 +38,14 @@ object Timed {
}
}
def trackedCompletionStage[T](meter: Meter, future: => CompletionStage[T]): CompletionStage[T] = {
meter.mark(+1)
future.whenComplete { (_, _) =>
meter.mark(-1)
()
def trackedCompletionStage[T](meter: Meter, future: => CompletionStage[T]): CompletionStage[T] =
withEmptyMetricsContext { implicit metricsContext =>
meter.mark(+1)
future.whenComplete { (_, _) =>
meter.mark(-1)
()
}
}
}
def timedAndTrackedCompletionStage[T](
timer: Timer,
@ -68,9 +71,10 @@ object Timed {
future.andThen { case _ => counter.dec() }(ExecutionContext.parasitic)
}
def trackedFuture[T](meter: Meter, future: => Future[T]): Future[T] = {
meter.mark(+1)
future.andThen { case _ => meter.mark(-1) }(ExecutionContext.parasitic)
def trackedFuture[T](meter: Meter, future: => Future[T]): Future[T] = withEmptyMetricsContext {
implicit metricsContext =>
meter.mark(+1)
future.andThen { case _ => meter.mark(-1) }(ExecutionContext.parasitic)
}
def timedAndTrackedFuture[T](timer: Timer, counter: Counter, future: => Future[T]): Future[T] = {

View File

@ -11,15 +11,25 @@ object Gauges {
trait GaugeWithUpdate[T] extends Gauge[T] {
def updateValue(x: T): Unit
def updateValue(x: T)(implicit
context: MetricsContext
): Unit
}
case class VarGauge[T](initial: T) extends GaugeWithUpdate[T] {
private val ref = new AtomicReference[T](initial)
def updateValue(x: T): Unit = ref.set(x)
def updateValue(up: T => T): Unit = {
val _ = ref.updateAndGet(up(_))
case class VarGauge[T](initial: T)(implicit context: MetricsContext) extends GaugeWithUpdate[T] {
private val ref = new AtomicReference[(T, MetricsContext)](initial -> context)
def updateValue(x: T)(implicit
context: MetricsContext
): Unit = ref.set(x -> context)
def updateValue(up: T => T)(implicit
context: MetricsContext
): Unit = {
val _ = ref.updateAndGet { case (value, _) =>
up(value) -> context
}
}
override def getValue: T = ref.get()
override def getValue: T = ref.get()._1
def getValueAndContext: (T, MetricsContext) = ref.get()
}
}

View File

@ -23,11 +23,13 @@ object MetricHandle {
def timer(name: MetricName): Timer
def gauge[T](name: MetricName, initial: T): Gauge[T]
def gauge[T](name: MetricName, initial: T)(implicit
context: MetricsContext
): Gauge[T]
def gaugeWithSupplier[T](
name: MetricName,
gaugeSupplier: () => () => T,
gaugeSupplier: () => () => (T, MetricsContext),
): Unit
def meter(name: MetricName): Meter
@ -42,15 +44,25 @@ object MetricHandle {
def metricType: String = "Timer"
def update(duration: Long, unit: TimeUnit): Unit
def update(duration: Long, unit: TimeUnit)(implicit
context: MetricsContext = MetricsContext.Empty
): Unit
def update(duration: Duration): Unit
def update(duration: Duration)(implicit
context: MetricsContext
): Unit
def time[T](call: => T): T
def time[T](call: => T)(implicit
context: MetricsContext = MetricsContext.Empty
): T
def startAsync(): TimerHandle
def startAsync()(implicit
context: MetricsContext = MetricsContext.Empty
): TimerHandle
def timeFuture[T](call: => Future[T]): Future[T] = {
def timeFuture[T](call: => Future[T])(implicit
context: MetricsContext = MetricsContext.Empty
): Future[T] = {
val timer = startAsync()
val result = call
result.onComplete(_ => timer.stop())(ExecutionContext.parasitic)
@ -69,9 +81,13 @@ object MetricHandle {
trait Gauge[T] extends MetricHandle {
def metricType: String = "Gauge"
def updateValue(newValue: T): Unit
def updateValue(newValue: T)(implicit
context: MetricsContext = MetricsContext.Empty
): Unit
def updateValue(f: T => T): Unit = updateValue(f(getValue))
def updateValue(f: T => T)(implicit
context: MetricsContext
): Unit = updateValue(f(getValue))
def getValue: T
}
@ -79,26 +95,42 @@ object MetricHandle {
trait Meter extends MetricHandle {
def metricType: String = "Meter"
def mark(): Unit = mark(1)
def mark(value: Long): Unit
def mark()(implicit
context: MetricsContext = MetricsContext.Empty
): Unit = mark(1)
def mark(value: Long)(implicit
context: MetricsContext
): Unit
}
trait Counter extends MetricHandle {
override def metricType: String = "Counter"
def inc(): Unit
def inc(n: Long): Unit
def dec(): Unit
def dec(n: Long): Unit
def inc()(implicit
context: MetricsContext = MetricsContext.Empty
): Unit
def inc(n: Long)(implicit
context: MetricsContext
): Unit
def dec()(implicit
context: MetricsContext = MetricsContext.Empty
): Unit
def dec(n: Long)(implicit
context: MetricsContext
): Unit
def getCount: Long
}
trait Histogram extends MetricHandle {
def metricType: String = "Histogram"
def update(value: Long): Unit
def update(value: Int): Unit
def update(value: Long)(implicit
context: MetricsContext = MetricsContext.Empty
): Unit
def update(value: Int)(implicit
context: MetricsContext
): Unit
}

View File

@ -0,0 +1,26 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.metrics.api
import io.opentelemetry.api.common.Attributes
case class MetricsContext(labels: Map[String, String]) {
lazy val asAttributes: Attributes = {
labels
.foldLeft(Attributes.builder()) { case (builder, (key, value)) =>
builder.put(key, value)
}
.build()
}
}
object MetricsContext {
val Empty: MetricsContext = MetricsContext(Map.empty)
def withEmptyMetricsContext[T](run: MetricsContext => T): T = run(Empty)
}

View File

@ -7,7 +7,7 @@ import com.codahale.{metrics => codahale}
import com.daml.metrics.DatabaseMetrics
import com.daml.metrics.api.Gauges.VarGauge
import com.daml.metrics.api.MetricHandle.{Counter, Factory, Gauge, Histogram, Meter, Timer}
import com.daml.metrics.api.{Gauges, MetricName}
import com.daml.metrics.api.{Gauges, MetricName, MetricsContext}
import scala.concurrent.blocking
@ -15,16 +15,18 @@ trait DropwizardFactory extends Factory {
def registry: codahale.MetricRegistry
def timer(name: MetricName): Timer = DropwizardTimer(name, registry.timer(name))
override def timer(name: MetricName): Timer = DropwizardTimer(name, registry.timer(name))
def gauge[T](name: MetricName, initial: T): Gauge[T] = {
override def gauge[T](name: MetricName, initial: T)(implicit
context: MetricsContext = MetricsContext.Empty
): Gauge[T] = {
val registeredgauge = reRegisterGauge[T, VarGauge[T]](name, Gauges.VarGauge(initial))
DropwizardGauge(name, registeredgauge)
}
def gaugeWithSupplier[T](
override def gaugeWithSupplier[T](
name: MetricName,
gaugeSupplier: () => () => T,
gaugeSupplier: () => () => (T, MetricsContext),
): Unit =
synchronized {
registry.remove(name)
@ -32,23 +34,23 @@ trait DropwizardFactory extends Factory {
name,
() => {
val valueGetter = gaugeSupplier()
new codahale.Gauge[T] { override def getValue: T = valueGetter() }
new codahale.Gauge[T] { override def getValue: T = valueGetter()._1 }
},
)
()
}
def meter(name: MetricName): Meter = {
override def meter(name: MetricName): Meter = {
// This is idempotent
DropwizardMeter(name, registry.meter(name))
}
def counter(name: MetricName): Counter = {
override def counter(name: MetricName): Counter = {
// This is idempotent
DropwizardCounter(name, registry.counter(name))
}
def histogram(name: MetricName): Histogram = {
override def histogram(name: MetricName): Histogram = {
DropwizardHistogram(name, registry.histogram(name))
}

View File

@ -9,15 +9,23 @@ import java.util.concurrent.TimeUnit
import com.codahale.{metrics => codahale}
import com.daml.metrics.api.MetricHandle.Timer.TimerHandle
import com.daml.metrics.api.MetricHandle.{Counter, Gauge, Histogram, Meter, Timer}
import com.daml.metrics.api.{Gauges, MetricHandle}
import com.daml.metrics.api.{Gauges, MetricHandle, MetricsContext}
case class DropwizardTimer(name: String, metric: codahale.Timer) extends Timer {
def update(duration: Long, unit: TimeUnit): Unit = metric.update(duration, unit)
def update(duration: Long, unit: TimeUnit)(implicit
context: MetricsContext = MetricsContext.Empty
): Unit = metric.update(duration, unit)
def update(duration: Duration): Unit = metric.update(duration)
override def time[T](call: => T): T = metric.time(() => call)
override def startAsync(): TimerHandle = {
def update(duration: Duration)(implicit
context: MetricsContext
): Unit = metric.update(duration)
override def time[T](call: => T)(implicit
context: MetricsContext = MetricsContext.Empty
): T = metric.time(() => call)
override def startAsync()(implicit
context: MetricsContext = MetricsContext.Empty
): TimerHandle = {
val ctx = metric.time()
() => {
ctx.stop()
@ -28,28 +36,44 @@ case class DropwizardTimer(name: String, metric: codahale.Timer) extends Timer {
sealed case class DropwizardMeter(name: String, metric: codahale.Meter) extends Meter {
def mark(value: Long): Unit = metric.mark(value)
def mark(value: Long)(implicit
context: MetricsContext
): Unit = metric.mark(value)
}
sealed case class DropwizardCounter(name: String, metric: codahale.Counter) extends Counter {
override def inc(): Unit = metric.inc
override def inc(n: Long): Unit = metric.inc(n)
override def dec(): Unit = metric.dec
override def dec(n: Long): Unit = metric.dec(n)
override def inc()(implicit
context: MetricsContext = MetricsContext.Empty
): Unit = metric.inc
override def inc(n: Long)(implicit
context: MetricsContext
): Unit = metric.inc(n)
override def dec()(implicit
context: MetricsContext = MetricsContext.Empty
): Unit = metric.dec
override def dec(n: Long)(implicit
context: MetricsContext
): Unit = metric.dec(n)
override def getCount: Long = metric.getCount
}
sealed case class DropwizardGauge[T](name: String, metric: Gauges.VarGauge[T]) extends Gauge[T] {
def updateValue(newValue: T): Unit = metric.updateValue(newValue)
def updateValue(newValue: T)(implicit
context: MetricsContext = MetricsContext.Empty
): Unit = metric.updateValue(newValue)
override def getValue: T = metric.getValue
}
sealed case class DropwizardHistogram(name: String, metric: codahale.Histogram)
extends MetricHandle
with Histogram {
override def update(value: Long): Unit = metric.update(value)
override def update(value: Int): Unit = metric.update(value)
override def update(value: Long)(implicit
context: MetricsContext = MetricsContext.Empty
): Unit = metric.update(value)
override def update(value: Int)(implicit
context: MetricsContext
): Unit = metric.update(value)
}

View File

@ -8,10 +8,19 @@ import java.util.concurrent.TimeUnit
import com.daml.metrics.api.MetricHandle.Timer
import com.daml.metrics.api.MetricHandle.Timer.TimerHandle
import com.daml.metrics.api.MetricsContext
sealed case class NoOpTimer(name: String) extends Timer {
override def update(duration: Long, unit: TimeUnit): Unit = ()
override def update(duration: Duration): Unit = ()
override def time[T](call: => T): T = call
override def startAsync(): TimerHandle = () => ()
override def update(duration: Long, unit: TimeUnit)(implicit
context: MetricsContext = MetricsContext.Empty
): Unit = ()
override def update(duration: Duration)(implicit
context: MetricsContext
): Unit = ()
override def time[T](call: => T)(implicit
context: MetricsContext = MetricsContext.Empty
): T = call
override def startAsync()(implicit
context: MetricsContext = MetricsContext.Empty
): TimerHandle = () => ()
}

View File

@ -9,7 +9,7 @@ import java.util.concurrent.TimeUnit
import com.daml.metrics.api.Gauges.VarGauge
import com.daml.metrics.api.MetricHandle.Timer.TimerHandle
import com.daml.metrics.api.MetricHandle.{Counter, Factory, Gauge, Histogram, Meter, Timer}
import com.daml.metrics.api.{MetricHandle, MetricName}
import com.daml.metrics.api.{MetricHandle, MetricName, MetricsContext}
import io.opentelemetry.api.metrics.{
LongCounter,
LongHistogram,
@ -24,34 +24,68 @@ trait OpenTelemetryFactory extends Factory {
name: MetricName
): MetricHandle.Timer =
OpentelemetryTimer(name, otelMeter.histogramBuilder(name).ofLongs().setUnit("ms").build())
override def gauge[T](name: MetricName, initial: T): MetricHandle.Gauge[T] = {
override def gauge[T](name: MetricName, initial: T)(implicit
context: MetricsContext = MetricsContext.Empty
): MetricHandle.Gauge[T] = {
initial match {
case longInitial: Int =>
val varGauge = new VarGauge[Int](longInitial)
otelMeter.gaugeBuilder(name).ofLongs().buildWithCallback { consumer =>
val (value, context) = varGauge.getValueAndContext
consumer.record(value.toLong, context.asAttributes)
}
OpentelemetryGauge(name, varGauge.asInstanceOf[VarGauge[T]])
case longInitial: Long =>
val varGauge = new VarGauge[Long](longInitial)
otelMeter.gaugeBuilder(name).ofLongs().buildWithCallback(_.record(varGauge.getValue))
otelMeter.gaugeBuilder(name).ofLongs().buildWithCallback { consumer =>
val (value, context) = varGauge.getValueAndContext
consumer.record(value, context.asAttributes)
}
OpentelemetryGauge(name, varGauge.asInstanceOf[VarGauge[T]])
case doubleInitial: Double =>
val varGauge = new VarGauge[Double](doubleInitial)
otelMeter.gaugeBuilder(name).buildWithCallback(_.record(varGauge.getValue))
otelMeter.gaugeBuilder(name).buildWithCallback { consumer =>
val (value, context) = varGauge.getValueAndContext
consumer.record(value, context.asAttributes)
}
OpentelemetryGauge(name, varGauge.asInstanceOf[VarGauge[T]])
case _ =>
// A NoOp guage as opentelemetry only supports longs and doubles
OpentelemetryGauge(name, VarGauge(initial))
}
}
override def gaugeWithSupplier[T](name: MetricName, gaugeSupplier: () => () => T): Unit = {
override def gaugeWithSupplier[T](
name: MetricName,
gaugeSupplier: () => () => (T, MetricsContext),
): Unit = {
val valueSupplier = gaugeSupplier()
valueSupplier() match {
valueSupplier()._1 match {
case _: Int =>
otelMeter
.gaugeBuilder(name)
.ofLongs()
.buildWithCallback { consumer =>
val (value, context) = valueSupplier()
consumer.record(value.asInstanceOf[Int].toLong, context.asAttributes)
}
()
case _: Long =>
otelMeter
.gaugeBuilder(name)
.ofLongs()
.buildWithCallback(_.record(valueSupplier().asInstanceOf[Long]))
.buildWithCallback { consumer =>
val (value, context) = valueSupplier()
consumer.record(value.asInstanceOf[Long], context.asAttributes)
}
()
case _: Double =>
otelMeter
.gaugeBuilder(name)
.buildWithCallback(_.record(valueSupplier().asInstanceOf[Double]))
.buildWithCallback { consumer =>
val (value, context) = valueSupplier()
consumer.record(value.asInstanceOf[Double], context.asAttributes)
}
()
case _ =>
// NoOp as opentelemetry only supports longs and doubles
@ -72,42 +106,74 @@ trait OpenTelemetryFactory extends Factory {
}
case class OpentelemetryTimer(name: String, histogram: LongHistogram) extends Timer {
override def update(duration: Long, unit: TimeUnit): Unit =
histogram.record(TimeUnit.MILLISECONDS.convert(duration, unit))
override def time[T](call: => T): T = {
override def update(duration: Long, unit: TimeUnit)(implicit
context: MetricsContext
): Unit =
histogram.record(
TimeUnit.MILLISECONDS.convert(duration, unit),
context.asAttributes,
)
override def time[T](call: => T)(implicit
context: MetricsContext
): T = {
val start = System.nanoTime()
val result = call
histogram.record(TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS))
histogram.record(
TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS),
context.asAttributes,
)
result
}
override def startAsync(): TimerHandle = {
override def startAsync()(implicit
context: MetricsContext
): TimerHandle = {
val start = System.nanoTime()
() =>
histogram.record(
TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)
TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS),
context.asAttributes,
)
}
override def update(duration: Duration): Unit = update(duration.toNanos, TimeUnit.NANOSECONDS)
override def update(duration: Duration)(implicit
context: MetricsContext
): Unit = update(duration.toNanos, TimeUnit.NANOSECONDS)
}
case class OpentelemetryGauge[T](name: String, varGauge: VarGauge[T]) extends Gauge[T] {
override def updateValue(newValue: T): Unit = varGauge.updateValue(newValue)
override def updateValue(newValue: T)(implicit
context: MetricsContext
): Unit = varGauge.updateValue(newValue)
override def getValue: T = varGauge.getValue
}
case class OpentelemetryMeter(name: String, counter: LongCounter) extends Meter {
override def mark(value: Long): Unit = counter.add(value)
override def mark(value: Long)(implicit
context: MetricsContext
): Unit = counter.add(value, context.asAttributes)
}
case class OpentelemetryCounter(name: String, counter: LongUpDownCounter) extends Counter {
override def inc(): Unit = counter.add(1)
override def inc(n: Long): Unit = counter.add(n)
override def dec(): Unit = counter.add(-1)
override def dec(n: Long): Unit = counter.add(-n)
override def inc()(implicit
context: MetricsContext
): Unit = counter.add(1, context.asAttributes)
override def inc(n: Long)(implicit
context: MetricsContext
): Unit = counter.add(n, context.asAttributes)
override def dec()(implicit
context: MetricsContext
): Unit = counter.add(-1, context.asAttributes)
override def dec(n: Long)(implicit
context: MetricsContext
): Unit = counter.add(-n, context.asAttributes)
override def getCount: Long = 0 // Not supported by OpenTelemetry
}
case class OpentelemetryHistogram(name: String, histogram: LongHistogram) extends Histogram {
override def update(value: Long): Unit = histogram.record(value)
override def update(value: Int): Unit = histogram.record(value.toLong)
override def update(value: Long)(implicit
context: MetricsContext
): Unit = histogram.record(value, context.asAttributes)
override def update(value: Int)(implicit
context: MetricsContext
): Unit = histogram.record(value.toLong, context.asAttributes)
}

View File

@ -21,8 +21,10 @@ import com.daml.platform.store.backend._
import com.daml.platform.store.dao.DbDispatcher
import com.daml.platform.store.dao.events.{CompressionStrategy, LfValueTranslation}
import com.daml.platform.store.interning.{InternizingStringInterningView, StringInterning}
import java.sql.Connection
import com.daml.metrics.api.MetricsContext
import scala.concurrent.Future
private[platform] case class ParallelIndexerSubscription[DB_BATCH](
@ -132,7 +134,7 @@ object ParallelIndexerSubscription {
)(implicit
loggingContext: LoggingContext
): Iterable[(Offset, state.Update)] => Batch[Vector[DbDto]] = { input =>
metrics.daml.parallelIndexer.inputMapping.batchSize.update(input.size)
metrics.daml.parallelIndexer.inputMapping.batchSize.update(input.size)(MetricsContext.Empty)
input.foreach { case (offset, update) =>
withEnrichedLoggingContext("offset" -> offset, "update" -> update) {
implicit loggingContext =>
@ -240,7 +242,7 @@ object ParallelIndexerSubscription {
withEnrichedLoggingContext("updateOffsets" -> batch.offsetsUpdates.map(_._1)) {
implicit loggingContext =>
dbDispatcher.executeSql(metrics.daml.parallelIndexer.ingestion) { connection =>
metrics.daml.parallelIndexer.updates.inc(batch.batchSize.toLong)
metrics.daml.parallelIndexer.updates.inc(batch.batchSize.toLong)(MetricsContext.Empty)
ingestFunction(connection, batch.batch)
cleanUnusedBatch(zeroDbBatch)(batch)
}

View File

@ -5,6 +5,7 @@ package com.daml.platform.store.cache
import com.daml.ledger.offset.Offset
import com.daml.logging.ContextualizedLogger
import com.daml.metrics.api.MetricsContext
import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.store.cache.InMemoryFanoutBuffer._
import com.daml.platform.store.interfaces.TransactionLogUpdate
@ -142,7 +143,7 @@ class InMemoryFanoutBuffer(
val currentLookupMapSize = _lookupMap.size
if (currentLookupMapSize <= currentBufferLogSize) {
bufferSizeHistogram.update(currentBufferLogSize)
bufferSizeHistogram.update(currentBufferLogSize)(MetricsContext.Empty)
if (currentBufferLogSize > targetSize) {
dropOldest(dropCount = currentBufferLogSize - targetSize)

View File

@ -7,6 +7,7 @@ import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.ledger.offset.Offset
import com.daml.logging.LoggingContext
import com.daml.metrics.api.MetricsContext
import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.store.cache.InMemoryFanoutBuffer
import com.daml.platform.store.cache.InMemoryFanoutBuffer.BufferSlice
@ -84,7 +85,7 @@ class BufferedStreamsReader[PERSISTENCE_FETCH_ARGS, API_RESPONSE](
),
)
bufferReaderMetrics.sliceSize.update(bufferSlice.slice.size)
bufferReaderMetrics.sliceSize.update(bufferSlice.slice.size)(MetricsContext.Empty)
bufferSlice match {
case BufferSlice.Inclusive(slice) =>

View File

@ -6,6 +6,7 @@ package com.daml.platform.store.dao.events
import java.io.ByteArrayOutputStream
import com.daml.metrics.Metrics
import com.daml.metrics.api.MetricsContext
import com.daml.platform.store.serialization.Compression
final case class CompressionStrategy(
@ -62,8 +63,8 @@ object FieldCompressionStrategy {
}
val compressed = output.toByteArray
output.close()
metric.compressed.update(compressed.length)
metric.uncompressed.update(uncompressed.length)
metric.compressed.update(compressed.length)(MetricsContext.Empty)
metric.uncompressed.update(uncompressed.length)(MetricsContext.Empty)
compressed
},
)

View File

@ -7,9 +7,10 @@ import com.daml.ledger.participant.state.v2.ChangeId
import com.daml.ledger.sandbox.bridge.BridgeMetrics
import com.daml.ledger.sandbox.bridge.validate.DeduplicationState.DeduplicationQueue
import com.daml.lf.data.Time
import java.time.Duration
import com.daml.metrics.api.MetricsContext
case class DeduplicationState private (
private[validate] val deduplicationQueue: DeduplicationQueue,
private val maxDeduplicationDuration: Duration,
@ -30,7 +31,8 @@ case class DeduplicationState private (
s"Cannot deduplicate for a period ($commandDeduplicationDuration) longer than the max deduplication duration ($maxDeduplicationDuration).",
)
bridgeMetrics.Stages.Sequence.deduplicationQueueLength.update(deduplicationQueue.size)
bridgeMetrics.Stages.Sequence.deduplicationQueueLength
.update(deduplicationQueue.size)(MetricsContext.Empty)
val expiredTimestamp = expiredThreshold(maxDeduplicationDuration, recordTime)
val queueAfterEvictions = deduplicationQueue.withoutOlderThan(expiredTimestamp)

View File

@ -8,6 +8,7 @@ import com.daml.ledger.sandbox.bridge.BridgeMetrics
import com.daml.ledger.sandbox.bridge.validate.SequencerState.{LastUpdatedAt, SequencerQueue}
import com.daml.lf.transaction.GlobalKey
import com.daml.lf.value.Value.ContractId
import com.daml.metrics.api.MetricsContext.withEmptyMetricsContext
import scala.collection.Searching
import scala.util.chaining._
@ -70,11 +71,12 @@ case class SequencerState private (
)
}
private def updateMetrics(newState: SequencerState): Unit = {
val stateMetrics = bridgeMetrics.Stages.Sequence
stateMetrics.sequencerQueueLength.update(newState.sequencerQueue.length)
stateMetrics.keyStateSize.update(newState.keyState.size)
stateMetrics.consumedContractsStateSize.update(newState.consumedContractsState.size)
private def updateMetrics(newState: SequencerState): Unit = withEmptyMetricsContext {
implicit metricsContext =>
val stateMetrics = bridgeMetrics.Stages.Sequence
stateMetrics.sequencerQueueLength.update(newState.sequencerQueue.length)
stateMetrics.keyStateSize.update(newState.keyState.size)
stateMetrics.consumedContractsStateSize.update(newState.consumedContractsState.size)
}
}