mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Fix ledger-api-bench-tool reporting NaN result [DPP-989] (#12928)
* Do not calculate too frequent periodic reports in ledger-api-bench-tool. CHANGELOG_BEGIN CHANGELOG_END * Deduplicate MetricsCollectorSpec
This commit is contained in:
parent
2a06a0a78c
commit
f950690f1a
@ -96,6 +96,7 @@ da_scala_test_suite(
|
||||
"//language-support/scala/bindings",
|
||||
"//ledger/ledger-api-common",
|
||||
"//ledger/metrics",
|
||||
"//libs-scala/adjustable-clock",
|
||||
"@maven//:com_typesafe_config",
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
"@maven//:io_grpc_grpc_api",
|
||||
|
@ -14,14 +14,16 @@ object MetricsCollector {
|
||||
sealed trait Message
|
||||
object Message {
|
||||
final case class NewValue[T](value: T) extends Message
|
||||
final case class PeriodicReportRequest(replyTo: ActorRef[Response.PeriodicReport])
|
||||
final case class PeriodicReportRequest(replyTo: ActorRef[Response.PeriodicReportResponse])
|
||||
extends Message
|
||||
final case class FinalReportRequest(replyTo: ActorRef[Response.FinalReport]) extends Message
|
||||
}
|
||||
|
||||
sealed trait Response
|
||||
object Response {
|
||||
final case class PeriodicReport(values: List[MetricValue]) extends Response
|
||||
sealed trait PeriodicReportResponse extends Response
|
||||
final case class PeriodicReport(values: List[MetricValue]) extends PeriodicReportResponse
|
||||
final case object ReportNotReady extends PeriodicReportResponse
|
||||
final case class MetricFinalReportData(
|
||||
name: String,
|
||||
value: MetricValue,
|
||||
@ -37,11 +39,17 @@ object MetricsCollector {
|
||||
): Behavior[Message] = {
|
||||
val clock = Clock.systemUTC()
|
||||
val startTime: Instant = clock.instant()
|
||||
new MetricsCollector[T](exposedMetrics, clock).handlingMessages(metrics, startTime, startTime)
|
||||
val minimumTimePeriodBetweenSubsequentReports: Duration = Duration.ofMillis(100)
|
||||
new MetricsCollector[T](exposedMetrics, minimumTimePeriodBetweenSubsequentReports, clock)
|
||||
.handlingMessages(metrics, startTime, startTime)
|
||||
}
|
||||
}
|
||||
|
||||
class MetricsCollector[T](exposedMetrics: Option[ExposedMetrics[T]], clock: Clock) {
|
||||
class MetricsCollector[T](
|
||||
exposedMetrics: Option[ExposedMetrics[T]],
|
||||
minimumTimePeriodBetweenSubsequentReports: Duration = Duration.ofMillis(100),
|
||||
clock: Clock,
|
||||
) {
|
||||
import MetricsCollector._
|
||||
import MetricsCollector.Message._
|
||||
import MetricsCollector.Response._
|
||||
@ -60,11 +68,20 @@ class MetricsCollector[T](exposedMetrics: Option[ExposedMetrics[T]], clock: Cloc
|
||||
|
||||
case request: PeriodicReportRequest =>
|
||||
val currentTime = clock.instant()
|
||||
val (newMetrics, values) = metrics
|
||||
.map(_.periodicValue(TimeUtil.durationBetween(lastPeriodicCheck, currentTime)))
|
||||
.unzip
|
||||
request.replyTo ! Response.PeriodicReport(values)
|
||||
handlingMessages(newMetrics, currentTime, startTime)
|
||||
val periodSinceLastReport: Duration =
|
||||
TimeUtil.durationBetween(lastPeriodicCheck, currentTime)
|
||||
if (
|
||||
TimeUtil.isAtLeast(periodSinceLastReport, minimumTimePeriodBetweenSubsequentReports)
|
||||
) {
|
||||
val (newMetrics, values) = metrics
|
||||
.map(_.periodicValue(periodSinceLastReport))
|
||||
.unzip
|
||||
request.replyTo ! Response.PeriodicReport(values)
|
||||
handlingMessages(newMetrics, currentTime, startTime)
|
||||
} else {
|
||||
request.replyTo ! Response.ReportNotReady
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
case request: FinalReportRequest =>
|
||||
val duration = TimeUtil.durationBetween(startTime, clock.instant())
|
||||
|
@ -7,6 +7,7 @@ import akka.actor.{Cancellable, CoordinatedShutdown}
|
||||
import akka.actor.typed.scaladsl.AskPattern._
|
||||
import akka.actor.typed.{ActorRef, ActorSystem, Props, SpawnProtocol}
|
||||
import akka.util.Timeout
|
||||
import com.daml.ledger.api.benchtool.metrics.MetricsCollector.Response
|
||||
import com.daml.ledger.api.benchtool.util.ReportFormatter
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
@ -57,13 +58,15 @@ case class MetricsManager[T](
|
||||
implicit val timeout: Timeout = Timeout(logInterval)
|
||||
collector
|
||||
.ask(MetricsCollector.Message.PeriodicReportRequest)
|
||||
.map { response =>
|
||||
logger.info(
|
||||
ReportFormatter.formatPeriodicReport(
|
||||
streamName = streamName,
|
||||
periodicReport = response,
|
||||
.collect {
|
||||
case Response.ReportNotReady => ()
|
||||
case response: Response.PeriodicReport =>
|
||||
logger.info(
|
||||
ReportFormatter.formatPeriodicReport(
|
||||
streamName = streamName,
|
||||
periodicReport = response,
|
||||
)
|
||||
)
|
||||
)
|
||||
}(system.executionContext)
|
||||
()
|
||||
})(system.executionContext)
|
||||
|
@ -17,4 +17,8 @@ object TimeUtil {
|
||||
|
||||
def durationBetween(before: Instant, after: Instant): Duration =
|
||||
Duration.between(before, after)
|
||||
|
||||
/** Returns `true` if `a` is longer or equal to `b`. */
|
||||
def isAtLeast(a: Duration, b: Duration): Boolean =
|
||||
a.compareTo(b) >= 0
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ package com.daml.ledger.api.benchtool
|
||||
|
||||
import akka.actor.testkit.typed.scaladsl.{BehaviorTestKit, ScalaTestWithActorTestKit}
|
||||
import akka.actor.typed.{ActorRef, Behavior}
|
||||
import com.daml.clock.AdjustableClock
|
||||
import com.daml.ledger.api.benchtool.metrics.{
|
||||
Metric,
|
||||
MetricValue,
|
||||
@ -21,9 +22,8 @@ class MetricsCollectorSpec extends ScalaTestWithActorTestKit with AnyWordSpecLik
|
||||
import MetricsCollector.Response
|
||||
|
||||
"The MetricsCollector" should {
|
||||
"respond with empty periodic report" in {
|
||||
val collector = spawn()
|
||||
val probe = testKit.createTestProbe[Response.PeriodicReport]()
|
||||
"respond with empty periodic report" in new CollectorFixture {
|
||||
val probe = testKit.createTestProbe[Response.PeriodicReportResponse]()
|
||||
|
||||
collector ! Message.PeriodicReportRequest(probe.ref)
|
||||
|
||||
@ -36,9 +36,8 @@ class MetricsCollectorSpec extends ScalaTestWithActorTestKit with AnyWordSpecLik
|
||||
)
|
||||
}
|
||||
|
||||
"respond with correct periodic report" in {
|
||||
val collector = spawn()
|
||||
val probe = testKit.createTestProbe[Response.PeriodicReport]()
|
||||
"respond with correct periodic report" in new CollectorFixture {
|
||||
val probe = testKit.createTestProbe[Response.PeriodicReportResponse]()
|
||||
|
||||
collector ! Message.NewValue("banana")
|
||||
collector ! Message.NewValue("mango")
|
||||
@ -53,9 +52,23 @@ class MetricsCollectorSpec extends ScalaTestWithActorTestKit with AnyWordSpecLik
|
||||
)
|
||||
}
|
||||
|
||||
"include objective-violating values in periodic report" in {
|
||||
val collector = spawn()
|
||||
val probe = testKit.createTestProbe[Response.PeriodicReport]()
|
||||
"not respond with a periodic report when requests are too frequent" in new CollectorFixture {
|
||||
val probe = testKit.createTestProbe[Response.PeriodicReportResponse]()
|
||||
|
||||
collector ! Message.NewValue("banana")
|
||||
collector ! Message.NewValue("mango")
|
||||
collector ! Message.PeriodicReportRequest(probe.ref)
|
||||
|
||||
probe.expectMessageType[Response.PeriodicReport]
|
||||
|
||||
clock.fastForward(Duration.ofSeconds(1))
|
||||
collector ! Message.PeriodicReportRequest(probe.ref)
|
||||
|
||||
probe.expectMessage(Response.ReportNotReady)
|
||||
}
|
||||
|
||||
"include objective-violating values in periodic report" in new CollectorFixture {
|
||||
val probe = testKit.createTestProbe[Response.PeriodicReportResponse]()
|
||||
|
||||
collector ! Message.NewValue("banana")
|
||||
collector ! Message.NewValue(TestObjective.TestViolatingValue)
|
||||
@ -71,11 +84,7 @@ class MetricsCollectorSpec extends ScalaTestWithActorTestKit with AnyWordSpecLik
|
||||
)
|
||||
}
|
||||
|
||||
"respond with empty final report" in {
|
||||
val now = Clock.systemUTC().instant()
|
||||
val tenSecondsAgo = now.minusSeconds(10)
|
||||
val clock = Clock.fixed(now, ZoneId.of("UTC"))
|
||||
val collector = spawnWithFixedClock(clock, tenSecondsAgo, tenSecondsAgo)
|
||||
"respond with empty final report" in new CollectorFixture {
|
||||
val probe = testKit.createTestProbe[Response.FinalReport]()
|
||||
|
||||
collector ! Message.FinalReportRequest(probe.ref)
|
||||
@ -94,11 +103,7 @@ class MetricsCollectorSpec extends ScalaTestWithActorTestKit with AnyWordSpecLik
|
||||
)
|
||||
}
|
||||
|
||||
"respond with correct final report" in {
|
||||
val now = Clock.systemUTC().instant()
|
||||
val tenSecondsAgo = now.minusSeconds(10)
|
||||
val clock = Clock.fixed(now, ZoneId.of("UTC"))
|
||||
val collector = spawnWithFixedClock(clock, tenSecondsAgo, tenSecondsAgo)
|
||||
"respond with correct final report" in new CollectorFixture {
|
||||
val probe = testKit.createTestProbe[Response.FinalReport]()
|
||||
|
||||
collector ! Message.NewValue("mango")
|
||||
@ -120,11 +125,7 @@ class MetricsCollectorSpec extends ScalaTestWithActorTestKit with AnyWordSpecLik
|
||||
)
|
||||
}
|
||||
|
||||
"include information about violated objective in the final report" in {
|
||||
val now = Clock.systemUTC().instant()
|
||||
val tenSecondsAgo = now.minusSeconds(10)
|
||||
val clock = Clock.fixed(now, ZoneId.of("UTC"))
|
||||
val collector = spawnWithFixedClock(clock, tenSecondsAgo, tenSecondsAgo)
|
||||
"include information about violated objective in the final report" in new CollectorFixture {
|
||||
val probe = testKit.createTestProbe[Response.FinalReport]()
|
||||
|
||||
collector ! Message.NewValue("mango")
|
||||
@ -163,22 +164,31 @@ class MetricsCollectorSpec extends ScalaTestWithActorTestKit with AnyWordSpecLik
|
||||
}
|
||||
}
|
||||
|
||||
private def spawn(): ActorRef[Message] =
|
||||
testKit.spawn(
|
||||
behavior = behavior,
|
||||
name = Random.alphanumeric.take(10).mkString,
|
||||
private class CollectorFixture {
|
||||
private val now = Clock.systemUTC().instant()
|
||||
private val tenSecondsAgo = now.minusSeconds(10)
|
||||
private val minimumReportInterval = Duration.ofSeconds(5)
|
||||
val clock = AdjustableClock(
|
||||
baseClock = Clock.fixed(now, ZoneId.of("UTC")),
|
||||
offset = Duration.ZERO,
|
||||
)
|
||||
val collector: ActorRef[Message] =
|
||||
spawnWithFixedClock(clock, tenSecondsAgo, tenSecondsAgo, minimumReportInterval)
|
||||
}
|
||||
|
||||
private def spawnWithFixedClock(
|
||||
clock: Clock,
|
||||
startTime: Instant,
|
||||
lastPeriodicCheck: Instant,
|
||||
minimumTimePeriodBetweenSubsequentReports: Duration,
|
||||
) = {
|
||||
val behavior = new MetricsCollector[String](None, clock).handlingMessages(
|
||||
metrics = List(TestMetric()),
|
||||
lastPeriodicCheck = lastPeriodicCheck,
|
||||
startTime = startTime,
|
||||
)
|
||||
val behavior =
|
||||
new MetricsCollector[String](None, minimumTimePeriodBetweenSubsequentReports, clock)
|
||||
.handlingMessages(
|
||||
metrics = List(TestMetric()),
|
||||
lastPeriodicCheck = lastPeriodicCheck,
|
||||
startTime = startTime,
|
||||
)
|
||||
testKit.spawn(
|
||||
behavior = behavior,
|
||||
name = Random.alphanumeric.take(10).mkString,
|
||||
|
Loading…
Reference in New Issue
Block a user