Extract metrics API that is decoupled from dropwizard [PLEN-28] (#15231)

Nicu Reut 2022-10-21 16:30:21 +02:00 committed by GitHub
parent 20a639467b
commit 5b4c7f7677
37 changed files with 349 additions and 244 deletions

View File

@ -42,7 +42,6 @@ da_scala_library(
"//ledger/metrics",
"//libs-scala/contextualized-logging",
"//libs-scala/nonempty",
"@maven//:io_dropwizard_metrics_metrics_core",
],
)

View File

@ -80,7 +80,6 @@ hj_scalacopts = lf_scalacopts + [
"//libs-scala/resources",
"//libs-scala/scala-utils",
"//libs-scala/struct-json",
"@maven//:io_dropwizard_metrics_metrics_core",
],
)
for edition in [

View File

@ -453,9 +453,9 @@ class ContractsService(
): doobie.ConnectionIO[A] = {
for {
_ <- fconn.pure(())
ctx <- fconn.pure(timer.time())
timerStop <- fconn.pure(timer.startAsync())
res <- it
_ <- fconn.pure(ctx.stop())
_ <- fconn.pure(timerStop())
} yield res
}
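
The hunk above swaps Dropwizard's Timer.Context (created with timer.time(), finished with ctx.stop()) for the TimerStop callback returned by the new startAsync(). A minimal sketch of the pattern, using a hypothetical timeWork helper:

import com.daml.metrics.MetricHandle.Timer

object TimeWorkSketch {
  // Hypothetical helper: start the timer, run the work, stop the timer.
  def timeWork[A](timer: Timer)(work: => A): A = {
    val stop = timer.startAsync() // TimerStop is just () => Unit
    try work
    finally stop() // records the elapsed time without referencing Dropwizard
  }
}

For purely synchronous work, timer.time(work) collapses the same steps into a single call.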

View File

@ -45,7 +45,7 @@ private[http] final class ContractList(
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
for {
parseAndDecodeTimerCtx <- getParseAndDecodeTimerCtx()
parseAndDecodeTimerStop <- getParseAndDecodeTimerCtx()
input <- inputJsValAndJwtPayload(req): ET[(Jwt, JwtPayload, JsValue)]
(jwt, jwtPayload, reqBody) = input
@ -66,7 +66,7 @@ private[http] final class ContractList(
.liftErr(InvalidUserInput)
)
): ET[domain.FetchRequest[LfValue]]
_ <- EitherT.pure(parseAndDecodeTimerCtx.close())
_ <- EitherT.pure(parseAndDecodeTimerStop())
_ = logger.debug(s"/v1/fetch fr: $fr")
_ <- either(ensureReadAsAllowedByJwt(fr.readAs, jwtPayload))
@ -86,12 +86,12 @@ private[http] final class ContractList(
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: Metrics,
): Future[Error \/ SearchResult[Error \/ JsValue]] = for {
parseAndDecodeTimerCtx <- Future(
metrics.daml.HttpJsonApi.incomingJsonParsingAndValidationTimer.time()
parseAndDecodeTimerStop <- Future(
metrics.daml.HttpJsonApi.incomingJsonParsingAndValidationTimer.startAsync()
)
res <- inputAndJwtPayload[JwtPayload](req).run.map {
_.map { case (jwt, jwtPayload, _) =>
parseAndDecodeTimerCtx.close()
parseAndDecodeTimerStop()
withJwtPayloadLoggingContext(jwtPayload) { implicit lc =>
val result: SearchResult[
ContractsService.Error \/ domain.ActiveContract.ResolvedCtTyId[LfValue]

View File

@ -42,7 +42,7 @@ private[http] final class CreateAndExercise(
ec: ExecutionContext,
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx) => implicit lc =>
handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerStop) => implicit lc =>
for {
cmd <-
decoder
@ -50,7 +50,7 @@ private[http] final class CreateAndExercise(
.liftErr(InvalidUserInput): ET[
domain.CreateCommand[ApiRecord, ContractTypeId.Template.RequiredPkg]
]
_ <- EitherT.pure(parseAndDecodeTimerCtx.close())
_ <- EitherT.pure(parseAndDecodeTimerStop())
response <- eitherT(
Timed.future(
@ -66,7 +66,7 @@ private[http] final class CreateAndExercise(
ec: ExecutionContext,
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx) => implicit lc =>
handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerStop) => implicit lc =>
for {
cmd <-
decoder
@ -74,7 +74,7 @@ private[http] final class CreateAndExercise(
.liftErr(InvalidUserInput): ET[
domain.ExerciseCommand[LfValue, domain.ContractLocator[LfValue]]
]
_ <- EitherT.pure(parseAndDecodeTimerCtx.close())
_ <- EitherT.pure(parseAndDecodeTimerStop())
resolvedRef <- eitherT(
resolveReference(jwt, jwtPayload, cmd.meta, cmd.reference)
): ET[domain.ResolvedContractRef[ApiValue]]
@ -99,7 +99,7 @@ private[http] final class CreateAndExercise(
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx) => implicit lc =>
handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerStop) => implicit lc =>
for {
cmd <-
decoder
@ -107,7 +107,7 @@ private[http] final class CreateAndExercise(
.liftErr(InvalidUserInput): ET[
domain.CreateAndExerciseCommand.LAVResolved
]
_ <- EitherT.pure(parseAndDecodeTimerCtx.close())
_ <- EitherT.pure(parseAndDecodeTimerStop())
resp <- eitherT(
Timed.future(

View File

@ -36,11 +36,11 @@ class PackagesAndDars(routeSetup: RouteSetup, packageManagementService: PackageM
metrics: Metrics,
): ET[domain.SyncResponse[Unit]] =
for {
parseAndDecodeTimerCtx <- getParseAndDecodeTimerCtx()
parseAndDecodeTimerStop <- getParseAndDecodeTimerCtx()
_ <- EitherT.pure(metrics.daml.HttpJsonApi.uploadPackagesThroughput.mark())
t2 <- inputSource(req)
(jwt, payload, source) = t2
_ <- EitherT.pure(parseAndDecodeTimerCtx.close())
_ <- EitherT.pure(parseAndDecodeTimerStop())
_ <- eitherT(
handleFutureFailure(

View File

@ -15,7 +15,6 @@ import akka.http.scaladsl.model.headers.{
import akka.stream.Materializer
import Endpoints.ET
import EndpointsCompanion._
import com.codahale.metrics.Timer
import com.daml.logging.LoggingContextOf.withEnrichedLoggingContext
import com.daml.metrics.Metrics
import com.daml.scalautil.Statement.discard
@ -29,7 +28,7 @@ import com.daml.ledger.api.{v1 => lav1}
import lav1.value.{Value => ApiValue}
import scalaz.std.scalaFuture._
import scalaz.syntax.std.option._
import scalaz.{-\/, \/, \/-, EitherT, Traverse}
import scalaz.{-\/, EitherT, Traverse, \/, \/-}
import spray.json._
import scala.concurrent.duration.FiniteDuration
@ -39,6 +38,7 @@ import com.daml.ledger.api.{domain => LedgerApiDomain}
import com.daml.ledger.client.services.admin.UserManagementClient
import com.daml.ledger.client.services.identity.LedgerIdentityClient
import com.daml.logging.{ContextualizedLogger, LoggingContextOf}
import com.daml.metrics.MetricHandle.Timer.TimerStop
private[http] final class RouteSetup(
allowNonHttps: Boolean,
@ -85,7 +85,7 @@ private[http] final class RouteSetup(
Jwt,
JwtWritePayload,
JsValue,
Timer.Context,
TimerStop,
) => LoggingContextOf[JwtPayloadTag with InstanceUUID with RequestID] => ET[
T[ApiValue]
]
@ -132,8 +132,8 @@ private[http] final class RouteSetup(
def getParseAndDecodeTimerCtx()(implicit
metrics: Metrics
): ET[Timer.Context] =
EitherT.pure(metrics.daml.HttpJsonApi.incomingJsonParsingAndValidationTimer.time())
): ET[TimerStop] =
EitherT.pure(metrics.daml.HttpJsonApi.incomingJsonParsingAndValidationTimer.startAsync())
private[endpoints] def input(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
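
With handleCommand now passing a TimerStop instead of a Timer.Context, endpoint code stops the parse/decode timer with a plain function call. A sketch, assuming only that a Metrics instance is in scope (measureParsing is illustrative):

import com.daml.metrics.Metrics
import com.daml.metrics.MetricHandle.Timer.TimerStop

object ParseTimingSketch {
  // Start the timer before parsing and invoke the returned thunk once parsing is done;
  // stop() replaces the close() call on the old Dropwizard Timer.Context.
  def measureParsing[A](metrics: Metrics)(parse: => A): A = {
    val stop: TimerStop =
      metrics.daml.HttpJsonApi.incomingJsonParsingAndValidationTimer.startAsync()
    val parsed = parse
    stop()
    parsed
  }
}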

View File

@ -4,7 +4,8 @@
package com.daml.ledger.indexerbenchmark
import com.codahale.metrics.Snapshot
import com.daml.metrics.Metrics
import com.daml.metrics.MetricHandle.{Histogram, Timer}
import com.daml.metrics.{MetricHandle, Metrics}
class IndexerBenchmarkResult(config: Config, metrics: Metrics, startTime: Long, stopTime: Long) {
@ -54,26 +55,32 @@ class IndexerBenchmarkResult(config: Config, metrics: Metrics, startTime: Long,
|
|Other metrics:
| inputMapping.batchSize: ${histogramToString(
metrics.daml.parallelIndexer.inputMapping.batchSize.getSnapshot
metrics.daml.parallelIndexer.inputMapping.batchSize
)}
| inputMapping.duration: ${histogramToString(
inputMappingDurationMetric.getSnapshot
| inputMapping.duration: ${timerToString(
inputMappingDurationMetric
)}
| inputMapping.duration.rate: ${inputMappingDurationMetric.getMeanRate}
| batching.duration: ${histogramToString(batchingDurationMetric.getSnapshot)}
| batching.duration.rate: ${batchingDurationMetric.getMeanRate}
| seqMapping.duration: ${histogramToString(
metrics.daml.parallelIndexer.seqMapping.duration.getSnapshot
| inputMapping.duration.rate: ${timerMeanRate(inputMappingDurationMetric)}
| batching.duration: ${timerToString(batchingDurationMetric)}
| batching.duration.rate: ${timerMeanRate(batchingDurationMetric)}
| seqMapping.duration: ${timerToString(
metrics.daml.parallelIndexer.seqMapping.duration
)}|
| seqMapping.duration.rate: ${metrics.daml.parallelIndexer.seqMapping.duration.getMeanRate}|
| ingestion.duration: ${histogramToString(
metrics.daml.parallelIndexer.ingestion.executionTimer.getSnapshot
| seqMapping.duration.rate: ${timerMeanRate(
metrics.daml.parallelIndexer.seqMapping.duration
)}|
| ingestion.duration: ${timerToString(
metrics.daml.parallelIndexer.ingestion.executionTimer
)}
| ingestion.duration.rate: ${metrics.daml.parallelIndexer.ingestion.executionTimer.getMeanRate}
| tailIngestion.duration: ${histogramToString(
metrics.daml.parallelIndexer.tailIngestion.executionTimer.getSnapshot
| ingestion.duration.rate: ${timerMeanRate(
metrics.daml.parallelIndexer.ingestion.executionTimer
)}
| tailIngestion.duration: ${timerToString(
metrics.daml.parallelIndexer.tailIngestion.executionTimer
)}
| tailIngestion.duration.rate: ${timerMeanRate(
metrics.daml.parallelIndexer.tailIngestion.executionTimer
)}
| tailIngestion.duration.rate: ${metrics.daml.parallelIndexer.tailIngestion.executionTimer.getMeanRate}
|
|Notes:
| The above numbers include all ingested updates, including package uploads.
@ -83,7 +90,31 @@ class IndexerBenchmarkResult(config: Config, metrics: Metrics, startTime: Long,
|--------------------------------------------------------------------------------
|""".stripMargin
private[this] def histogramToString(data: Snapshot): String = {
s"[min: ${data.getMin}, median: ${data.getMedian}, max: ${data.getMax}]"
private[this] def histogramToString(histogram: Histogram): String = {
histogram match {
case MetricHandle.DropwizardHistogram(_, metric) =>
val data = metric.getSnapshot
dropwizardSnapshotToString(data)
}
}
private[this] def timerToString(timer: Timer): String = {
timer match {
case MetricHandle.DropwizardTimer(_, metric) =>
val data = metric.getSnapshot
dropwizardSnapshotToString(data)
case MetricHandle.Timer.NoOpTimer(_) => ""
}
}
private[this] def timerMeanRate(timer: Timer): Double = {
timer match {
case MetricHandle.DropwizardTimer(_, metric) =>
metric.getMeanRate
case MetricHandle.Timer.NoOpTimer(_) => 0
}
}
private def dropwizardSnapshotToString(data: Snapshot) = {
s"[min: ${data.getMin}, median: ${data.getMedian}, max: ${data.getMax}]"
}
}
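
The benchmark still needs Dropwizard-only data (snapshots, mean rates), so it pattern-matches on the concrete handle instead of assuming every Timer is Dropwizard-backed. A compact sketch of the same idea (describe is a hypothetical helper):

import com.daml.metrics.MetricHandle
import com.daml.metrics.MetricHandle.Timer

object TimerReportSketch {
  // Unwrap the handle explicitly; the NoOpTimer case keeps the match exhaustive.
  def describe(timer: Timer): String = timer match {
    case MetricHandle.DropwizardTimer(name, metric) =>
      s"$name: count=${metric.getCount}, meanRate=${metric.getMeanRate}"
    case MetricHandle.Timer.NoOpTimer(name) =>
      s"$name: no data recorded"
  }
}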

View File

@ -3,16 +3,23 @@
package com.daml.ledger.api.benchtool.metrics
import java.time.{Clock, Duration}
import java.util.concurrent.TimeUnit
import com.codahale.metrics.{MetricRegistry, SlidingTimeWindowArrayReservoir}
import com.codahale.{metrics => codahale}
import com.daml.ledger.api.benchtool.util.TimeUtil
import com.daml.metrics.MetricHandle.{Counter, Gauge, Histogram, VarGauge}
import com.daml.metrics.MetricHandle.{
Counter,
DropwizardCounter,
DropwizardGauge,
DropwizardHistogram,
Gauge,
Histogram,
}
import com.daml.metrics.{Gauges, MetricName}
import com.google.protobuf.timestamp.Timestamp
import java.time.{Clock, Duration}
import java.util.concurrent.TimeUnit
final class ExposedMetrics[T](
counterMetric: ExposedMetrics.CounterMetric[T],
bytesProcessedMetric: ExposedMetrics.BytesProcessedMetric[T],
@ -36,7 +43,7 @@ final class ExposedMetrics[T](
metric
.recordTimeFunction(elem)
.lastOption
.foreach(recordTime => metric.latestRecordTime.metric.updateValue(recordTime.seconds))
.foreach(recordTime => metric.latestRecordTime.updateValue(recordTime.seconds))
}
}
@ -49,7 +56,7 @@ object ExposedMetrics {
case class BytesProcessedMetric[T](bytesProcessed: Counter, sizingFunction: T => Long)
case class DelayMetric[T](delays: Histogram, recordTimeFunction: T => Seq[Timestamp])
case class LatestRecordTimeMetric[T](
latestRecordTime: VarGauge[Long],
latestRecordTime: Gauge[Long],
recordTimeFunction: T => Seq[Timestamp],
)
@ -63,12 +70,14 @@ object ExposedMetrics {
clock: Clock = Clock.systemUTC(),
): ExposedMetrics[T] = {
val counterMetric = CounterMetric[T](
counter =
Counter(Prefix :+ "count" :+ streamName, registry.counter(Prefix :+ "count" :+ streamName)),
counter = DropwizardCounter(
Prefix :+ "count" :+ streamName,
registry.counter(Prefix :+ "count" :+ streamName),
),
countingFunction = countingFunction,
)
val bytesProcessedMetric = BytesProcessedMetric[T](
bytesProcessed = Counter(
bytesProcessed = DropwizardCounter(
Prefix :+ "bytes_read" :+ streamName,
registry.counter(Prefix :+ "bytes_read" :+ streamName),
),
@ -79,7 +88,7 @@ object ExposedMetrics {
)
val delayMetric = recordTimeFunction.map { f =>
DelayMetric[T](
delays = Histogram(
delays = DropwizardHistogram(
Prefix :+ "delay" :+ streamName,
registry.register(Prefix :+ "delay" :+ streamName, delaysHistogram),
),
@ -88,7 +97,7 @@ object ExposedMetrics {
}
val latestRecordTimeMetric = recordTimeFunction.map { f =>
LatestRecordTimeMetric[T](
latestRecordTime = Gauge(
latestRecordTime = DropwizardGauge(
Prefix :+ "latest_record_time" :+ streamName,
registry
.register(Prefix :+ "latest_record_time" :+ streamName, new Gauges.VarGauge[Long](0L)),
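
The benchtool builds its Dropwizard-backed handles by hand against a MetricRegistry rather than through a factory. A self-contained sketch of that wiring (the registry, prefix, and metric names below are illustrative):

import com.codahale.metrics.MetricRegistry
import com.daml.metrics.MetricName
import com.daml.metrics.MetricHandle.{DropwizardCounter, DropwizardHistogram}

object ManualWiringSketch {
  val registry = new MetricRegistry
  val prefix = MetricName("benchtool")
  // Each handle keeps the metric name and wraps the registered Dropwizard metric.
  val bytesRead = DropwizardCounter(prefix :+ "bytes_read", registry.counter(prefix :+ "bytes_read"))
  val delays = DropwizardHistogram(prefix :+ "delay", registry.histogram(prefix :+ "delay"))
  bytesRead.inc(1024L)
  delays.update(5L)
}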

View File

@ -29,7 +29,7 @@ import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionSuccess,
}
import com.daml.ledger.client.services.commands.tracker.TrackedCommandKey
import com.daml.metrics.MetricHandle.Counter
import com.daml.metrics.MetricHandle.DropwizardCounter
import com.daml.util.Ctx
import com.daml.util.akkastreams.MaxInFlight
import com.google.protobuf.empty.Empty
@ -129,8 +129,8 @@ private[daml] final class CommandClient(
// The counters are ignored on the client
MaxInFlight(
config.maxCommandsInFlight,
Counter("capacity", new codahale.Counter),
Counter("name", new codahale.Counter),
DropwizardCounter("capacity", new codahale.Counter),
DropwizardCounter("name", new codahale.Counter),
)
.joinMat(tracker)(Keep.right)
}

View File

@ -8,7 +8,7 @@ import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.codahale.{metrics => codahale}
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.metrics.MetricHandle.Counter
import com.daml.metrics.MetricHandle.{DropwizardCounter => Counter}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Minute, Span}
import org.scalatest.matchers.should.Matchers

View File

@ -52,7 +52,6 @@ da_scala_library(
"//libs-scala/timer-utils",
"@maven//:commons_codec_commons_codec",
"@maven//:commons_io_commons_io",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_grpc_grpc_netty",
"@maven//:io_netty_netty_buffer",
"@maven//:io_netty_netty_handler",

View File

@ -16,7 +16,6 @@ da_scala_library(
"@maven//:com_typesafe_akka_akka_stream",
"@maven//:org_scalaz_scalaz_core",
"@maven//:com_github_scopt_scopt",
"@maven//:org_typelevel_cats_core",
],
tags = ["maven_coordinates=com.daml:metrics:__VERSION__"],
visibility = [

View File

@ -6,11 +6,10 @@ package com.daml.metrics
import com.daml.metrics.MetricDoc.MetricQualification.Debug
import com.daml.metrics.MetricHandle.Counter
import com.codahale.metrics.MetricRegistry.MetricSupplier
import com.codahale.metrics.{Gauge, MetricRegistry}
final class CacheMetrics(override val prefix: MetricName, override val registry: MetricRegistry)
extends MetricHandle.Factory {
extends MetricHandle.DropwizardFactory {
@MetricDoc.Tag(
summary = "The number of cache hits.",
@ -43,12 +42,8 @@ final class CacheMetrics(override val prefix: MetricName, override val registry:
val evictionWeight: Counter = counter(prefix :+ "evicted_weight")
def registerSizeGauge(sizeGauge: Gauge[Long]): Unit =
register(prefix :+ "size", () => sizeGauge)
gaugeWithSupplier(prefix :+ "size", () => () => sizeGauge.getValue)
def registerWeightGauge(weightGauge: Gauge[Long]): Unit =
register(prefix :+ "weight", () => weightGauge)
gaugeWithSupplier(prefix :+ "weight", () => () => weightGauge.getValue)
private def register(name: MetricName, gaugeSupplier: MetricSupplier[Gauge[_]]): Unit = {
gaugeWithSupplier(name, gaugeSupplier)
()
}
}
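
registerSizeGauge and registerWeightGauge now go through gaugeWithSupplier, which takes a () => () => T rather than a Dropwizard MetricSupplier. A sketch with a hypothetical map-backed cache:

import java.util.concurrent.ConcurrentHashMap
import com.daml.metrics.{MetricHandle, MetricName}

object CacheSizeGaugeSketch {
  // The outer function runs when the gauge is (re)registered;
  // the inner one is evaluated on every read of the gauge.
  def registerSize(
      factory: MetricHandle.Factory,
      name: MetricName,
      cache: ConcurrentHashMap[String, String],
  ): Unit =
    factory.gaugeWithSupplier(name, () => () => cache.size().toLong)
}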

View File

@ -9,7 +9,7 @@ import com.daml.metrics.MetricHandle.{Counter, Meter, Timer}
import com.codahale.metrics.{MetricRegistry}
class CommandMetrics(override val prefix: MetricName, override val registry: MetricRegistry)
extends MetricHandle.Factory {
extends MetricHandle.DropwizardFactory {
@MetricDoc.Tag(
summary = "The time to validate a Daml command.",

View File

@ -12,7 +12,7 @@ class DatabaseMetrics private[metrics] (
override val prefix: MetricName,
val name: String,
override val registry: MetricRegistry,
) extends MetricHandle.Factory {
) extends MetricHandle.DropwizardFactory {
protected val dbPrefix: MetricName = prefix :+ name
@MetricDoc.Tag(

View File

@ -9,7 +9,7 @@ import com.daml.metrics.MetricHandle.{Counter, Histogram, Meter, Timer}
import com.codahale.metrics.MetricRegistry
class ExecutionMetrics(override val prefix: MetricName, override val registry: MetricRegistry)
extends MetricHandle.Factory {
extends MetricHandle.DropwizardFactory {
@MetricDoc.Tag(
summary = "The time to lookup individual active contracts during interpretation.",
@ -130,7 +130,7 @@ class ExecutionMetrics(override val prefix: MetricName, override val registry: M
@MetricDoc.GroupTag(
representative = "daml.execution.cache.<state_cache>.evicted_weight"
)
object cache extends MetricHandle.Factory {
object cache extends MetricHandle.DropwizardFactory {
override val prefix: MetricName = ExecutionMetrics.this.prefix :+ "cache"
override val registry = ExecutionMetrics.this.registry

View File

@ -8,7 +8,13 @@ import java.util.concurrent.atomic.AtomicReference
import com.codahale.metrics.Gauge
object Gauges {
case class VarGauge[T](initial: T) extends Gauge[T] {
trait GaugeWithUpdate[T] extends Gauge[T] {
def updateValue(x: T): Unit
}
case class VarGauge[T](initial: T) extends GaugeWithUpdate[T] {
private val ref = new AtomicReference[T](initial)
def updateValue(x: T): Unit = ref.set(x)
def updateValue(up: T => T): Unit = {
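
GaugeWithUpdate gives the handle layer a Dropwizard gauge it can also set, which is what DropwizardGauge wraps. An illustrative sketch (the metric name is hypothetical):

import com.daml.metrics.Gauges
import com.daml.metrics.MetricHandle.DropwizardGauge

object VarGaugeSketch {
  // A VarGauge is a readable codahale Gauge that the handle can also update.
  val underlying = Gauges.VarGauge[Long](0L)
  val handle = DropwizardGauge("example_sequential_id", underlying)
  handle.updateValue(42L)                // forwards to underlying.updateValue
  handle.updateValue((x: Long) => x + 1) // default method: read, transform, write
  assert(handle.getValue == 43L)
}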

View File

@ -8,9 +8,9 @@ import com.daml.metrics.MetricHandle.{Counter, Meter, Timer}
import com.codahale.metrics.MetricRegistry
class HttpJsonApiMetrics(override val prefix: MetricName, override val registry: MetricRegistry)
extends MetricHandle.Factory {
extends MetricHandle.DropwizardFactory {
object Db extends MetricHandle.Factory {
object Db extends MetricHandle.DropwizardFactory {
override val prefix: MetricName = HttpJsonApiMetrics.this.prefix :+ "db"
override val registry: MetricRegistry = HttpJsonApiMetrics.this.registry

View File

@ -3,13 +3,12 @@
package com.daml.metrics
import com.daml.metrics.MetricDoc.MetricQualification.Debug
import com.daml.metrics.MetricHandle.{Counter, Timer, VarGauge}
import com.codahale.metrics.MetricRegistry
import com.daml.metrics.MetricDoc.MetricQualification.Debug
import com.daml.metrics.MetricHandle.{Counter, Gauge, Timer}
class IndexMetrics(override val prefix: MetricName, override val registry: MetricRegistry)
extends MetricHandle.Factory {
extends MetricHandle.DropwizardFactory {
@MetricDoc.Tag(
summary = "The buffer size for transaction trees requests.",
@ -72,8 +71,8 @@ class IndexMetrics(override val prefix: MetricName, override val registry: Metri
|in-memory data set.""",
qualification = Debug,
)
val ledgerEndSequentialId: VarGauge[Long] =
varGauge(prefix :+ "ledger_end_sequential_id", 0)
val ledgerEndSequentialId: Gauge[Long] =
gauge(prefix :+ "ledger_end_sequential_id", 0)
object lfValue {
private val prefix = IndexMetrics.this.prefix :+ "lf_value"

View File

@ -3,15 +3,14 @@
package com.daml.metrics
import com.daml.metrics.MetricDoc.MetricQualification.Debug
import com.daml.metrics.MetricHandle.{Gauge, VarGauge}
import com.codahale.metrics.MetricRegistry
import java.time.Instant
import com.codahale.metrics.MetricRegistry
import com.daml.metrics.MetricDoc.MetricQualification.Debug
import com.daml.metrics.MetricHandle.{DropwizardGauge, Gauge}
class IndexerMetrics(override val prefix: MetricName, override val registry: MetricRegistry)
extends MetricHandle.Factory {
extends MetricHandle.DropwizardFactory {
@MetricDoc.Tag(
summary = "The time of the last event ingested by the index db (in milliseconds since EPOCH).",
@ -20,8 +19,8 @@ class IndexerMetrics(override val prefix: MetricName, override val registry: Met
|db. It is measured in milliseconds since the EPOCH time.""",
qualification = Debug,
)
val lastReceivedRecordTime: VarGauge[Long] =
varGauge(prefix :+ "last_received_record_time", 0)
val lastReceivedRecordTime: Gauge[Long] =
gauge(prefix :+ "last_received_record_time", 0)
@MetricDoc.Tag(
summary = "A string value representing the last ledger offset ingested by the index db.",
@ -29,8 +28,8 @@ class IndexerMetrics(override val prefix: MetricName, override val registry: Met
|it is not available in Prometheus.""",
qualification = Debug,
)
val lastReceivedOffset: VarGauge[String] =
varGauge(prefix :+ "last_received_offset", "<none>")
val lastReceivedOffset: Gauge[String] =
gauge(prefix :+ "last_received_offset", "<none>")
@MetricDoc.Tag(
summary = "The sequential id of the current ledger end kept in the database.",
@ -44,8 +43,8 @@ class IndexerMetrics(override val prefix: MetricName, override val registry: Met
|database.""",
qualification = Debug,
)
val ledgerEndSequentialId: VarGauge[Long] =
varGauge(prefix :+ "ledger_end_sequential_id", 0)
val ledgerEndSequentialId: Gauge[Long] =
gauge(prefix :+ "ledger_end_sequential_id", 0)
@MetricDoc.Tag(
summary =
@ -54,10 +53,10 @@ class IndexerMetrics(override val prefix: MetricName, override val registry: Met
|can be negative.""",
qualification = Debug,
)
val currentRecordTimeLag: VarGauge[Long] = Gauge(prefix :+ "current_record_time_lag", null)
val currentRecordTimeLag: Gauge[Long] = DropwizardGauge(prefix :+ "current_record_time_lag", null)
gaugeWithSupplier(
prefix :+ "current_record_time_lag",
() => () => Instant.now().toEpochMilli - lastReceivedRecordTime.metric.getValue,
() => () => Instant.now().toEpochMilli - lastReceivedRecordTime.getValue,
)
}

View File

@ -4,7 +4,14 @@
package com.daml.metrics
import com.daml.metrics.MetricDoc.MetricQualification.Debug
import com.daml.metrics.MetricHandle.{Counter, Meter, Timer}
import com.daml.metrics.MetricHandle.{
Counter,
DropwizardCounter,
DropwizardMeter,
DropwizardTimer,
Meter,
Timer,
}
class InstrumentedExecutorServiceForDocs(name: MetricName) {
@ -15,7 +22,7 @@ class InstrumentedExecutorServiceForDocs(name: MetricName) {
|https://www.javadoc.io/doc/io.dropwizard.metrics/metrics-core/latest/com/codahale/metrics/InstrumentedExecutorService.html""",
qualification = Debug,
)
val submitted: Meter = Meter(name :+ "submitted", null)
val submitted: Meter = DropwizardMeter(name :+ "submitted", null)
@MetricDoc.Tag(
summary = "The number of tasks running in an instrumented executor.",
@ -24,7 +31,7 @@ class InstrumentedExecutorServiceForDocs(name: MetricName) {
|https://www.javadoc.io/doc/io.dropwizard.metrics/metrics-core/latest/com/codahale/metrics/InstrumentedExecutorService.html""",
qualification = Debug,
)
val running: Counter = Counter(name :+ "running", null)
val running: Counter = DropwizardCounter(name :+ "running", null)
@MetricDoc.Tag(
summary = "The number of tasks completed in an instrumented executor.",
@ -33,7 +40,7 @@ class InstrumentedExecutorServiceForDocs(name: MetricName) {
|https://www.javadoc.io/doc/io.dropwizard.metrics/metrics-core/latest/com/codahale/metrics/InstrumentedExecutorService.html""",
qualification = Debug,
)
val completed: Meter = Meter(name :+ "completed", null)
val completed: Meter = DropwizardMeter(name :+ "completed", null)
@MetricDoc.Tag(
summary = "The time that a task is idle in an instrumented executor.",
@ -42,7 +49,7 @@ class InstrumentedExecutorServiceForDocs(name: MetricName) {
|https://www.javadoc.io/doc/io.dropwizard.metrics/metrics-core/latest/com/codahale/metrics/InstrumentedExecutorService.html""",
qualification = Debug,
)
val idle: Timer = Timer(name :+ "idle", null)
val idle: Timer = DropwizardTimer(name :+ "idle", null)
@MetricDoc.Tag(
summary = "The duration that a task runs in an instrumented executor.",
@ -51,5 +58,5 @@ class InstrumentedExecutorServiceForDocs(name: MetricName) {
|https://www.javadoc.io/doc/io.dropwizard.metrics/metrics-core/latest/com/codahale/metrics/InstrumentedExecutorService.html""",
qualification = Debug,
)
val duration: Timer = Timer(name :+ "duration", null)
val duration: Timer = DropwizardTimer(name :+ "duration", null)
}

View File

@ -6,14 +6,14 @@ package com.daml.metrics
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.{BoundedSourceQueue, Materializer, OverflowStrategy, QueueOfferResult}
import com.daml.metrics.MetricHandle.{Counter, Timer}
import com.codahale.{metrics => codahale}
import com.daml.metrics.MetricHandle.Timer.TimerStop
import scala.util.chaining._
object InstrumentedGraph {
final class InstrumentedBoundedSourceQueue[T](
delegate: BoundedSourceQueue[(codahale.Timer.Context, T)],
delegate: BoundedSourceQueue[(TimerStop, T)],
bufferSize: Int,
capacityCounter: Counter,
lengthCounter: Counter,
@ -31,7 +31,7 @@ object InstrumentedGraph {
override def offer(elem: T): QueueOfferResult = {
val result = delegate.offer(
delayTimer.time() -> elem
delayTimer.startAsync() -> elem
)
result match {
case QueueOfferResult.Enqueued =>
@ -70,7 +70,7 @@ object InstrumentedGraph {
materializer: Materializer
): Source[T, BoundedSourceQueue[T]] = {
val (boundedQueue, source) =
Source.queue[(codahale.Timer.Context, T)](bufferSize).preMaterialize()
Source.queue[(TimerStop, T)](bufferSize).preMaterialize()
val instrumentedQueue =
new InstrumentedBoundedSourceQueue[T](
@ -82,8 +82,8 @@ object InstrumentedGraph {
)
capacityCounter.inc(bufferSize.toLong)
source.mapMaterializedValue(_ => instrumentedQueue).map { case (timingContext, item) =>
timingContext.stop()
source.mapMaterializedValue(_ => instrumentedQueue).map { case (stopTimer, item) =>
stopTimer()
lengthCounter.dec()
item
}
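
The instrumented queue measures buffer delay by enqueueing a TimerStop with every element and invoking it on dequeue. The essence of that handshake, as a small sketch:

import com.daml.metrics.MetricHandle.Timer
import com.daml.metrics.MetricHandle.Timer.TimerStop

object QueueDelaySketch {
  // On offer: pair the element with a started timer.
  def enqueue[T](delayTimer: Timer, elem: T): (TimerStop, T) =
    delayTimer.startAsync() -> elem

  // On dequeue: stop the timer, recording how long the element sat in the buffer.
  def dequeue[T](entry: (TimerStop, T)): T = {
    val (stop, elem) = entry
    stop()
    elem
  }
}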

View File

@ -3,13 +3,12 @@
package com.daml.metrics
import com.daml.metrics.MetricDoc.MetricQualification.Debug
import com.daml.metrics.MetricHandle.{Counter, Timer}
import com.codahale.metrics.MetricRegistry
import com.daml.metrics.MetricDoc.MetricQualification.Debug
import com.daml.metrics.MetricHandle.{Counter, DropwizardCounter, DropwizardTimer, Timer}
class LAPIMetrics(override val prefix: MetricName, override val registry: MetricRegistry)
extends MetricHandle.Factory {
extends MetricHandle.DropwizardFactory {
@MetricDoc.Tag(
summary = "The time spent serving a ledger api grpc request.",
@ -18,8 +17,7 @@ class LAPIMetrics(override val prefix: MetricName, override val registry: Metric
|time to return the first response.""",
qualification = Debug,
)
val forMethodForDocs: Timer = Timer(prefix :+ "<service_method>", null)
val forMethodForDocs: Timer = DropwizardTimer(prefix :+ "<service_method>", null)
def forMethod(name: String): Timer = timer(prefix :+ name)
object return_status {
@ -31,7 +29,7 @@ class LAPIMetrics(override val prefix: MetricName, override val registry: Metric
|the ledger api.""",
qualification = Debug,
)
val forCodeForDocs = Counter(prefix :+ "<gRPC_status_code>", null)
val forCodeForDocs = DropwizardCounter(prefix :+ "<gRPC_status_code>", null)
def forCode(code: String): Counter = counter(prefix :+ code)
}

View File

@ -3,19 +3,18 @@
package com.daml.metrics
import java.time.Duration
import java.util.concurrent.TimeUnit
import cats.data.EitherT
import com.codahale.metrics.MetricRegistry.MetricSupplier
import com.codahale.metrics.Snapshot
import com.codahale.metrics.Timer.Context
import com.codahale.{metrics => codahale}
import com.daml.metrics.Gauges.{GaugeWithUpdate, VarGauge}
import com.daml.metrics.MetricHandle.Timer.TimerStop
import scala.annotation.StaticAnnotation
import scala.concurrent.{Future, blocking}
import scala.concurrent.{ExecutionContext, Future, blocking}
sealed trait MetricHandle[T <: codahale.Metric] {
trait MetricHandle {
def name: String
def metric: T
def metricType: String // type string used for documentation purposes
}
@ -25,112 +24,185 @@ object MetricHandle {
def prefix: MetricName
def timer(name: MetricName): Timer
def gauge[T](name: MetricName, initial: T): Gauge[T]
def gaugeWithSupplier[T](
name: MetricName,
gaugeSupplier: () => () => T,
): Unit
def meter(name: MetricName): Meter
def counter(name: MetricName): Counter
def histogram(name: MetricName): Histogram
}
trait DropwizardFactory extends Factory {
def registry: codahale.MetricRegistry
def timer(name: MetricName): Timer = Timer(name, registry.timer(name))
def timer(name: MetricName): Timer = DropwizardTimer(name, registry.timer(name))
def varGauge[T](name: MetricName, initial: T): VarGauge[T] =
addGauge(name, Gauges.VarGauge[T](initial), _.updateValue(initial))
def gauge[T](name: MetricName, initial: T): Gauge[T] = {
val registeredgauge = reRegisterGauge[T, VarGauge[T]](name, Gauges.VarGauge(initial))
DropwizardGauge(name, registeredgauge)
}
def addGauge[T <: codahale.Gauge[M], M](
def gaugeWithSupplier[T](
name: MetricName,
newGauge: => T,
resetGauge: (T => Unit),
): Gauge[T, M] = gauge(name, registry.register(name, newGauge), resetGauge)
def gaugeWithSupplier[T <: codahale.Gauge[M], M](
name: MetricName,
gaugeSupplier: MetricSupplier[codahale.Gauge[_]],
): Gauge[T, M] =
gauge(name, registry.gauge(name, gaugeSupplier).asInstanceOf[T], _ => ())
def gauge[T <: codahale.Gauge[M], M](
name: MetricName,
registerGauge: => T,
resetGauge: (T => Unit),
): Gauge[T, M] = blocking {
gaugeSupplier: () => () => T,
): Unit =
synchronized {
registry.remove(name)
val res: Gauge[T, M] = {
val gauge = registerGauge
Gauge(name, gauge)
}
resetGauge(res.metric)
res
val _ = registry.gauge(
name,
() => {
val valueGetter = gaugeSupplier()
new codahale.Gauge[T] { override def getValue: T = valueGetter() }
},
)
()
}
}
def meter(name: MetricName): Meter = {
// This is idempotent
Meter(name, registry.meter(name))
DropwizardMeter(name, registry.meter(name))
}
def counter(name: MetricName): Counter = {
// This is idempotent
Counter(name, registry.counter(name))
DropwizardCounter(name, registry.counter(name))
}
def histogram(name: MetricName): Histogram = {
Histogram(name, registry.histogram(name))
DropwizardHistogram(name, registry.histogram(name))
}
protected def reRegisterGauge[T, G <: codahale.Gauge[T]](
name: MetricName,
gauge: G,
): G = blocking {
synchronized {
registry.remove(name)
registry.register(name, gauge)
}
}
}
trait FactoryWithDBMetrics extends MetricHandle.Factory {
trait FactoryWithDBMetrics extends MetricHandle.DropwizardFactory {
def createDbMetrics(name: String): DatabaseMetrics =
new DatabaseMetrics(prefix, name, registry)
}
sealed case class Timer(name: String, metric: codahale.Timer)
extends MetricHandle[codahale.Timer] {
sealed trait Timer extends MetricHandle {
def metricType: String = "Timer"
def timeEitherT[E, A](ev: EitherT[Future, E, A]): EitherT[Future, E, A] = {
EitherT(Timed.future(metric, ev.value))
def update(duration: Long, unit: TimeUnit): Unit
def update(duration: Duration): Unit
def time[T](call: => T): T
def startAsync(): TimerStop
def timeFuture[T](call: => Future[T]): Future[T] = {
val stop = startAsync()
val result = call
result.onComplete(_ => stop())(ExecutionContext.parasitic)
result
}
}
object Timer {
type TimerStop = () => Unit
sealed case class NoOpTimer(name: String) extends Timer {
override def update(duration: Long, unit: TimeUnit): Unit = ()
override def update(duration: Duration): Unit = ()
override def time[T](call: => T): T = call
override def startAsync(): TimerStop = () => ()
}
}
sealed case class DropwizardTimer(name: String, metric: codahale.Timer) extends Timer {
def update(duration: Long, unit: TimeUnit): Unit = metric.update(duration, unit)
def getCount: Long = metric.getCount
def getSnapshot: Snapshot = metric.getSnapshot
def getMeanRate: Double = metric.getMeanRate
def time(): Context = metric.time()
}
sealed case class Gauge[U <: codahale.Gauge[T], T](name: String, metric: U)
extends MetricHandle[codahale.Gauge[T]] {
def update(duration: Duration): Unit = metric.update(duration)
override def time[T](call: => T): T = metric.time(() => call)
override def startAsync(): TimerStop = {
val ctx = metric.time()
() => {
ctx.stop()
()
}
}
}
sealed trait Gauge[T] extends MetricHandle {
def metricType: String = "Gauge"
def updateValue(newValue: T): Unit
def updateValue(f: T => T): Unit = updateValue(f(getValue))
def getValue: T
}
sealed case class Meter(name: String, metric: codahale.Meter)
extends MetricHandle[codahale.Meter] {
sealed case class DropwizardGauge[T](name: String, metric: GaugeWithUpdate[T]) extends Gauge[T] {
def updateValue(newValue: T): Unit = metric.updateValue(newValue)
override def getValue: T = metric.getValue
}
sealed trait Meter extends MetricHandle {
def metricType: String = "Meter"
def mark(): Unit = metric.mark()
def mark(): Unit = mark(1)
def mark(value: Long): Unit
}
sealed case class DropwizardMeter(name: String, metric: codahale.Meter) extends Meter {
def mark(value: Long): Unit = metric.mark(value)
}
sealed case class Counter(name: String, metric: codahale.Counter)
extends MetricHandle[codahale.Counter] {
def metricType: String = "Counter"
sealed trait Counter extends MetricHandle {
def inc(): Unit = metric.inc
def inc(n: Long): Unit = metric.inc(n)
def dec(): Unit = metric.dec
def dec(n: Long): Unit = metric.dec(n)
override def metricType: String = "Counter"
def inc(): Unit
def inc(n: Long): Unit
def dec(): Unit
def dec(n: Long): Unit
def getCount: Long
}
sealed case class DropwizardCounter(name: String, metric: codahale.Counter) extends Counter {
def getCount: Long = metric.getCount
override def inc(): Unit = metric.inc
override def inc(n: Long): Unit = metric.inc(n)
override def dec(): Unit = metric.dec
override def dec(n: Long): Unit = metric.dec(n)
override def getCount: Long = metric.getCount
}
sealed case class Histogram(name: String, metric: codahale.Histogram)
extends MetricHandle[codahale.Histogram] {
sealed trait Histogram extends MetricHandle {
def metricType: String = "Histogram"
def update(value: Long): Unit
def update(value: Int): Unit
def update(value: Long): Unit = metric.update(value)
def update(value: Int): Unit = metric.update(value)
def getSnapshot: Snapshot = metric.getSnapshot
}
type VarGauge[T] = Gauge[Gauges.VarGauge[T], T]
sealed case class DropwizardHistogram(name: String, metric: codahale.Histogram)
extends MetricHandle
with Histogram {
override def update(value: Long): Unit = metric.update(value)
override def update(value: Int): Unit = metric.update(value)
}
}
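
Taken together, call sites depend only on the sealed handle traits, construction lives behind a factory, and tests can substitute no-op handles. A minimal sketch of a Dropwizard-backed factory plus a no-op timer (MyComponentMetrics and its metric names are hypothetical):

import com.codahale.metrics.MetricRegistry
import com.daml.metrics.{MetricHandle, MetricName}
import com.daml.metrics.MetricHandle.Timer

// Hypothetical component metrics built on the new DropwizardFactory.
class MyComponentMetrics(override val prefix: MetricName, override val registry: MetricRegistry)
    extends MetricHandle.DropwizardFactory {
  val requestTimer: Timer = timer(prefix :+ "requests") // registry.timer(...) under the hood
}

object MyComponentMetricsSketch {
  // Production wiring: Dropwizard-backed handles.
  val metrics = new MyComponentMetrics(MetricName("my_component"), new MetricRegistry)
  // Test wiring: a timer that records nothing, with no Dropwizard types at the call site.
  val noOp: Timer = MetricHandle.Timer.NoOpTimer("test")
  val two: Int = noOp.time(1 + 1) // runs the call, discards the measurement
}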

View File

@ -9,6 +9,8 @@ final class MetricName(private val segments: Vector[String]) extends AnyVal {
def :+(segment: String): MetricName =
new MetricName(segments :+ segment)
def asPrometheus = segments.mkString("_")
override def toString: String =
segments.mkString(".")
}

View File

@ -11,7 +11,7 @@ object Metrics {
new Metrics(SharedMetricRegistries.getOrCreate(registryName))
}
final class Metrics(override val registry: MetricRegistry) extends MetricHandle.Factory {
final class Metrics(override val registry: MetricRegistry) extends MetricHandle.DropwizardFactory {
override val prefix = MetricName("")
object test {
@ -20,7 +20,7 @@ final class Metrics(override val registry: MetricRegistry) extends MetricHandle.
val db: DatabaseMetrics = new DatabaseMetrics(prefix, "db", registry)
}
object daml extends MetricHandle.Factory {
object daml extends MetricHandle.DropwizardFactory {
override val prefix: MetricName = MetricName.Daml
override val registry = Metrics.this.registry

View File

@ -42,7 +42,7 @@ import com.codahale.metrics.MetricRegistry
representative = "daml.parallel_indexer.<stage>.executor.duration"
)
class ParallelIndexerMetrics(override val prefix: MetricName, override val registry: MetricRegistry)
extends MetricHandle.Factory {
extends MetricHandle.DropwizardFactory {
val initialization = new DatabaseMetrics(prefix, "initialization", registry)
// Number of state updates persisted to the database

View File

@ -3,15 +3,14 @@
package com.daml.metrics
import com.daml.metrics.MetricDoc.MetricQualification.Debug
import com.daml.metrics.MetricHandle.{Counter, Histogram, Meter, Timer}
import com.codahale.metrics.MetricRegistry
import com.daml.metrics.MetricDoc.MetricQualification.Debug
import com.daml.metrics.MetricHandle.{Counter, DropwizardTimer, Histogram, Meter, Timer}
class ServicesMetrics(override val prefix: MetricName, override val registry: MetricRegistry)
extends MetricHandle.Factory {
extends MetricHandle.DropwizardFactory {
object index extends MetricHandle.Factory {
object index extends MetricHandle.DropwizardFactory {
override val prefix: MetricName = ServicesMetrics.this.prefix :+ "index"
override val registry: MetricRegistry = ServicesMetrics.this.registry
@ -23,7 +22,7 @@ class ServicesMetrics(override val prefix: MetricName, override val registry: Me
|time statistics of such operations.""",
qualification = Debug,
)
val operationForDocs: Timer = Timer(prefix :+ "<operation>", null)
val operationForDocs: Timer = DropwizardTimer(prefix :+ "<operation>", null)
val listLfPackages: Timer = timer(prefix :+ "list_lf_packages")
val getLfArchive: Timer = timer(prefix :+ "get_lf_archive")
@ -52,7 +51,7 @@ class ServicesMetrics(override val prefix: MetricName, override val registry: Me
val prune: Timer = timer(prefix :+ "prune")
val getTransactionMetering: Timer = timer(prefix :+ "get_transaction_metering")
object InMemoryFanoutBuffer extends MetricHandle.Factory {
object InMemoryFanoutBuffer extends MetricHandle.DropwizardFactory {
override val prefix: MetricName = index.prefix :+ "in_memory_fan_out_buffer"
override val registry: MetricRegistry = index.registry
@ -86,7 +85,7 @@ class ServicesMetrics(override val prefix: MetricName, override val registry: Me
val bufferSize: Histogram = histogram(prefix :+ "size")
}
case class BufferedReader(streamName: String) extends MetricHandle.Factory {
case class BufferedReader(streamName: String) extends MetricHandle.DropwizardFactory {
override val prefix: MetricName = index.prefix :+ s"${streamName}_buffer_reader"
override val registry: MetricRegistry = index.registry
@ -148,7 +147,7 @@ class ServicesMetrics(override val prefix: MetricName, override val registry: Me
}
}
object read extends MetricHandle.Factory {
object read extends MetricHandle.DropwizardFactory {
override val prefix: MetricName = ServicesMetrics.this.prefix :+ "read"
override val registry: MetricRegistry = index.registry
@ -159,13 +158,13 @@ class ServicesMetrics(override val prefix: MetricName, override val registry: Me
|each operation.""",
qualification = Debug,
)
val readOperationForDocs: Timer = Timer(prefix :+ "<operation>", null)
val readOperationForDocs: Timer = DropwizardTimer(prefix :+ "<operation>", null)
val getLedgerInitialConditions: Timer = timer(prefix :+ "get_ledger_initial_conditions")
val stateUpdates: Timer = timer(prefix :+ "state_updates")
}
object write extends MetricHandle.Factory {
object write extends MetricHandle.DropwizardFactory {
override val prefix: MetricName = ServicesMetrics.this.prefix :+ "write"
override val registry: MetricRegistry = index.registry
@ -177,7 +176,7 @@ class ServicesMetrics(override val prefix: MetricName, override val registry: Me
|exposes the time needed to execute each operation.""",
qualification = Debug,
)
val writeOperationForDocs: Timer = Timer(prefix :+ "<operation>", null)
val writeOperationForDocs: Timer = DropwizardTimer(prefix :+ "<operation>", null)
val submitTransaction: Timer = timer(prefix :+ "submit_transaction")
val submitTransactionRunning: Meter = meter(prefix :+ "submit_transaction_running")

View File

@ -7,24 +7,20 @@ import java.util.concurrent.CompletionStage
import akka.Done
import akka.stream.scaladsl.{Keep, Source}
import com.codahale.{metrics => codahale}
import com.daml.metrics.MetricHandle.{Counter, Meter, Timer}
import com.daml.concurrent
import com.daml.metrics.MetricHandle.{Counter, Meter, Timer}
import scala.concurrent.{ExecutionContext, Future}
object Timed {
def value[T](timer: Timer, value: => T): T =
this.value[T](timer.metric, value)
def value[T](timer: codahale.Timer, value: => T): T =
timer.time(() => value)
timer.time(value)
def trackedValue[T](meter: Meter, value: => T): T = {
meter.metric.mark(+1)
meter.mark(+1)
val result = value
meter.metric.mark(-1)
meter.mark(-1)
result
}
@ -33,17 +29,17 @@ object Timed {
}
def completionStage[T](timer: Timer, future: => CompletionStage[T]): CompletionStage[T] = {
val ctx = timer.time()
val stop = timer.startAsync()
future.whenComplete { (_, _) =>
ctx.stop()
stop()
()
}
}
def trackedCompletionStage[T](meter: Meter, future: => CompletionStage[T]): CompletionStage[T] = {
meter.metric.mark(+1)
meter.mark(+1)
future.whenComplete { (_, _) =>
meter.metric.mark(-1)
meter.mark(-1)
()
}
}
@ -57,31 +53,24 @@ object Timed {
}
def future[T](timer: Timer, future: => Future[T]): Future[T] = {
this.future[T](timer.metric, future)
}
def future[T](timer: codahale.Timer, future: => Future[T]): Future[T] = {
val ctx = timer.time()
val result = future
result.onComplete(_ => ctx.stop())(ExecutionContext.parasitic)
result
timer.timeFuture(future)
}
def future[EC, T](timer: Timer, future: => concurrent.Future[EC, T]): concurrent.Future[EC, T] = {
val ctx = timer.time()
val stop = timer.startAsync()
val result = future
result.onComplete(_ => ctx.stop())(concurrent.ExecutionContext.parasitic)
result.onComplete(_ => stop())(concurrent.ExecutionContext.parasitic)
result
}
def trackedFuture[T](counter: Counter, future: => Future[T]): Future[T] = {
counter.metric.inc()
future.andThen { case _ => counter.metric.dec() }(ExecutionContext.parasitic)
counter.inc()
future.andThen { case _ => counter.dec() }(ExecutionContext.parasitic)
}
def trackedFuture[T](meter: Meter, future: => Future[T]): Future[T] = {
meter.metric.mark(+1)
future.andThen { case _ => meter.metric.mark(-1) }(ExecutionContext.parasitic)
meter.mark(+1)
future.andThen { case _ => meter.mark(-1) }(ExecutionContext.parasitic)
}
def timedAndTrackedFuture[T](timer: Timer, counter: Counter, future: => Future[T]): Future[T] = {
@ -92,12 +81,14 @@ object Timed {
Timed.future(timer, trackedFuture(meter, future))
}
/** Be advised that this will time the source when it's created and not when it's actually run.
*/
def source[Out, Mat](timer: Timer, source: => Source[Out, Mat]): Source[Out, Mat] = {
val ctx = timer.time()
val stop = timer.startAsync()
source
.watchTermination()(Keep.both[Mat, Future[Done]])
.mapMaterializedValue { case (mat, done) =>
done.onComplete(_ => ctx.stop())(ExecutionContext.parasitic)
done.onComplete(_ => stop())(ExecutionContext.parasitic)
mat
}
}
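
Timed.future now delegates to the handle's own timeFuture, so callers never see a Dropwizard Timer.Context. A usage sketch (timedLookup is illustrative):

import scala.concurrent.Future
import com.daml.metrics.MetricHandle.Timer
import com.daml.metrics.Timed

object TimedFutureSketch {
  // Time an asynchronous lookup through the abstract Timer handle;
  // the timer is stopped when the future completes, success or failure.
  def timedLookup(timer: Timer)(lookup: => Future[String]): Future[String] =
    Timed.future(timer, lookup)
}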

View File

@ -4,27 +4,28 @@
package com.daml.metrics
import java.util.concurrent.atomic.AtomicLong
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.codahale.{metrics => codahale}
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.metrics.InstrumentedGraph._
import com.daml.metrics.InstrumentedGraphSpec.SamplingCounter
import com.daml.metrics.MetricHandle.{DropwizardCounter, DropwizardTimer}
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import com.daml.metrics.InstrumentedGraph._
import com.daml.metrics.MetricHandle.{Counter, Timer}
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{Future, Promise}
final class InstrumentedGraphSpec extends AsyncFlatSpec with Matchers with AkkaBeforeAndAfterAll {
behavior of "InstrumentedSource.queue"
it should "correctly enqueue and measure queue delay" in {
val capacityCounter = Counter("test-capacity", new codahale.Counter())
val maxBuffered = Counter("test-length", new InstrumentedGraphSpec.MaxValueCounter())
val delayTimer = Timer("test-delay", new codahale.Timer())
val capacityCounter = DropwizardCounter("test-capacity", new codahale.Counter())
val maxBuffered = DropwizardCounter("test-length", new InstrumentedGraphSpec.MaxValueCounter())
val delayTimer = DropwizardTimer("test-delay", new codahale.Timer())
val bufferSize = 2
val (source, sink) =
@ -43,7 +44,7 @@ final class InstrumentedGraphSpec extends AsyncFlatSpec with Matchers with AkkaB
sink.map { output =>
all(result) shouldBe QueueOfferResult.Enqueued
output shouldEqual input
delayTimer.getCount shouldEqual bufferSize
delayTimer.metric.getCount shouldEqual bufferSize
delayTimer.metric.getSnapshot.getMax should be >= 5.millis.toNanos
}
}
@ -60,9 +61,9 @@ final class InstrumentedGraphSpec extends AsyncFlatSpec with Matchers with AkkaB
val lowAcceptanceThreshold = bufferSize - acceptanceTolerance
val highAcceptanceThreshold = bufferSize + acceptanceTolerance
val maxBuffered = Counter("test-length", new InstrumentedGraphSpec.MaxValueCounter())
val capacityCounter = Counter("test-capacity", new codahale.Counter())
val delayTimer = Timer("test-delay", new codahale.Timer())
val maxBuffered = DropwizardCounter("test-length", new InstrumentedGraphSpec.MaxValueCounter())
val capacityCounter = DropwizardCounter("test-capacity", new codahale.Counter())
val delayTimer = DropwizardTimer("test-delay", new codahale.Timer())
val stop = Promise[Unit]()
@ -108,7 +109,7 @@ final class InstrumentedGraphSpec extends AsyncFlatSpec with Matchers with AkkaB
val counter = new SamplingCounter(10.millis)
Source(List.fill(1000)("element"))
.throttle(producerMaxSpeed, FiniteDuration(10, "millis"))
.buffered(Counter("test", counter), 100)
.buffered(DropwizardCounter("test", counter), 100)
.throttle(consumerMaxSpeed, FiniteDuration(10, "millis"))
.run()
.map(_ => counter.finishSampling())

View File

@ -4,7 +4,7 @@
package com.daml.platform.apiserver
import com.daml.metrics.MetricHandle.Timer
import com.codahale.{metrics => codahale}
import com.daml.metrics.MetricHandle.Timer.TimerStop
import com.daml.metrics.Metrics
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall
import io.grpc._
@ -43,18 +43,18 @@ private[apiserver] final class MetricsInterceptor(metrics: Metrics) extends Serv
fullMethodName,
metrics.daml.lapi.forMethod(MetricsNaming.nameFor(fullMethodName)),
)
val timerCtx = timer.metric.time
val timerCtx = timer.startAsync()
next.startCall(new TimedServerCall(call, timerCtx), headers)
}
private final class TimedServerCall[ReqT, RespT](
delegate: ServerCall[ReqT, RespT],
timer: codahale.Timer.Context,
timerStop: TimerStop,
) extends SimpleForwardingServerCall[ReqT, RespT](delegate) {
override def close(status: Status, trailers: Metadata): Unit = {
metrics.daml.lapi.return_status.forCode(status.getCode.toString).inc()
delegate.close(status, trailers)
timer.stop()
timerStop()
()
}
}
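
Because TimerStop is a plain () => Unit, the interceptor can hand it to the wrapped server call without dragging a metrics dependency along. The shape of that hand-off, sketched with a hypothetical withTiming helper:

import com.daml.metrics.MetricHandle.Timer
import com.daml.metrics.MetricHandle.Timer.TimerStop

object TimerStopSketch {
  // Start the timer, then pass the stop callback to whichever component knows
  // when the work is actually finished (here, the delegate server call).
  def withTiming[A](timer: Timer)(handler: TimerStop => A): A =
    handler(timer.startAsync())
}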

View File

@ -68,7 +68,7 @@ private[platform] object InMemoryStateUpdaterFlow {
.mapAsync(1) { result =>
Future {
update(result)
metrics.daml.index.ledgerEndSequentialId.metric.updateValue(result.lastEventSequentialId)
metrics.daml.index.ledgerEndSequentialId.updateValue(result.lastEventSequentialId)
}(updateCachesExecutionContext)
}
}

View File

@ -266,11 +266,11 @@ object ParallelIndexerSubscription {
implicit loggingContext =>
dbDispatcher.executeSql(metrics.daml.parallelIndexer.tailIngestion) { connection =>
ingestTailFunction(ledgerEndFrom(lastBatch))(connection)
metrics.daml.indexer.ledgerEndSequentialId.metric
metrics.daml.indexer.ledgerEndSequentialId
.updateValue(lastBatch.lastSeqEventId)
metrics.daml.indexer.lastReceivedRecordTime.metric
metrics.daml.indexer.lastReceivedRecordTime
.updateValue(lastBatch.lastRecordTime)
metrics.daml.indexer.lastReceivedOffset.metric
metrics.daml.indexer.lastReceivedOffset
.updateValue(lastBatch.lastOffset.toHexString)
logger.info("Ledger end updated in IndexDB")
batchOfBatches

View File

@ -6,6 +6,7 @@ package com.daml.platform.packages
import java.io.File
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
import akka.actor.{ActorSystem, Scheduler}
import com.codahale.metrics.MetricRegistry
import com.daml.bazeltools.BazelRunfiles.rlocation
@ -14,11 +15,11 @@ import com.daml.ledger.resources.TestResourceContext
import com.daml.ledger.test.ModelTestDar
import com.daml.lf.archive.DarParser
import com.daml.lf.data.Ref.PackageId
import com.daml.metrics.MetricHandle.Timer
import com.daml.metrics.MetricHandle.DropwizardTimer
import com.daml.platform.testing.LogCollector
import org.scalatest.BeforeAndAfterEach
import org.scalatest.wordspec.AsyncWordSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{Await, Future}
@ -32,7 +33,7 @@ class DeduplicatingPackageLoaderSpec
private[this] var actorSystem: ActorSystem = _
private[this] val loadCount = new AtomicLong()
private[this] val metricRegistry = new MetricRegistry
private[this] val metric = Timer("test-metric", metricRegistry.timer("test-metric"))
private[this] val metric = DropwizardTimer("test-metric", metricRegistry.timer("test-metric"))
private[this] val dar = {
val fileName = new File(rlocation(ModelTestDar.path))

View File

@ -30,6 +30,5 @@ da_scala_library(
"//ledger/participant-state",
"//libs-scala/contextualized-logging",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:io_dropwizard_metrics_metrics_core",
],
)

View File

@ -7,7 +7,7 @@ import com.daml.metrics.{MetricHandle, MetricName, Metrics}
import com.codahale.metrics.MetricRegistry
import com.daml.metrics.MetricHandle.{Counter, Histogram, Timer}
class BridgeMetrics(metrics: Metrics) extends MetricHandle.Factory {
class BridgeMetrics(metrics: Metrics) extends MetricHandle.DropwizardFactory {
override val registry: MetricRegistry = metrics.registry