Total average rate SLO in the ledger-api-bench-tool [DPP-826] (#12219)

* Total average item rate service-level objective in the ledger-api-bench-tool.

CHANGELOG_BEGIN
- [Integration Kit] - ledger-api-bench-tool - Support for minimum and maximum average item rate objectives.
CHANGELOG_END

* Updated copyright headers

* Make stream objectives optional

* Add unit tests for optional objectives

* Formatted code changes
This commit is contained in:
Kamil Bozek 2022-01-05 18:46:26 +01:00 committed by GitHub
parent cf45255a0e
commit 39dc4676ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 556 additions and 236 deletions

View File

@ -81,7 +81,7 @@ object Benchmark {
.observer(
streamName = streamConfig.name,
logInterval = reportingPeriod,
metrics = MetricsSet.activeContractsMetrics,
metrics = MetricsSet.activeContractsMetrics(streamConfig.objectives),
logger = logger,
exposedMetrics = Some(
MetricsSet.activeContractsExposedMetrics(
@ -99,7 +99,7 @@ object Benchmark {
.observer(
streamName = streamConfig.name,
logInterval = reportingPeriod,
metrics = MetricsSet.completionsMetrics,
metrics = MetricsSet.completionsMetrics(streamConfig.objectives),
logger = logger,
exposedMetrics = Some(
MetricsSet

View File

@ -37,8 +37,6 @@ object LedgerApiBenchTool {
logger.info(s"Benchmark finished successfully.")
case Left(error) =>
logger.info(s"Benchmark failed: $error")
// Exit with error status code to facilitate scripting
sys.exit(1)
}
.recover { case ex =>
logger.error(s"ledger-api-bench-tool failure: ${ex.getMessage}", ex)

View File

@ -101,7 +101,7 @@ object Cli {
private def note(level: Int, param: String, desc: String = ""): OptionDef[Unit, Config] = {
val paddedParam = s"${" " * level * 2}$param"
val internalPadding = math.max(1, 40 - paddedParam.length)
val internalPadding = math.max(1, 50 - paddedParam.length)
note(s"$paddedParam${" " * internalPadding}$desc")
}
@ -110,12 +110,17 @@ object Cli {
note(1, "Transactions/transaction trees:")
note(2, "stream-type=<transactions|transaction-trees>", "(required)")
note(2, "name=<stream-name>", "Stream name used to identify results (required)")
note(2, "filters=party1@template1@template2+party2", "(required)")
note(
2,
"filters=party1@template1@template2+party2",
"List of per-party filters separated by the plus symbol (required)",
)
note(2, "begin-offset=<offset>")
note(2, "end-offset=<offset>")
note(2, "max-delay=<seconds>", "Max record time delay objective")
note(2, "min-consumption-speed=<speed>", "Min consumption speed objective")
note(2, "min-rate=<rate>", "Min item rate per second")
note(2, "min-item-rate=<rate>", "Min item rate per second")
note(2, "max-item-rate=<rate>", "Max item rate per second")
note(1, "Active contract sets:")
note(2, "stream-type=active-contracts", "(required)")
note(2, "name=<stream-name>", "Stream name used to identify results (required)")
@ -124,13 +129,16 @@ object Cli {
"filters=party1@template1@template2+party2",
"List of per-party filters separated by the plus symbol (required)",
)
note(2, "min-rate=<rate>", "Min item rate per second")
note(2, "min-item-rate=<rate>", "Min item rate per second")
note(2, "max-item-rate=<rate>", "Max item rate per second")
note(1, "Command completions:")
note(2, "stream-type=completions", "(required)")
note(2, "name=<stream-name>", "Stream name used to identify results (required)")
note(2, "party=<party>", "(required)")
note(2, "begin-offset=<offset>")
note(2, "template-ids=<id1>|<id2>")
note(2, "min-item-rate=<rate>", "Min item rate per second")
note(2, "max-item-rate=<rate>", "Max item rate per second")
}
def config(args: Array[String]): Option[Config] =
@ -172,16 +180,20 @@ object Cli {
endOffset <- optionalStringField("end-offset").map(_.map(offset))
maxDelaySeconds <- optionalLongField("max-delay")
minConsumptionSpeed <- optionalDoubleField("min-consumption-speed")
minItemRate <- optionalDoubleField("min-rate")
minItemRate <- optionalDoubleField("min-item-rate")
maxItemRate <- optionalDoubleField("max-item-rate")
} yield WorkflowConfig.StreamConfig.TransactionsStreamConfig(
name = name,
filters = filters,
beginOffset = beginOffset,
endOffset = endOffset,
objectives = WorkflowConfig.StreamConfig.Objectives(
maxDelaySeconds = maxDelaySeconds,
minConsumptionSpeed = minConsumptionSpeed,
minItemRate = minItemRate,
objectives = Some(
WorkflowConfig.StreamConfig.TransactionObjectives(
maxDelaySeconds = maxDelaySeconds,
minConsumptionSpeed = minConsumptionSpeed,
minItemRate = minItemRate,
maxItemRate = maxItemRate,
)
),
)
@ -194,16 +206,20 @@ object Cli {
endOffset <- optionalStringField("end-offset").map(_.map(offset))
maxDelaySeconds <- optionalLongField("max-delay")
minConsumptionSpeed <- optionalDoubleField("min-consumption-speed")
minItemRate <- optionalDoubleField("min-rate")
minItemRate <- optionalDoubleField("min-item-rate")
maxItemRate <- optionalDoubleField("max-item-rate")
} yield WorkflowConfig.StreamConfig.TransactionTreesStreamConfig(
name = name,
filters = filters,
beginOffset = beginOffset,
endOffset = endOffset,
objectives = WorkflowConfig.StreamConfig.Objectives(
maxDelaySeconds = maxDelaySeconds,
minConsumptionSpeed = minConsumptionSpeed,
minItemRate = minItemRate,
objectives = Some(
WorkflowConfig.StreamConfig.TransactionObjectives(
maxDelaySeconds = maxDelaySeconds,
minConsumptionSpeed = minConsumptionSpeed,
minItemRate = minItemRate,
maxItemRate = maxItemRate,
)
),
)
@ -211,14 +227,16 @@ object Cli {
: Either[String, WorkflowConfig.StreamConfig.ActiveContractsStreamConfig] = for {
name <- stringField("name")
filters <- stringField("filters").flatMap(filters)
minItemRate <- optionalDoubleField("min-rate")
minItemRate <- optionalDoubleField("min-item-rate")
maxItemRate <- optionalDoubleField("max-item-rate")
} yield WorkflowConfig.StreamConfig.ActiveContractsStreamConfig(
name = name,
filters = filters,
objectives = WorkflowConfig.StreamConfig.Objectives(
maxDelaySeconds = None,
minConsumptionSpeed = None,
minItemRate = minItemRate,
objectives = Some(
WorkflowConfig.StreamConfig.RateObjectives(
minItemRate = minItemRate,
maxItemRate = maxItemRate,
)
),
)
@ -228,11 +246,19 @@ object Cli {
party <- stringField("party")
applicationId <- stringField("application-id")
beginOffset <- optionalStringField("begin-offset").map(_.map(offset))
minItemRate <- optionalDoubleField("min-item-rate")
maxItemRate <- optionalDoubleField("max-item-rate")
} yield WorkflowConfig.StreamConfig.CompletionsStreamConfig(
name = name,
party = party,
applicationId = applicationId,
beginOffset = beginOffset,
objectives = Some(
WorkflowConfig.StreamConfig.RateObjectives(
minItemRate = minItemRate,
maxItemRate = maxItemRate,
)
),
)
val config = stringField("stream-type").flatMap[String, WorkflowConfig.StreamConfig] {

View File

@ -38,7 +38,7 @@ object WorkflowConfig {
filters: List[PartyFilter],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
objectives: StreamConfig.Objectives,
objectives: Option[StreamConfig.TransactionObjectives],
) extends StreamConfig
final case class TransactionTreesStreamConfig(
@ -46,13 +46,13 @@ object WorkflowConfig {
filters: List[PartyFilter],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
objectives: StreamConfig.Objectives,
objectives: Option[StreamConfig.TransactionObjectives],
) extends StreamConfig
final case class ActiveContractsStreamConfig(
name: String,
filters: List[PartyFilter],
objectives: StreamConfig.Objectives,
objectives: Option[StreamConfig.RateObjectives],
) extends StreamConfig
final case class CompletionsStreamConfig(
@ -60,14 +60,21 @@ object WorkflowConfig {
party: String,
applicationId: String,
beginOffset: Option[LedgerOffset],
objectives: Option[StreamConfig.RateObjectives],
) extends StreamConfig
final case class PartyFilter(party: String, templates: List[String])
case class Objectives(
case class TransactionObjectives(
maxDelaySeconds: Option[Long],
minConsumptionSpeed: Option[Double],
minItemRate: Option[Double],
maxItemRate: Option[Double],
)
case class RateObjectives(
minItemRate: Option[Double],
maxItemRate: Option[Double],
)
}
}

View File

@ -24,12 +24,19 @@ object WorkflowConfigParser {
case class ParserError(details: String)
object Decoders {
implicit val objectivesDecoder: Decoder[StreamConfig.Objectives] =
Decoder.forProduct3(
implicit val transactionObjectivesDecoder: Decoder[StreamConfig.TransactionObjectives] =
Decoder.forProduct4(
"max_delay_seconds",
"min_consumption_speed",
"min_item_rate",
)(StreamConfig.Objectives.apply)
"max_item_rate",
)(StreamConfig.TransactionObjectives.apply)
implicit val rateObjectivesDecoder: Decoder[StreamConfig.RateObjectives] =
Decoder.forProduct2(
"min_item_rate",
"max_item_rate",
)(StreamConfig.RateObjectives.apply)
implicit val offsetDecoder: Decoder[LedgerOffset] =
Decoder.decodeString.map(LedgerOffset.defaultInstance.withAbsolute)
@ -66,11 +73,12 @@ object WorkflowConfigParser {
)(StreamConfig.ActiveContractsStreamConfig.apply)
implicit val completionsStreamDecoder: Decoder[StreamConfig.CompletionsStreamConfig] =
Decoder.forProduct4(
Decoder.forProduct5(
"name",
"party",
"application_id",
"begin_offset",
"objectives",
)(StreamConfig.CompletionsStreamConfig.apply)
implicit val streamConfigDecoder: Decoder[StreamConfig] =

View File

@ -3,7 +3,6 @@
package com.daml.ledger.api.benchtool.metrics
import com.daml.ledger.api.benchtool.metrics.objectives.ServiceLevelObjective
import com.daml.ledger.api.benchtool.util.TimeUtil
import com.google.protobuf.timestamp.Timestamp
@ -12,7 +11,7 @@ import java.time.{Duration, Instant}
final case class ConsumptionSpeedMetric[T](
recordTimeFunction: T => Seq[Timestamp],
objective: Option[
(ServiceLevelObjective[ConsumptionSpeedMetric.Value], Option[ConsumptionSpeedMetric.Value])
(ConsumptionSpeedMetric.MinConsumptionSpeed, Option[ConsumptionSpeedMetric.Value])
],
previousLatest: Option[Instant] = None,
currentPeriodLatest: Option[Instant] = None,
@ -20,7 +19,7 @@ final case class ConsumptionSpeedMetric[T](
import ConsumptionSpeedMetric._
override type V = Value
override type Objective = ServiceLevelObjective[Value]
override type Objective = MinConsumptionSpeed
override def onNext(value: T): ConsumptionSpeedMetric[T] = {
val recordTimes = recordTimeFunction(value)
@ -50,10 +49,14 @@ final case class ConsumptionSpeedMetric[T](
override def finalValue(totalDuration: Duration): Value =
Value(None)
override def violatedObjective: Option[(ServiceLevelObjective[Value], Value)] =
override def violatedPeriodicObjectives: List[(MinConsumptionSpeed, Value)] =
objective.collect {
case (objective, value) if value.isDefined => objective -> value.get
}
}.toList
override def violatedFinalObjectives(
totalDuration: Duration
): List[(MinConsumptionSpeed, Value)] = Nil
private def periodicSpeed(periodDuration: Duration): Double =
(previousLatest, currentPeriodLatest) match {
@ -64,7 +67,7 @@ final case class ConsumptionSpeedMetric[T](
}
private def updatedObjectives(newValue: Value): Option[
(ServiceLevelObjective[ConsumptionSpeedMetric.Value], Option[ConsumptionSpeedMetric.Value])
(MinConsumptionSpeed, Option[Value])
] =
objective.map { case (objective, currentMaxValue) =>
if (objective.isViolatedBy(newValue)) {
@ -84,7 +87,7 @@ object ConsumptionSpeedMetric {
def empty[T](
recordTimeFunction: T => Seq[Timestamp],
objective: Option[ServiceLevelObjective[Value]] = None,
objective: Option[MinConsumptionSpeed] = None,
): ConsumptionSpeedMetric[T] =
ConsumptionSpeedMetric(
recordTimeFunction,
@ -107,4 +110,11 @@ object ConsumptionSpeedMetric {
}
}
}
final case class MinConsumptionSpeed(minSpeed: Double) extends ServiceLevelObjective[Value] {
override def isViolatedBy(metricValue: Value): Boolean =
Ordering[Value].lt(metricValue, v)
private val v = Value(Some(minSpeed))
}
}

View File

@ -2,22 +2,22 @@
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.metrics
import com.daml.ledger.api.benchtool.metrics.objectives.ServiceLevelObjective
import java.time.Duration
final case class CountRateMetric[T](
countingFunction: T => Int,
objective: Option[
(ServiceLevelObjective[CountRateMetric.Value], Option[CountRateMetric.Value])
periodicObjectives: List[
(CountRateMetric.RateObjective, Option[CountRateMetric.Value])
],
finalObjectives: List[CountRateMetric.RateObjective],
counter: Int = 0,
lastCount: Int = 0,
) extends Metric[T] {
import CountRateMetric._
override type V = Value
override type Objective = ServiceLevelObjective[Value]
override type Objective = RateObjective
override def onNext(value: T): CountRateMetric[T] =
this.copy(counter = counter + countingFunction(value))
@ -25,7 +25,7 @@ final case class CountRateMetric[T](
override def periodicValue(periodDuration: Duration): (Metric[T], Value) = {
val value = Value(periodicRate(periodDuration))
val updatedMetric = this.copy(
objective = updatedObjective(value),
periodicObjectives = updatedPeriodicObjectives(value),
lastCount = counter,
)
(updatedMetric, value)
@ -34,21 +34,29 @@ final case class CountRateMetric[T](
override def finalValue(totalDuration: Duration): Value =
Value(ratePerSecond = totalRate(totalDuration))
override def violatedObjective: Option[(ServiceLevelObjective[Value], Value)] =
objective.collect { case (objective, Some(value)) =>
override def violatedPeriodicObjectives: List[(RateObjective, Value)] =
periodicObjectives.collect { case (objective, Some(value)) =>
objective -> value
}
override def violatedFinalObjectives(
totalDuration: Duration
): List[(RateObjective, Value)] =
finalObjectives.collect {
case objective if objective.isViolatedBy(finalValue(totalDuration)) =>
(objective, finalValue(totalDuration))
}
private def periodicRate(periodDuration: Duration): Double =
(counter - lastCount) * 1000.0 / periodDuration.toMillis
private def totalRate(totalDuration: Duration): Double =
counter / totalDuration.toMillis.toDouble * 1000.0
private def updatedObjective(
private def updatedPeriodicObjectives(
newValue: Value
): Option[(ServiceLevelObjective[CountRateMetric.Value], Option[CountRateMetric.Value])] = {
objective.map { case (objective, currentMinValue) =>
): List[(RateObjective, Option[Value])] = {
periodicObjectives.map { case (objective, currentMinValue) =>
if (objective.isViolatedBy(newValue)) {
currentMinValue match {
case None => objective -> Some(newValue)
@ -69,11 +77,30 @@ object CountRateMetric {
Ordering.fromLessThan(_.ratePerSecond < _.ratePerSecond)
}
abstract class RateObjective extends ServiceLevelObjective[Value] with Product with Serializable
object RateObjective {
final case class MinRate(minAllowedRatePerSecond: Double) extends RateObjective {
override def isViolatedBy(metricValue: CountRateMetric.Value): Boolean =
Ordering[CountRateMetric.Value].lt(metricValue, v)
private val v = CountRateMetric.Value(minAllowedRatePerSecond)
}
final case class MaxRate(minAllowedRatePerSecond: Double) extends RateObjective {
override def isViolatedBy(metricValue: CountRateMetric.Value): Boolean =
Ordering[CountRateMetric.Value].gt(metricValue, v)
private val v = CountRateMetric.Value(minAllowedRatePerSecond)
}
}
def empty[T](
countingFunction: T => Int,
objective: Option[ServiceLevelObjective[Value]] = None,
periodicObjectives: List[RateObjective],
finalObjectives: List[RateObjective],
): CountRateMetric[T] = CountRateMetric[T](
countingFunction,
objective.map(obj => obj -> None),
periodicObjectives.map(obj => obj -> None),
finalObjectives = finalObjectives,
)
}

View File

@ -3,7 +3,6 @@
package com.daml.ledger.api.benchtool.metrics
import com.daml.ledger.api.benchtool.metrics.objectives.ServiceLevelObjective
import com.daml.ledger.api.benchtool.util.TimeUtil
import com.google.protobuf.timestamp.Timestamp
@ -12,13 +11,13 @@ import java.time.{Clock, Duration}
final case class DelayMetric[T](
recordTimeFunction: T => Seq[Timestamp],
clock: Clock,
objective: Option[(ServiceLevelObjective[DelayMetric.Value], Option[DelayMetric.Value])],
objective: Option[(DelayMetric.MaxDelay, Option[DelayMetric.Value])],
delaysInCurrentInterval: List[Duration] = List.empty,
) extends Metric[T] {
import DelayMetric._
override type V = Value
override type Objective = ServiceLevelObjective[Value]
override type Objective = MaxDelay
override def onNext(value: T): DelayMetric[T] = {
val now = clock.instant()
@ -39,14 +38,18 @@ final case class DelayMetric[T](
override def finalValue(totalDuration: Duration): Value =
Value(None)
override def violatedObjective: Option[(ServiceLevelObjective[Value], Value)] =
override def violatedPeriodicObjectives: List[(MaxDelay, Value)] =
objective.collect {
case (objective, value) if value.isDefined => objective -> value.get
}
}.toList
override def violatedFinalObjectives(
totalDuration: Duration
): List[(MaxDelay, Value)] = Nil
private def updatedObjective(
newValue: Value
): Option[(ServiceLevelObjective[DelayMetric.Value], Option[DelayMetric.Value])] =
): Option[(MaxDelay, Option[DelayMetric.Value])] =
objective.map { case (objective, currentViolatingValue) =>
// verify if the new value violates objective's requirements
if (objective.isViolatedBy(newValue)) {
@ -80,7 +83,7 @@ object DelayMetric {
def empty[T](
recordTimeFunction: T => Seq[Timestamp],
clock: Clock,
objective: Option[ServiceLevelObjective[Value]] = None,
objective: Option[MaxDelay] = None,
): DelayMetric[T] =
DelayMetric(
recordTimeFunction = recordTimeFunction,
@ -103,4 +106,11 @@ object DelayMetric {
}
}
}
final case class MaxDelay(maxDelaySeconds: Long)
extends ServiceLevelObjective[DelayMetric.Value] {
override def isViolatedBy(metricValue: DelayMetric.Value): Boolean =
metricValue.meanDelaySeconds.exists(_ > maxDelaySeconds)
}
}

View File

@ -3,8 +3,6 @@
package com.daml.ledger.api.benchtool.metrics
import com.daml.ledger.api.benchtool.metrics.objectives.ServiceLevelObjective
import java.time.Duration
trait Metric[Elem] {
@ -19,9 +17,11 @@ trait Metric[Elem] {
def finalValue(totalDuration: Duration): V
def violatedObjective: Option[(Objective, V)] = None
def violatedPeriodicObjectives: List[(Objective, V)] = Nil
def name: String = getClass.getSimpleName
def violatedFinalObjectives(totalDuration: Duration): List[(Objective, V)]
def name: String = getClass.getSimpleName()
}

View File

@ -5,7 +5,6 @@ package com.daml.ledger.api.benchtool.metrics
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import com.daml.ledger.api.benchtool.metrics.objectives.ServiceLevelObjective
import com.daml.ledger.api.benchtool.util.TimeUtil
import java.time.{Clock, Duration, Instant}
@ -26,7 +25,7 @@ object MetricsCollector {
final case class MetricFinalReportData(
name: String,
value: MetricValue,
violatedObjective: Option[(ServiceLevelObjective[_], MetricValue)],
violatedObjectives: List[(ServiceLevelObjective[_], MetricValue)],
)
final case class FinalReport(totalDuration: Duration, metricsData: List[MetricFinalReportData])
extends Response
@ -73,7 +72,8 @@ class MetricsCollector[T](exposedMetrics: Option[ExposedMetrics[T]], clock: Cloc
MetricFinalReportData(
name = metric.name,
value = metric.finalValue(duration),
violatedObjective = metric.violatedObjective,
violatedObjectives =
metric.violatedPeriodicObjectives ::: metric.violatedFinalObjectives(duration),
)
}
request.replyTo ! FinalReport(duration, data)

View File

@ -36,7 +36,8 @@ case class MetricsManager[T](
finalReport = response,
)
)
if (response.metricsData.exists(_.violatedObjective.isDefined))
val atLeastOneObjectiveViolated = response.metricsData.exists(_.violatedObjectives.nonEmpty)
if (atLeastOneObjectiveViolated)
StreamResult.ObjectivesViolated
else
StreamResult.Ok

View File

@ -4,8 +4,7 @@
package com.daml.ledger.api.benchtool.metrics
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig.Objectives
import com.daml.ledger.api.benchtool.metrics.objectives.{MaxDelay, MinConsumptionSpeed, MinRate}
import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig._
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.transaction_service.{
@ -18,8 +17,10 @@ import java.time.{Clock, Duration}
import scala.concurrent.duration.FiniteDuration
object MetricsSet {
def transactionMetrics(objectives: Objectives): List[Metric[GetTransactionsResponse]] =
all[GetTransactionsResponse](
def transactionMetrics(
objectives: Option[TransactionObjectives]
): List[Metric[GetTransactionsResponse]] =
transactionMetrics[GetTransactionsResponse](
countingFunction = _.transactions.length,
sizingFunction = _.serializedSize.toLong,
recordTimeFunction = _.transactions.collect {
@ -45,9 +46,9 @@ object MetricsSet {
)
def transactionTreesMetrics(
objectives: Objectives
objectives: Option[TransactionObjectives]
): List[Metric[GetTransactionTreesResponse]] =
all[GetTransactionTreesResponse](
transactionMetrics[GetTransactionTreesResponse](
countingFunction = _.transactions.length,
sizingFunction = _.serializedSize.toLong,
recordTimeFunction = _.transactions.collect {
@ -72,10 +73,17 @@ object MetricsSet {
}),
)
def activeContractsMetrics: List[Metric[GetActiveContractsResponse]] =
def activeContractsMetrics(
objectives: Option[RateObjectives]
): List[Metric[GetActiveContractsResponse]] =
List[Metric[GetActiveContractsResponse]](
CountRateMetric.empty[GetActiveContractsResponse](
countingFunction = _.activeContracts.length
countingFunction = _.activeContracts.length,
periodicObjectives = Nil,
finalObjectives = List(
objectives.flatMap(_.minItemRate.map(CountRateMetric.RateObjective.MinRate)),
objectives.flatMap(_.maxItemRate.map(CountRateMetric.RateObjective.MaxRate)),
).flatten,
),
TotalCountMetric.empty[GetActiveContractsResponse](
countingFunction = _.activeContracts.length
@ -99,10 +107,17 @@ object MetricsSet {
recordTimeFunction = None,
)
def completionsMetrics: List[Metric[CompletionStreamResponse]] =
def completionsMetrics(
objectives: Option[RateObjectives]
): List[Metric[CompletionStreamResponse]] =
List[Metric[CompletionStreamResponse]](
CountRateMetric.empty(
countingFunction = _.completions.length
countingFunction = _.completions.length,
periodicObjectives = Nil,
finalObjectives = List(
objectives.flatMap(_.minItemRate.map(CountRateMetric.RateObjective.MinRate)),
objectives.flatMap(_.maxItemRate.map(CountRateMetric.RateObjective.MaxRate)),
).flatten,
),
TotalCountMetric.empty(
countingFunction = _.completions.length
@ -126,28 +141,33 @@ object MetricsSet {
recordTimeFunction = None,
)
private def all[T](
private def transactionMetrics[T](
countingFunction: T => Int,
sizingFunction: T => Long,
recordTimeFunction: T => Seq[Timestamp],
objectives: Objectives,
objectives: Option[TransactionObjectives],
): List[Metric[T]] = {
List[Metric[T]](
CountRateMetric.empty[T](
countingFunction = countingFunction,
objective = objectives.minItemRate.map(MinRate),
periodicObjectives = Nil,
finalObjectives = List(
objectives.flatMap(_.minItemRate.map(CountRateMetric.RateObjective.MinRate)),
objectives.flatMap(_.maxItemRate.map(CountRateMetric.RateObjective.MaxRate)),
).flatten,
),
TotalCountMetric.empty[T](
countingFunction = countingFunction
),
ConsumptionSpeedMetric.empty[T](
recordTimeFunction = recordTimeFunction,
objective = objectives.minConsumptionSpeed.map(MinConsumptionSpeed),
objective =
objectives.flatMap(_.minConsumptionSpeed.map(ConsumptionSpeedMetric.MinConsumptionSpeed)),
),
DelayMetric.empty[T](
recordTimeFunction = recordTimeFunction,
clock = Clock.systemUTC(),
objective = objectives.maxDelaySeconds.map(MaxDelay),
objective = objectives.flatMap(_.maxDelaySeconds.map(DelayMetric.MaxDelay)),
),
SizeMetric.empty[T](
sizingFunction = sizingFunction

View File

@ -1,9 +1,7 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.metrics.objectives
import com.daml.ledger.api.benchtool.metrics.MetricValue
package com.daml.ledger.api.benchtool.metrics
trait ServiceLevelObjective[MetricValueType <: MetricValue] {
def isViolatedBy(metricValue: MetricValueType): Boolean

View File

@ -35,6 +35,8 @@ final case class SizeMetric[T](
Value(value)
}
override def violatedFinalObjectives(totalDuration: Duration): List[(Objective, Value)] = Nil
private def periodicSizeRate(periodDuration: Duration): Double =
(currentSizeBytesBucket.toDouble / periodDuration.toMillis) * 1000.0 / (1024 * 1024)
}

View File

@ -21,6 +21,8 @@ final case class TotalCountMetric[T](
override def finalValue(totalDuration: Duration): Value =
Value(totalCount = counter)
override def violatedFinalObjectives(totalDuration: Duration): List[(Objective, Value)] = Nil
}
object TotalCountMetric {

View File

@ -1,11 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.metrics.objectives
import com.daml.ledger.api.benchtool.metrics.DelayMetric
final case class MaxDelay(maxDelaySeconds: Long) extends ServiceLevelObjective[DelayMetric.Value] {
override def isViolatedBy(metricValue: DelayMetric.Value): Boolean =
metricValue.meanDelaySeconds.exists(_ > maxDelaySeconds)
}

View File

@ -1,15 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.metrics.objectives
import com.daml.ledger.api.benchtool.metrics.ConsumptionSpeedMetric
// TODO: add warm-up parameter
final case class MinConsumptionSpeed(minSpeed: Double)
extends ServiceLevelObjective[ConsumptionSpeedMetric.Value] {
override def isViolatedBy(metricValue: ConsumptionSpeedMetric.Value): Boolean =
Ordering[ConsumptionSpeedMetric.Value].lt(metricValue, v)
private val v = ConsumptionSpeedMetric.Value(Some(minSpeed))
}

View File

@ -1,14 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.metrics.objectives
import com.daml.ledger.api.benchtool.metrics.CountRateMetric
final case class MinRate(minAllowedRatePerSecond: Double)
extends ServiceLevelObjective[CountRateMetric.Value] {
override def isViolatedBy(metricValue: CountRateMetric.Value): Boolean =
Ordering[CountRateMetric.Value].lt(metricValue, v)
private val v = CountRateMetric.Value(minAllowedRatePerSecond)
}

View File

@ -5,12 +5,6 @@ package com.daml.ledger.api.benchtool.util
import com.daml.ledger.api.benchtool.metrics.MetricsCollector.Response.{FinalReport, PeriodicReport}
import com.daml.ledger.api.benchtool.metrics._
import com.daml.ledger.api.benchtool.metrics.objectives.{
MaxDelay,
MinConsumptionSpeed,
MinRate,
ServiceLevelObjective,
}
object ReportFormatter {
def formatPeriodicReport(streamName: String, periodicReport: PeriodicReport): String = {
@ -30,14 +24,14 @@ object ReportFormatter {
else
None
val violatedObjective: Option[String] = metricData.violatedObjective
.map { case (objective, value) =>
val violatedObjectives: List[String] =
metricData.violatedObjectives.map { case (objective, value) =>
val info =
s"${objectiveName(objective)}: required: ${formattedObjectiveValue(objective)}, metered: ${formattedValue(value)}"
failureFormat(info)
}
valueLog.toList ::: violatedObjective.toList
valueLog.toList ::: violatedObjectives
}
val durationLog =
@ -93,21 +87,25 @@ object ReportFormatter {
private def objectiveName(objective: ServiceLevelObjective[_]): String =
objective match {
case _: MaxDelay =>
case _: DelayMetric.MaxDelay =>
s"Maximum record time delay [s]"
case _: MinConsumptionSpeed =>
case _: ConsumptionSpeedMetric.MinConsumptionSpeed =>
s"Minimum consumption speed [-]"
case _: MinRate =>
case _: CountRateMetric.RateObjective.MinRate =>
s"Minimum item rate [item/s]"
case _: CountRateMetric.RateObjective.MaxRate =>
s"Maximum item rate [item/s]"
}
private def formattedObjectiveValue(objective: ServiceLevelObjective[_]): String =
objective match {
case obj: MaxDelay =>
case obj: DelayMetric.MaxDelay =>
obj.maxDelaySeconds.toString
case obj: MinConsumptionSpeed =>
case obj: ConsumptionSpeedMetric.MinConsumptionSpeed =>
obj.minSpeed.toString
case obj: MinRate =>
case obj: CountRateMetric.RateObjective.MinRate =>
obj.minAllowedRatePerSecond.toString
case obj: CountRateMetric.RateObjective.MaxRate =>
obj.minAllowedRatePerSecond.toString
}

View File

@ -32,7 +32,8 @@ class WorkflowConfigParserSpec extends AnyWordSpec with Matchers {
| - Foo1
| - Foo3
| objectives:
| min_item_rate: 123""".stripMargin
| min_item_rate: 123
| max_item_rate: 456""".stripMargin
parseYaml(yaml) shouldBe Right(
WorkflowConfig(
@ -60,10 +61,11 @@ class WorkflowConfigParserSpec extends AnyWordSpec with Matchers {
templates = List("Foo1", "Foo3"),
)
),
objectives = WorkflowConfig.StreamConfig.Objectives(
maxDelaySeconds = None,
minConsumptionSpeed = None,
minItemRate = Some(123),
objectives = Some(
WorkflowConfig.StreamConfig.RateObjectives(
minItemRate = Some(123),
maxItemRate = Some(456),
)
),
)
),
@ -139,7 +141,9 @@ class WorkflowConfigParserSpec extends AnyWordSpec with Matchers {
| end_offset: bar
| objectives:
| max_delay_seconds: 123
| min_consumption_speed: 2.34""".stripMargin
| min_consumption_speed: 2.34
| min_item_rate: 12
| max_item_rate: 34""".stripMargin
parseYaml(yaml) shouldBe Right(
WorkflowConfig(
submission = None,
@ -154,10 +158,13 @@ class WorkflowConfigParserSpec extends AnyWordSpec with Matchers {
),
beginOffset = Some(offset("foo")),
endOffset = Some(offset("bar")),
objectives = WorkflowConfig.StreamConfig.Objectives(
maxDelaySeconds = Some(123),
minConsumptionSpeed = Some(2.34),
minItemRate = None,
objectives = Some(
WorkflowConfig.StreamConfig.TransactionObjectives(
maxDelaySeconds = Some(123),
minConsumptionSpeed = Some(2.34),
minItemRate = Some(12),
maxItemRate = Some(34),
)
),
)
),
@ -165,6 +172,82 @@ class WorkflowConfigParserSpec extends AnyWordSpec with Matchers {
)
}
"parse stream configuration with some objectives set" in {
val yaml =
"""streams:
| - type: transactions
| name: stream-1
| filters:
| - party: Obs-2
| templates:
| - Foo1
| - Foo3
| begin_offset: foo
| end_offset: bar
| objectives:
| min_consumption_speed: 2.34
| min_item_rate: 12""".stripMargin
parseYaml(yaml) shouldBe Right(
WorkflowConfig(
submission = None,
streams = List(
WorkflowConfig.StreamConfig.TransactionsStreamConfig(
name = "stream-1",
filters = List(
WorkflowConfig.StreamConfig.PartyFilter(
party = "Obs-2",
templates = List("Foo1", "Foo3"),
)
),
beginOffset = Some(offset("foo")),
endOffset = Some(offset("bar")),
objectives = Some(
WorkflowConfig.StreamConfig.TransactionObjectives(
maxDelaySeconds = None,
minConsumptionSpeed = Some(2.34),
minItemRate = Some(12),
maxItemRate = None,
)
),
)
),
)
)
}
"parse stream configuration without objectives" in {
val yaml =
"""streams:
| - type: transactions
| name: stream-1
| filters:
| - party: Obs-2
| templates:
| - Foo1
| - Foo3
| begin_offset: foo
| end_offset: bar""".stripMargin
parseYaml(yaml) shouldBe Right(
WorkflowConfig(
submission = None,
streams = List(
WorkflowConfig.StreamConfig.TransactionsStreamConfig(
name = "stream-1",
filters = List(
WorkflowConfig.StreamConfig.PartyFilter(
party = "Obs-2",
templates = List("Foo1", "Foo3"),
)
),
beginOffset = Some(offset("foo")),
endOffset = Some(offset("bar")),
objectives = None,
)
),
)
)
}
"parse transaction-trees stream configuration" in {
val yaml =
"""streams:
@ -179,7 +262,9 @@ class WorkflowConfigParserSpec extends AnyWordSpec with Matchers {
| end_offset: bar
| objectives:
| max_delay_seconds: 123
| min_consumption_speed: 2.34""".stripMargin
| min_consumption_speed: 2.34
| min_item_rate: 12
| max_item_rate: 34""".stripMargin
parseYaml(yaml) shouldBe Right(
WorkflowConfig(
submission = None,
@ -194,10 +279,13 @@ class WorkflowConfigParserSpec extends AnyWordSpec with Matchers {
),
beginOffset = Some(offset("foo")),
endOffset = Some(offset("bar")),
objectives = WorkflowConfig.StreamConfig.Objectives(
maxDelaySeconds = Some(123),
minConsumptionSpeed = Some(2.34),
minItemRate = None,
objectives = Some(
WorkflowConfig.StreamConfig.TransactionObjectives(
maxDelaySeconds = Some(123),
minConsumptionSpeed = Some(2.34),
minItemRate = Some(12),
maxItemRate = Some(34),
)
),
)
),
@ -216,7 +304,8 @@ class WorkflowConfigParserSpec extends AnyWordSpec with Matchers {
| - Foo1
| - Foo3
| objectives:
| min_item_rate: 123""".stripMargin
| min_item_rate: 123
| max_item_rate: 4567""".stripMargin
parseYaml(yaml) shouldBe Right(
WorkflowConfig(
submission = None,
@ -229,10 +318,11 @@ class WorkflowConfigParserSpec extends AnyWordSpec with Matchers {
templates = List("Foo1", "Foo3"),
)
),
objectives = WorkflowConfig.StreamConfig.Objectives(
minConsumptionSpeed = None,
maxDelaySeconds = None,
minItemRate = Some(123),
objectives = Some(
WorkflowConfig.StreamConfig.RateObjectives(
minItemRate = Some(123),
maxItemRate = Some(4567),
)
),
)
),
@ -247,7 +337,10 @@ class WorkflowConfigParserSpec extends AnyWordSpec with Matchers {
| name: stream-1
| party: Obs-2
| begin_offset: foo
| application_id: foobar""".stripMargin
| application_id: foobar
| objectives:
| min_item_rate: 12
| max_item_rate: 345""".stripMargin
parseYaml(yaml) shouldBe Right(
WorkflowConfig(
submission = None,
@ -257,6 +350,12 @@ class WorkflowConfigParserSpec extends AnyWordSpec with Matchers {
party = "Obs-2",
beginOffset = Some(offset("foo")),
applicationId = "foobar",
objectives = Some(
WorkflowConfig.StreamConfig.RateObjectives(
minItemRate = Some(12),
maxItemRate = Some(345),
)
),
)
),
)

View File

@ -4,7 +4,7 @@
package com.daml.ledger.api.benchtool
import com.daml.ledger.api.benchtool.metrics.ConsumptionSpeedMetric
import com.daml.ledger.api.benchtool.metrics.objectives.MinConsumptionSpeed
import com.daml.ledger.api.benchtool.metrics.ConsumptionSpeedMetric._
import com.google.protobuf.timestamp.Timestamp
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
@ -20,8 +20,8 @@ class ConsumptionSpeedMetricSpec extends AnyWordSpec with Matchers {
val (_, periodicValue) = metric.periodicValue(Duration.ofMillis(100))
val finalValue = metric.finalValue(Duration.ofSeconds(1))
periodicValue shouldBe ConsumptionSpeedMetric.Value(Some(0.0))
finalValue shouldBe ConsumptionSpeedMetric.Value(None)
periodicValue shouldBe Value(Some(0.0))
finalValue shouldBe Value(None)
}
"compute values after processing elements" in {
@ -57,8 +57,8 @@ class ConsumptionSpeedMetricSpec extends AnyWordSpec with Matchers {
val expectedSpeed =
(lastElementOfThePeriod.getEpochSecond - firstElementOfThePeriod.getEpochSecond) * 1000.0 / periodDuration.toMillis
periodicValue shouldBe ConsumptionSpeedMetric.Value(Some(expectedSpeed))
finalValue shouldBe ConsumptionSpeedMetric.Value(None)
periodicValue shouldBe Value(Some(expectedSpeed))
finalValue shouldBe Value(None)
}
"correctly handle initial periods with a single record time" in {
@ -81,8 +81,8 @@ class ConsumptionSpeedMetricSpec extends AnyWordSpec with Matchers {
.periodicValue(periodDuration)
val finalValue = newMetric.finalValue(totalDuration)
periodicValue shouldBe ConsumptionSpeedMetric.Value(Some(0.0))
finalValue shouldBe ConsumptionSpeedMetric.Value(None)
periodicValue shouldBe Value(Some(0.0))
finalValue shouldBe Value(None)
}
"correctly handle non-initial periods with a single record time" in {
@ -116,8 +116,8 @@ class ConsumptionSpeedMetricSpec extends AnyWordSpec with Matchers {
.periodicValue(periodDuration)
val finalValue = newMetric.finalValue(totalDuration)
periodicValue shouldBe ConsumptionSpeedMetric.Value(Some(300.0))
finalValue shouldBe ConsumptionSpeedMetric.Value(None)
periodicValue shouldBe Value(Some(300.0))
finalValue shouldBe Value(None)
}
"correctly handle periods with no elements" in {
@ -149,8 +149,8 @@ class ConsumptionSpeedMetricSpec extends AnyWordSpec with Matchers {
.periodicValue(periodDuration)
val finalValue = newMetric.finalValue(totalDuration)
periodicValue shouldBe ConsumptionSpeedMetric.Value(Some(0.0))
finalValue shouldBe ConsumptionSpeedMetric.Value(None)
periodicValue shouldBe Value(Some(0.0))
finalValue shouldBe Value(None)
}
"correctly handle multiple periods with elements" in {
@ -201,8 +201,8 @@ class ConsumptionSpeedMetricSpec extends AnyWordSpec with Matchers {
val expectedSpeed =
(last.getEpochSecond - first.getEpochSecond) * 1000.0 / period3Duration.toMillis
periodicValue shouldBe ConsumptionSpeedMetric.Value(Some(expectedSpeed))
finalValue shouldBe ConsumptionSpeedMetric.Value(None)
periodicValue shouldBe Value(Some(expectedSpeed))
finalValue shouldBe Value(None)
}
"compute violated min speed SLO and the minimum speed" in {
@ -250,10 +250,10 @@ class ConsumptionSpeedMetricSpec extends AnyWordSpec with Matchers {
.onNext(elem3)
.periodicValue(periodDuration)
._1
.violatedObjective
.violatedPeriodicObjectives
violatedObjectives shouldBe Some(
objective -> ConsumptionSpeedMetric.Value(Some(0.8))
violatedObjectives shouldBe List(
objective -> Value(Some(0.8))
)
}
}

View File

@ -4,8 +4,7 @@
package com.daml.ledger.api.benchtool
import com.daml.ledger.api.benchtool.metrics.CountRateMetric
import com.daml.ledger.api.benchtool.metrics.CountRateMetric.Value
import com.daml.ledger.api.benchtool.metrics.objectives.MinRate
import com.daml.ledger.api.benchtool.metrics.CountRateMetric._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
@ -13,7 +12,7 @@ import java.time.Duration
import scala.language.existentials
class CountRateMetricSpec extends AnyWordSpec with Matchers {
CountRateMetric.getClass.getSimpleName should {
"CountRateMetric" should {
"correctly handle initial state" in {
val periodDuration: Duration = Duration.ofMillis(100)
val totalDuration: Duration = Duration.ofSeconds(1)
@ -98,11 +97,11 @@ class CountRateMetricSpec extends AnyWordSpec with Matchers {
)
}
"compute violated minimum rate SLO and the corresponing violating value" in {
"compute violated minimum rate periodic SLO and the corresponing violating value" in {
val periodDuration: Duration = Duration.ofSeconds(2)
val minAllowedRatePerSecond = 2.0
val objective = MinRate(minAllowedRatePerSecond)
val metric = anEmptyStringMetric(Some(objective))
val objective = RateObjective.MinRate(minAllowedRatePerSecond)
val metric = anEmptyStringMetric(periodicObjectives = List(objective))
val violatedObjective =
metric
@ -116,18 +115,18 @@ class CountRateMetricSpec extends AnyWordSpec with Matchers {
.periodicValue(periodDuration)
._1
.onNext("ijklmn")
.violatedObjective
.violatedPeriodicObjectives
violatedObjective shouldBe Some(
objective -> CountRateMetric.Value(1.5)
violatedObjective shouldBe List(
objective -> Value(1.5)
)
}
"not report not violated objectives" in {
"not report not violated periodic min rate objectives" in {
val periodDuration: Duration = Duration.ofSeconds(2)
val minAllowedRatePerSecond = 2.0
val objective = MinRate(minAllowedRatePerSecond)
val metric = anEmptyStringMetric(Some(objective))
val objective = RateObjective.MinRate(minAllowedRatePerSecond)
val metric = anEmptyStringMetric(periodicObjectives = List(objective))
val violatedObjective =
metric
@ -141,13 +140,148 @@ class CountRateMetricSpec extends AnyWordSpec with Matchers {
.periodicValue(periodDuration)
._1
.onNext("lmnoprst")
.violatedObjective
.violatedPeriodicObjectives
violatedObjective shouldBe None
violatedObjective shouldBe Nil
}
"report violated min rate final objective" in {
val periodDuration: Duration = Duration.ofSeconds(2)
val totalDuration: Duration = Duration.ofSeconds(6)
val minAllowedRatePerSecond = 2.0
val objective = RateObjective.MinRate(minAllowedRatePerSecond)
val metric = anEmptyStringMetric(finalObjectives = List(objective))
val violatedObjective =
metric
.onNext("abc")
.periodicValue(periodDuration)
._1
.onNext("def")
.onNext("ghi")
// total rate is (3 + 3 + 3) / 6.0
.violatedFinalObjectives(totalDuration)
violatedObjective shouldBe List(
objective -> Value(1.5)
)
}
"not report non-violated min rate final objective" in {
val periodDuration: Duration = Duration.ofSeconds(2)
val totalDuration: Duration = Duration.ofSeconds(6)
val minAllowedRatePerSecond = 2.0
val objective = RateObjective.MinRate(minAllowedRatePerSecond)
val metric = anEmptyStringMetric(finalObjectives = List(objective))
val violatedObjective =
metric
.onNext("abc")
.periodicValue(periodDuration)
._1
.onNext("def")
.onNext("ghi")
.onNext("jklmno")
// total rate is (3 + 3 + 3 + 6) / 6.0
.violatedFinalObjectives(totalDuration)
violatedObjective shouldBe Nil
}
"not report non-violated min rate final objective if the objective is violated only in a period" in {
val periodDuration: Duration = Duration.ofSeconds(2)
val totalDuration: Duration = Duration.ofSeconds(3)
val minAllowedRatePerSecond = 2.0
val objective = RateObjective.MinRate(minAllowedRatePerSecond)
val metric = anEmptyStringMetric(finalObjectives = List(objective))
val violatedObjective =
metric
.onNext("abc")
// periodic rate is 3 / 2.0 = 1.5
.periodicValue(periodDuration)
._1
.onNext("def")
.onNext("ghi")
// total rate is (3 + 3 + 3) / 3.0 = 3.0
.violatedFinalObjectives(totalDuration)
violatedObjective shouldBe Nil
}
"report violated max rate final objective" in {
val periodDuration: Duration = Duration.ofSeconds(2)
val totalDuration: Duration = Duration.ofSeconds(3)
val objective = RateObjective.MaxRate(3.0)
val metric = CountRateMetric.empty[String](
countingFunction = stringLength,
periodicObjectives = Nil,
finalObjectives = List(objective),
)
val violatedObjective =
metric
.onNext("abc")
.periodicValue(periodDuration)
._1
.onNext("def")
.onNext("ghijkl")
// total rate is (3 + 3 + 6) / 3.0 = 4.0
.violatedFinalObjectives(totalDuration)
violatedObjective shouldBe List(
objective -> Value(4.0)
)
}
"not report non-violated max rate final objective" in {
val periodDuration: Duration = Duration.ofSeconds(2)
val totalDuration: Duration = Duration.ofSeconds(3)
val objective = RateObjective.MaxRate(3.0)
val metric = anEmptyStringMetric(finalObjectives = List(objective))
val violatedObjective =
metric
.onNext("abc")
.periodicValue(periodDuration)
._1
.onNext("def")
.onNext("ghi")
// total rate is (3 + 3 + 3) / 3.0 = 3.0
.violatedFinalObjectives(totalDuration)
violatedObjective shouldBe Nil
}
"not report non-violated max rate final objective if the objective is violated only in a period" in {
val periodDuration: Duration = Duration.ofSeconds(2)
val totalDuration: Duration = Duration.ofSeconds(4)
val objective = RateObjective.MaxRate(2.0)
val metric = anEmptyStringMetric(finalObjectives = List(objective))
val violatedObjective =
metric
.onNext("abcde")
// periodic rate is 5 / 2.0 = 2.5
.periodicValue(periodDuration)
._1
.onNext("f")
.onNext("gh")
// total rate is (5 + 1 + 2) / 4.0 = 2.0
.violatedFinalObjectives(totalDuration)
violatedObjective shouldBe Nil
}
}
private def stringLength(value: String): Int = value.length
private def anEmptyStringMetric(objective: Option[MinRate] = None): CountRateMetric[String] =
CountRateMetric.empty[String](countingFunction = stringLength, objective = objective)
private def anEmptyStringMetric(
periodicObjectives: List[CountRateMetric.RateObjective] = Nil,
finalObjectives: List[CountRateMetric.RateObjective] = Nil,
): CountRateMetric[String] =
CountRateMetric.empty[String](
countingFunction = stringLength,
periodicObjectives = periodicObjectives,
finalObjectives = finalObjectives,
)
}

View File

@ -4,7 +4,7 @@
package com.daml.ledger.api.benchtool
import com.daml.ledger.api.benchtool.metrics.DelayMetric
import com.daml.ledger.api.benchtool.metrics.objectives.MaxDelay
import com.daml.ledger.api.benchtool.metrics.DelayMetric._
import com.google.protobuf.timestamp.Timestamp
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
@ -22,8 +22,8 @@ class DelayMetricSpec extends AnyWordSpec with Matchers {
val totalDuration: Duration = Duration.ofSeconds(1)
val finalValue = metric.finalValue(totalDuration)
periodicValue shouldBe DelayMetric.Value(None)
finalValue shouldBe DelayMetric.Value(None)
periodicValue shouldBe Value(None)
finalValue shouldBe Value(None)
}
"compute values after processing elements" in {
@ -58,8 +58,8 @@ class DelayMetricSpec extends AnyWordSpec with Matchers {
val finalValue = newMetric.finalValue(totalDuration)
val expectedMean = (delay1 + delay2 + delay3) / 3
periodicValue shouldBe DelayMetric.Value(Some(expectedMean))
finalValue shouldBe DelayMetric.Value(None)
periodicValue shouldBe Value(Some(expectedMean))
finalValue shouldBe Value(None)
}
"correctly handle periods with no elements" in {
@ -92,8 +92,8 @@ class DelayMetricSpec extends AnyWordSpec with Matchers {
.periodicValue(periodDuration)
val finalValue = newMetric.finalValue(totalDuration)
periodicValue shouldBe DelayMetric.Value(None)
finalValue shouldBe DelayMetric.Value(None)
periodicValue shouldBe Value(None)
finalValue shouldBe Value(None)
}
"correctly handle multiple periods with elements" in {
@ -136,8 +136,8 @@ class DelayMetricSpec extends AnyWordSpec with Matchers {
val finalValue = newMetric.finalValue(totalDuration)
val expectedMean = (delay4 + delay5) / 2
periodicValue shouldBe DelayMetric.Value(Some(expectedMean))
finalValue shouldBe DelayMetric.Value(None)
periodicValue shouldBe Value(Some(expectedMean))
finalValue shouldBe Value(None)
}
"compute violated max delay SLO with the most extreme value" in {
@ -213,10 +213,10 @@ class DelayMetricSpec extends AnyWordSpec with Matchers {
.onNext(elem4)
.periodicValue(periodDuration)
._1
.violatedObjective
.violatedPeriodicObjectives
violatedObjectives shouldBe Some(
expectedViolatedObjective -> DelayMetric.Value(Some(maxDelay))
violatedObjectives shouldBe List(
expectedViolatedObjective -> Value(Some(maxDelay))
)
}
}

View File

@ -3,22 +3,21 @@
package com.daml.ledger.api.benchtool
import com.daml.ledger.api.benchtool.metrics.objectives.{MaxDelay, MinConsumptionSpeed}
import com.daml.ledger.api.benchtool.metrics.{ConsumptionSpeedMetric, DelayMetric}
import com.daml.ledger.api.benchtool.metrics.DelayMetric
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.wordspec.AnyWordSpec
import scala.util.Random
class ServiceLevelObjectiveSpec extends AnyWordSpec with Matchers with TableDrivenPropertyChecks {
class MaxDelaySpec extends AnyWordSpec with Matchers with TableDrivenPropertyChecks {
"Maximum delay SLO" should {
"correctly report violation" in {
import DelayMetric.Value
val randomValue = Random.nextInt(10000).toLong
val randomSmaller = randomValue - 1
val randomLarger = randomValue + 1
val maxDelay = MaxDelay(randomValue)
val maxDelay = DelayMetric.MaxDelay(randomValue)
val cases = Table(
("Metric value", "Expected violated"),
(Value(None), false),
@ -50,25 +49,4 @@ class ServiceLevelObjectiveSpec extends AnyWordSpec with Matchers with TableDriv
}
}
}
"Min consumption speed SLO" should {
"correctly report violation" in {
import ConsumptionSpeedMetric.Value
val objectiveSpeed = Random.nextDouble()
val objective = MinConsumptionSpeed(objectiveSpeed)
val lowerSpeed = objectiveSpeed - 1.0
val higherSpeed = objectiveSpeed + 1.0
val cases = Table(
("Metric value", "Expected violated"),
(Value(None), true),
(Value(Some(lowerSpeed)), true),
(Value(Some(objectiveSpeed)), false),
(Value(Some(higherSpeed)), false),
)
forAll(cases) { (metricValue, expectedViolated) =>
objective.isViolatedBy(metricValue) shouldBe expectedViolated
}
}
}
}

View File

@ -5,8 +5,12 @@ package com.daml.ledger.api.benchtool
import akka.actor.testkit.typed.scaladsl.{BehaviorTestKit, ScalaTestWithActorTestKit}
import akka.actor.typed.{ActorRef, Behavior}
import com.daml.ledger.api.benchtool.metrics.objectives.ServiceLevelObjective
import com.daml.ledger.api.benchtool.metrics.{Metric, MetricValue, MetricsCollector}
import com.daml.ledger.api.benchtool.metrics.{
Metric,
MetricValue,
MetricsCollector,
ServiceLevelObjective,
}
import org.scalatest.wordspec.AnyWordSpecLike
import java.time.{Clock, Duration, Instant, ZoneId}
@ -82,7 +86,7 @@ class MetricsCollectorSpec extends ScalaTestWithActorTestKit with AnyWordSpecLik
Response.MetricFinalReportData(
name = "Test Metric",
value = TestMetricValue("FINAL:"),
violatedObjective = None,
violatedObjectives = Nil,
)
),
totalDuration = Duration.ofSeconds(10),
@ -108,7 +112,7 @@ class MetricsCollectorSpec extends ScalaTestWithActorTestKit with AnyWordSpecLik
Response.MetricFinalReportData(
name = "Test Metric",
value = TestMetricValue("FINAL:mango-banana-cherry"),
violatedObjective = None,
violatedObjectives = Nil,
)
),
totalDuration = Duration.ofSeconds(10),
@ -134,7 +138,7 @@ class MetricsCollectorSpec extends ScalaTestWithActorTestKit with AnyWordSpecLik
Response.MetricFinalReportData(
name = "Test Metric",
value = TestMetricValue("FINAL:mango-tomato-cherry"),
violatedObjective = Some(
violatedObjectives = List(
(
TestObjective,
TestMetricValue(TestObjective.TestViolatingValue),
@ -217,11 +221,15 @@ class MetricsCollectorSpec extends ScalaTestWithActorTestKit with AnyWordSpecLik
TestMetricValue(s"FINAL:${processedElems.mkString("-")}")
}
override def violatedObjective: Option[(TestObjective.type, TestMetricValue)] =
override def violatedPeriodicObjectives: List[(TestObjective.type, TestMetricValue)] =
if (processedElems.contains(TestObjective.TestViolatingValue))
Some(TestObjective -> TestMetricValue(TestObjective.TestViolatingValue))
List(TestObjective -> TestMetricValue(TestObjective.TestViolatingValue))
else
None
Nil
override def violatedFinalObjectives(
totalDuration: Duration
): List[(TestObjective.type, TestMetricValue)] = Nil
}

View File

@ -0,0 +1,34 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool
import com.daml.ledger.api.benchtool.metrics.ConsumptionSpeedMetric
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.wordspec.AnyWordSpec
import scala.util.Random
class MinConsumptionSpeedSpec extends AnyWordSpec with Matchers with TableDrivenPropertyChecks {
"Min consumption speed SLO" should {
"correctly report violation" in {
import ConsumptionSpeedMetric.Value
val objectiveSpeed = Random.nextDouble()
val objective = ConsumptionSpeedMetric.MinConsumptionSpeed(objectiveSpeed)
val lowerSpeed = objectiveSpeed - 1.0
val higherSpeed = objectiveSpeed + 1.0
val cases = Table(
("Metric value", "Expected violated"),
(Value(None), true),
(Value(Some(lowerSpeed)), true),
(Value(Some(objectiveSpeed)), false),
(Value(Some(higherSpeed)), false),
)
forAll(cases) { (metricValue, expectedViolated) =>
objective.isViolatedBy(metricValue) shouldBe expectedViolated
}
}
}
}