Add counter for metered events [PLEN-39] (#15878)

This commit is contained in:
Nicu Reut 2022-12-16 12:03:30 +01:00 committed by GitHub
parent d76603d137
commit f572d31222
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 91 additions and 22 deletions

View File

@ -0,0 +1,32 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.metrics
import com.daml.metrics.api.MetricDoc.MetricQualification.Debug
import com.daml.metrics.api.MetricHandle.Factory
import com.daml.metrics.api.{MetricDoc, MetricHandle, MetricName}
class IndexedUpdatesMetrics(prefix: MetricName, metricFactory: Factory) {
@MetricDoc.Tag(
summary = "Number of events that will be metered",
description = """Represents the number of events that will be included in the metering report.
|This is an estimate of the total number and not a substitute for the metering report.""".stripMargin,
qualification = Debug,
)
val meteredEventsMeter: MetricHandle.Meter = metricFactory.meter(
prefix :+ "metered_events",
"Number of events that will be metered.",
)
}
object IndexedUpdatesMetrics {
object Labels {
val applicationId = "application_id"
}
}

View File

@ -53,6 +53,8 @@ final class Metrics(override val registry: MetricRegistry, val otelMeter: OtelMe
object indexer extends IndexerMetrics(prefix :+ "indexer", registry)
object indexerEvents extends IndexedUpdatesMetrics(prefix :+ "indexer", openTelemetryFactory)
object parallelIndexer extends ParallelIndexerMetrics(prefix :+ "parallel_indexer", registry)
object services extends ServicesMetrics(prefix :+ "services", registry)

View File

@ -319,6 +319,7 @@ da_scala_test_suite(
"//libs-scala/scalatest-utils",
"//libs-scala/timer-utils",
"//observability/metrics",
"//observability/metrics:metrics-test-lib",
"//observability/telemetry",
"//observability/telemetry:telemetry-test-lib",
"@maven//:ch_qos_logback_logback_classic",

View File

@ -46,7 +46,9 @@ private[platform] case class ParallelIndexerSubscription[DB_BATCH](
stringInterningView: StringInterning with InternizingStringInterningView,
) {
import ParallelIndexerSubscription._
private implicit val metricsContext: MetricsContext = MetricsContext(
"participant_id" -> participantId
)
def apply(
inputMapperExecutor: Executor,
batcherExecutor: Executor,
@ -65,7 +67,7 @@ private[platform] case class ParallelIndexerSubscription[DB_BATCH](
translation = translation,
compressionStrategy = compressionStrategy,
),
UpdateToMeteringDbDto(),
UpdateToMeteringDbDto(metrics = metrics.daml.indexerEvents),
)
),
seqMapperZero =

View File

@ -7,11 +7,17 @@ import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v2.Update.TransactionAccepted
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf.data.Time.Timestamp
import com.daml.metrics.IndexedUpdatesMetrics
import com.daml.metrics.api.MetricsContext
import com.daml.metrics.api.MetricsContext.withExtraMetricLabels
object UpdateToMeteringDbDto {
def apply(
clock: () => Long = () => Timestamp.now().micros
clock: () => Long = () => Timestamp.now().micros,
metrics: IndexedUpdatesMetrics,
)(implicit
mc: MetricsContext
): Iterable[(Offset, state.Update)] => Vector[DbDto.TransactionMetering] = input => {
val time = clock()
@ -32,6 +38,10 @@ object UpdateToMeteringDbDto {
.filter(_._2 != 0)
.sortBy(_._1)
.map { case (applicationId, count) =>
withExtraMetricLabels(IndexedUpdatesMetrics.Labels.applicationId -> applicationId) {
implicit mc =>
metrics.meteredEventsMeter.mark(count.toLong)
}
DbDto.TransactionMetering(
application_id = applicationId,
action_count = count,

View File

@ -14,12 +14,17 @@ import com.daml.lf.transaction.{
TransactionVersion,
VersionedTransaction,
}
import com.daml.metrics.IndexedUpdatesMetrics
import com.daml.metrics.api.testing.{InMemoryMetricsFactory, MetricValues}
import com.daml.metrics.api.{MetricName, MetricsContext}
import org.scalatest.wordspec.AnyWordSpec
class UpdateToMeteringDbDtoSpec extends AnyWordSpec {
class UpdateToMeteringDbDtoSpec extends AnyWordSpec with MetricValues {
import DbDtoEq._
private val updateEventsMetrics = newUpdateMetrics
"UpdateMeteringToDbDto" should {
val applicationId = Ref.ApplicationId.assertFromString("a0")
@ -69,9 +74,10 @@ class UpdateToMeteringDbDtoSpec extends AnyWordSpec {
"extract transaction metering" in {
val actual = UpdateToMeteringDbDto(clock = () => timestamp)(
List((Offset.fromHexString(offset), someTransactionAccepted))
)
val actual =
UpdateToMeteringDbDto(clock = () => timestamp, updateEventsMetrics)(MetricsContext.Empty)(
List((Offset.fromHexString(offset), someTransactionAccepted))
)
val expected: Vector[DbDto.TransactionMetering] = Vector(
DbDto.TransactionMetering(
@ -97,18 +103,19 @@ class UpdateToMeteringDbDtoSpec extends AnyWordSpec {
val expected: Vector[DbDto.TransactionMetering] = Vector(metering)
val actual = UpdateToMeteringDbDto(clock = () => timestamp)(
List(
(
Offset.fromHexString(Ref.HexString.assertFromString("01")),
someTransactionAccepted,
),
(
Offset.fromHexString(Ref.HexString.assertFromString(metering.ledger_offset)),
someTransactionAccepted,
),
val actual =
UpdateToMeteringDbDto(clock = () => timestamp, updateEventsMetrics)(MetricsContext.Empty)(
List(
(
Offset.fromHexString(Ref.HexString.assertFromString("01")),
someTransactionAccepted,
),
(
Offset.fromHexString(Ref.HexString.assertFromString(metering.ledger_offset)),
someTransactionAccepted,
),
)
)
)
actual should equal(expected)(decided by DbDtoSeqEq)
@ -116,7 +123,9 @@ class UpdateToMeteringDbDtoSpec extends AnyWordSpec {
"return empty vector if input iterable is empty" in {
val expected: Vector[DbDto.TransactionMetering] = Vector.empty
val actual = UpdateToMeteringDbDto(clock = () => timestamp)(List.empty)
val actual = UpdateToMeteringDbDto(clock = () => timestamp, updateEventsMetrics)(
MetricsContext.Empty
)(List.empty)
actual should equal(expected)(decided by DbDtoSeqEq)
}
@ -127,12 +136,25 @@ class UpdateToMeteringDbDtoSpec extends AnyWordSpec {
Some(someCompletionInfo.copy(statistics = Some(TransactionNodeStatistics.Empty)))
)
val actual = UpdateToMeteringDbDto(clock = () => timestamp)(
List((Offset.fromHexString(offset), txWithNoActionCount))
)
val actual =
UpdateToMeteringDbDto(clock = () => timestamp, updateEventsMetrics)(MetricsContext.Empty)(
List((Offset.fromHexString(offset), txWithNoActionCount))
)
actual.isEmpty shouldBe true
}
"increment metered events counter" in {
val updateEventsMetrics = newUpdateMetrics
UpdateToMeteringDbDto(clock = () => timestamp, updateEventsMetrics)(MetricsContext.Empty)(
List((Offset.fromHexString(offset), someTransactionAccepted))
)
updateEventsMetrics.meteredEventsMeter.value shouldBe (statistics.committed.actions + statistics.rolledBack.actions)
}
}
private def newUpdateMetrics = {
new IndexedUpdatesMetrics(MetricName("test"), InMemoryMetricsFactory)
}
}