ledger-api-bench-tool - Active contracts stream, completions stream [DPP-398, DPP-399] (#9857)

* ledger-api-bench-tool: Active contracts streams

CHANGELOG_BEGIN
- [Integration Kit] - ledger-api-bench-tool - reading active contract streams
CHANGELOG_END

* ledger-api-bench-tool: Completions stream (#9872)

CHANGELOG_BEGIN
- [Integration Kit] - ledger-api-bench-tool - Reading completions stream
CHANGELOG_END
This commit is contained in:
Kamil Bożek 2021-06-09 09:55:07 +02:00 committed by GitHub
parent fce8e54469
commit 4e49cf6814
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 551 additions and 268 deletions

View File

@ -3,10 +3,11 @@
package com.daml.ledger.api.benchtool
import com.daml.ledger.api.benchtool.Config.StreamConfig
import com.daml.ledger.api.tls.TlsConfigurationCli
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.v1.value.Identifier
import scopt.{OptionParser, Read}
import scopt.{OptionDef, OptionParser, Read}
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success, Try}
@ -35,7 +36,7 @@ object Cli {
s"Stream configuration."
)
.valueName(
"stream-type=<transactions|transaction-trees>,name=<streamName>,party=<party>[,begin-offset=<offset>][,end-offset=<offset>][,template-ids=<id1>|<id2>][,max-delay=<seconds>]"
"<param1>=<value1>,<param2>=<value2>,..."
)
.action { case (streamConfig, config) =>
config.copy(streams = config.streams :+ streamConfig)
@ -66,6 +67,34 @@ object Cli {
help("help").text("Prints this information")
private def note(level: Int, param: String, desc: String = ""): OptionDef[Unit, Config] = {
val paddedParam = s"${" " * level * 2}$param"
val internalPadding = math.max(1, 40 - paddedParam.length)
note(s"$paddedParam${" " * internalPadding}$desc")
}
note(0, "")
note(0, "Stream configuration parameters:")
note(1, "Transactions/transaction trees:")
note(2, "stream-type=<transactions|transaction-trees>", "(required)")
note(2, "name=<stream-name>", "Stream name used to identify results (required)")
note(2, "party=<party>", "(required)")
note(2, "begin-offset=<offset>")
note(2, "end-offset=<offset>")
note(2, "template-ids=<id1>|<id2>")
note(2, "max-delay=<seconds>", "Max record time delay objective")
note(2, "min-consumption-speed=<speed>", "Min consumption speed objective")
note(1, "Active contract sets:")
note(2, "stream-type=active-contracts", "(required)")
note(2, "name=<stream-name>", "Stream name used to identify results (required)")
note(2, "party=<party>", "(required)")
note(2, "template-ids=<id1>|<id2>")
note(1, "Command completions:")
note(2, "stream-type=completions", "(required)")
note(2, "name=<stream-name>", "Stream name used to identify results (required)")
note(2, "party=<party>", "(required)")
note(2, "begin-offset=<offset>")
note(2, "template-ids=<id1>|<id2>")
}
def config(args: Array[String]): Option[Config] =
@ -99,14 +128,9 @@ object Cli {
def offset(stringValue: String): LedgerOffset =
LedgerOffset.defaultInstance.withAbsolute(stringValue)
val config = for {
def transactionsConfig: Either[String, StreamConfig.TransactionsStreamConfig] = for {
name <- stringField("name")
party <- stringField("party")
streamType <- stringField("stream-type").flatMap[String, Config.StreamConfig.StreamType] {
case "transactions" => Right(Config.StreamConfig.StreamType.Transactions)
case "transaction-trees" => Right(Config.StreamConfig.StreamType.TransactionTrees)
case invalid => Left(s"Invalid stream type: $invalid")
}
templateIds <- optionalStringField("template-ids").flatMap {
case Some(ids) => listOfTemplateIds(ids).map(Some(_))
case None => Right(None)
@ -115,9 +139,8 @@ object Cli {
endOffset <- optionalStringField("end-offset").map(_.map(offset))
maxDelaySeconds <- optionalLongField("max-delay")
minConsumptionSpeed <- optionalDoubleField("min-consumption-speed")
} yield Config.StreamConfig(
} yield Config.StreamConfig.TransactionsStreamConfig(
name = name,
streamType = streamType,
party = party,
templateIds = templateIds,
beginOffset = beginOffset,
@ -128,6 +151,63 @@ object Cli {
),
)
def transactionTreesConfig: Either[String, StreamConfig.TransactionTreesStreamConfig] =
for {
name <- stringField("name")
party <- stringField("party")
templateIds <- optionalStringField("template-ids").flatMap {
case Some(ids) => listOfTemplateIds(ids).map(Some(_))
case None => Right(None)
}
beginOffset <- optionalStringField("begin-offset").map(_.map(offset))
endOffset <- optionalStringField("end-offset").map(_.map(offset))
maxDelaySeconds <- optionalLongField("max-delay")
minConsumptionSpeed <- optionalDoubleField("min-consumption-speed")
} yield Config.StreamConfig.TransactionTreesStreamConfig(
name = name,
party = party,
templateIds = templateIds,
beginOffset = beginOffset,
endOffset = endOffset,
objectives = Config.StreamConfig.Objectives(
maxDelaySeconds = maxDelaySeconds,
minConsumptionSpeed = minConsumptionSpeed,
),
)
def activeContractsConfig: Either[String, StreamConfig.ActiveContractsStreamConfig] = for {
name <- stringField("name")
party <- stringField("party")
templateIds <- optionalStringField("template-ids").flatMap {
case Some(ids) => listOfTemplateIds(ids).map(Some(_))
case None => Right(None)
}
} yield Config.StreamConfig.ActiveContractsStreamConfig(
name = name,
party = party,
templateIds = templateIds,
)
def completionsConfig: Either[String, StreamConfig.CompletionsStreamConfig] = for {
name <- stringField("name")
party <- stringField("party")
applicationId <- stringField("application-id")
beginOffset <- optionalStringField("begin-offset").map(_.map(offset))
} yield Config.StreamConfig.CompletionsStreamConfig(
name = name,
party = party,
applicationId = applicationId,
beginOffset = beginOffset,
)
val config = stringField("stream-type").flatMap[String, Config.StreamConfig] {
case "transactions" => transactionsConfig
case "transaction-trees" => transactionTreesConfig
case "active-contracts" => activeContractsConfig
case "completions" => completionsConfig
case invalid => Left(s"Invalid stream type: $invalid")
}
config.fold(error => throw new IllegalArgumentException(error), identity)
}

View File

@ -18,22 +18,42 @@ case class Config(
)
object Config {
case class StreamConfig(
name: String,
streamType: Config.StreamConfig.StreamType,
party: String,
templateIds: Option[List[Identifier]],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
objectives: StreamConfig.Objectives,
)
trait StreamConfig {
def name: String
def party: String
}
object StreamConfig {
sealed trait StreamType
object StreamType {
case object Transactions extends StreamType
case object TransactionTrees extends StreamType
}
case class TransactionsStreamConfig(
name: String,
party: String,
templateIds: Option[List[Identifier]],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
objectives: StreamConfig.Objectives,
) extends StreamConfig
case class TransactionTreesStreamConfig(
name: String,
party: String,
templateIds: Option[List[Identifier]],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
objectives: StreamConfig.Objectives,
) extends StreamConfig
case class ActiveContractsStreamConfig(
name: String,
party: String,
templateIds: Option[List[Identifier]],
) extends StreamConfig
case class CompletionsStreamConfig(
name: String,
party: String,
applicationId: String,
beginOffset: Option[LedgerOffset],
) extends StreamConfig
case class Objectives(
maxDelaySeconds: Option[Long],

View File

@ -3,20 +3,15 @@
package com.daml.ledger.api.benchtool
import akka.actor.typed.{ActorSystem, SpawnProtocol}
import com.daml.ledger.api.benchtool.metrics.{
Creator,
MeteredStreamObserver,
MetricsManager,
TransactionMetrics,
import com.daml.ledger.api.benchtool.metrics.{MetricsCollector, MetricsSet, StreamMetrics}
import com.daml.ledger.api.benchtool.services.{
ActiveContractsService,
CommandCompletionService,
LedgerIdentityService,
TransactionService,
}
import com.daml.ledger.api.benchtool.services.{LedgerIdentityService, TransactionService}
import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.api.v1.transaction_service.{
GetTransactionTreesResponse,
GetTransactionsResponse,
}
import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
import io.grpc.Channel
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
@ -58,53 +53,65 @@ object LedgerApiBenchTool {
val resources = for {
executorService <- threadPoolExecutorOwner(config.concurrency)
channel <- channelOwner(config.ledger, config.tls, executorService)
system <- actorSystemResourceOwner()
system <- TypedActorSystemResourceOwner.owner()
} yield (channel, system)
resources.use { case (channel, system) =>
val ledgerIdentityService: LedgerIdentityService = new LedgerIdentityService(channel)
val ledgerId: String = ledgerIdentityService.fetchLedgerId()
val transactionService = new TransactionService(channel, ledgerId)
val activeContractsService = new ActiveContractsService(channel, ledgerId)
val commandCompletionService = new CommandCompletionService(channel, ledgerId)
Future
.traverse(config.streams) { streamConfig =>
streamConfig.streamType match {
case Config.StreamConfig.StreamType.Transactions =>
TransactionMetrics
.transactionsMetricsManager(
streamConfig.name,
config.reportingPeriod,
streamConfig.objectives,
)(system)
.flatMap { manager =>
val observer: MeteredStreamObserver[GetTransactionsResponse] =
new MeteredStreamObserver[GetTransactionsResponse](
streamConfig.name,
logger,
manager,
)(system)
transactionService.transactions(streamConfig, observer)
}
case Config.StreamConfig.StreamType.TransactionTrees =>
TransactionMetrics
.transactionTreesMetricsManager(
streamConfig.name,
config.reportingPeriod,
streamConfig.objectives,
)(system)
.flatMap { manager =>
val observer =
new MeteredStreamObserver[GetTransactionTreesResponse](
streamConfig.name,
logger,
manager,
)(system)
transactionService.transactionTrees(streamConfig, observer)
}
}
.traverse(config.streams) {
case streamConfig: Config.StreamConfig.TransactionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.transactionMetrics(streamConfig.objectives),
logger = logger,
)(system, ec)
.flatMap { observer =>
transactionService.transactions(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.TransactionTreesStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.transactionTreesMetrics(streamConfig.objectives),
logger = logger,
)(system, ec)
.flatMap { observer =>
transactionService.transactionTrees(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.ActiveContractsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.activeContractsMetrics,
logger = logger,
)(system, ec)
.flatMap { observer =>
activeContractsService.getActiveContracts(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.CompletionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.completionsMetrics,
logger = logger,
)(system, ec)
.flatMap { observer =>
commandCompletionService.completions(streamConfig, observer)
}
}
.transform {
case Success(results) =>
if (results.contains(MetricsManager.Message.MetricsResult.ObjectivesViolated))
if (results.contains(MetricsCollector.Message.MetricsResult.ObjectivesViolated))
Failure(new RuntimeException("Metrics objectives not met."))
else Success(())
case Failure(ex) =>
@ -143,11 +150,6 @@ object LedgerApiBenchTool {
ResourceOwner.forChannel(channelBuilder, ShutdownTimeout)
}
private def actorSystemResourceOwner(): ResourceOwner[ActorSystem[SpawnProtocol.Command]] =
new TypedActorSystemResourceOwner[SpawnProtocol.Command](() =>
ActorSystem(Creator(), "Creator")
)
private def threadPoolExecutorOwner(
config: Config.Concurrency
): ResourceOwner[ThreadPoolExecutor] =

View File

@ -3,7 +3,6 @@
package com.daml.ledger.api.benchtool.metrics
import akka.actor.typed.{ActorRef, ActorSystem}
import com.daml.ledger.api.benchtool.util.ObserverWithResult
import org.slf4j.Logger
@ -12,25 +11,17 @@ import scala.concurrent.Future
class MeteredStreamObserver[T](
val streamName: String,
logger: Logger,
metricsManager: ActorRef[MetricsManager.Message],
)(implicit system: ActorSystem[_])
extends ObserverWithResult[T, MetricsManager.Message.MetricsResult](logger) {
import MetricsManager.Message._
manager: MetricsManager[T],
) extends ObserverWithResult[T, MetricsCollector.Message.MetricsResult](logger) {
override def onNext(value: T): Unit = {
metricsManager ! NewValue(value)
manager.sendNewValue(value)
super.onNext(value)
}
override def completeWith(): Future[MetricsResult] = {
// TODO: abstract over the ask pattern (possibly a container with the actor and a method for asking)
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
import scala.concurrent.duration._
logger.debug(withStreamName(s"Sending $StreamCompleted notification."))
implicit val timeout: Timeout = 3.seconds
metricsManager.ask(StreamCompleted(_))
override def completeWith(): Future[MetricsCollector.Message.MetricsResult] = {
logger.debug(withStreamName(s"Asking for stream result..."))
manager.result()
}
}

View File

@ -0,0 +1,94 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.metrics
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior}
import com.daml.ledger.api.benchtool.util.{MetricReporter, TimeUtil}
import java.time.Instant
import scala.concurrent.duration._
object MetricsCollector {
sealed trait Message
object Message {
sealed trait MetricsResult
object MetricsResult {
final case object Ok extends MetricsResult
final case object ObjectivesViolated extends MetricsResult
}
final case class NewValue[T](value: T) extends Message
final case class PeriodicUpdateCommand() extends Message
final case class StreamCompleted(replyTo: ActorRef[MetricsResult]) extends Message
}
def apply[T](
streamName: String,
metrics: List[Metric[T]],
logInterval: FiniteDuration,
reporter: MetricReporter,
): Behavior[Message] =
Behaviors.withTimers { timers =>
val startTime: Instant = Instant.now()
new MetricsCollector[T](timers, streamName, logInterval, reporter, startTime)
.handlingMessages(metrics, startTime)
}
}
class MetricsCollector[T](
timers: TimerScheduler[MetricsCollector.Message],
streamName: String,
logInterval: FiniteDuration,
reporter: MetricReporter,
startTime: Instant,
) {
import MetricsCollector._
import MetricsCollector.Message._
timers.startTimerWithFixedDelay(PeriodicUpdateCommand(), logInterval)
def handlingMessages(metrics: List[Metric[T]], lastPeriodicCheck: Instant): Behavior[Message] = {
Behaviors.receive { case (context, message) =>
message match {
case newValue: NewValue[T] =>
handlingMessages(metrics.map(_.onNext(newValue.value)), lastPeriodicCheck)
case _: PeriodicUpdateCommand =>
val currentTime = Instant.now()
val (newMetrics, values) = metrics
.map(_.periodicValue(TimeUtil.durationBetween(lastPeriodicCheck, currentTime)))
.unzip
context.log.info(namedMessage(reporter.formattedValues(values)))
handlingMessages(newMetrics, currentTime)
case message: StreamCompleted =>
context.log.info(
namedMessage(
reporter.finalReport(
streamName = streamName,
metrics = metrics,
duration = totalDuration,
)
)
)
message.replyTo ! result(metrics)
Behaviors.stopped
}
}
}
private def result(metrics: List[Metric[T]]): MetricsResult = {
val atLeastOneObjectiveViolated = metrics.exists(_.violatedObjective.nonEmpty)
if (atLeastOneObjectiveViolated) MetricsResult.ObjectivesViolated
else MetricsResult.Ok
}
private def namedMessage(message: String) = s"[$streamName] $message"
private def totalDuration: java.time.Duration =
TimeUtil.durationBetween(startTime, Instant.now())
}

View File

@ -3,100 +3,51 @@
package com.daml.ledger.api.benchtool.metrics
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior, SpawnProtocol}
import com.daml.ledger.api.benchtool.util.{MetricReporter, TimeUtil}
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.{ActorRef, ActorSystem, Props, SpawnProtocol}
import akka.util.Timeout
import com.daml.ledger.api.benchtool.util.MetricReporter
import java.time.Instant
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
case class MetricsManager[T](collector: ActorRef[MetricsCollector.Message])(implicit
system: ActorSystem[SpawnProtocol.Command]
) {
def sendNewValue(value: T): Unit =
collector ! MetricsCollector.Message.NewValue(value)
def result[Result](): Future[MetricsCollector.Message.MetricsResult] = {
implicit val timeout: Timeout = Timeout(3.seconds)
collector.ask(MetricsCollector.Message.StreamCompleted)
}
}
object MetricsManager {
sealed trait Message
object Message {
sealed trait MetricsResult
object MetricsResult {
final case object Ok extends MetricsResult
final case object ObjectivesViolated extends MetricsResult
}
final case class NewValue[T](value: T) extends Message
final case class PeriodicUpdateCommand() extends Message
final case class StreamCompleted(replyTo: ActorRef[MetricsResult]) extends Message
}
def apply[T](
def apply[StreamElem](
streamName: String,
metrics: List[Metric[T]],
logInterval: FiniteDuration,
reporter: MetricReporter,
): Behavior[Message] =
Behaviors.withTimers { timers =>
val startTime: Instant = Instant.now()
new MetricsManager[T](timers, streamName, logInterval, reporter, startTime)
.handlingMessages(metrics, startTime)
}
metrics: List[Metric[StreamElem]],
)(implicit
system: ActorSystem[SpawnProtocol.Command],
ec: ExecutionContext,
): Future[MetricsManager[StreamElem]] = {
implicit val timeout: Timeout = Timeout(3.seconds)
}
val collectorActor: Future[ActorRef[MetricsCollector.Message]] = system.ask(
SpawnProtocol.Spawn(
behavior = MetricsCollector(
streamName = streamName,
metrics = metrics,
logInterval = logInterval,
reporter = MetricReporter.Default,
),
name = s"${streamName}-collector",
props = Props.empty,
_,
)
)
class MetricsManager[T](
timers: TimerScheduler[MetricsManager.Message],
streamName: String,
logInterval: FiniteDuration,
reporter: MetricReporter,
startTime: Instant,
) {
import MetricsManager._
import MetricsManager.Message._
timers.startTimerWithFixedDelay(PeriodicUpdateCommand(), logInterval)
def handlingMessages(metrics: List[Metric[T]], lastPeriodicCheck: Instant): Behavior[Message] = {
Behaviors.receive { case (context, message) =>
message match {
case newValue: NewValue[T] =>
handlingMessages(metrics.map(_.onNext(newValue.value)), lastPeriodicCheck)
case _: PeriodicUpdateCommand =>
val currentTime = Instant.now()
val (newMetrics, values) = metrics
.map(_.periodicValue(TimeUtil.durationBetween(lastPeriodicCheck, currentTime)))
.unzip
context.log.info(namedMessage(reporter.formattedValues(values)))
handlingMessages(newMetrics, currentTime)
case message: StreamCompleted =>
context.log.info(
namedMessage(
reporter.finalReport(
streamName = streamName,
metrics = metrics,
duration = totalDuration,
)
)
)
message.replyTo ! result(metrics)
Behaviors.stopped
}
}
collectorActor.map(MetricsManager[StreamElem](_))
}
private def result(metrics: List[Metric[T]]): MetricsResult = {
val atLeastOneObjectiveViolated = metrics.exists(_.violatedObjective.nonEmpty)
if (atLeastOneObjectiveViolated) MetricsResult.ObjectivesViolated
else MetricsResult.Ok
}
private def namedMessage(message: String) = s"[$streamName] $message"
private def totalDuration: java.time.Duration =
TimeUtil.durationBetween(startTime, Instant.now())
}
object Creator {
def apply(): Behavior[SpawnProtocol.Command] =
Behaviors.setup { context =>
context.log.debug(s"Starting Creator actor")
SpawnProtocol()
}
}

View File

@ -3,11 +3,10 @@
package com.daml.ledger.api.benchtool.metrics
import akka.actor.typed.{ActorRef, ActorSystem, Props, SpawnProtocol}
import akka.util.Timeout
import com.daml.ledger.api.benchtool.Config.StreamConfig.Objectives
import com.daml.ledger.api.benchtool.metrics.objectives.{MaxDelay, MinConsumptionSpeed}
import com.daml.ledger.api.benchtool.util.MetricReporter
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.transaction_service.{
GetTransactionTreesResponse,
GetTransactionsResponse,
@ -15,58 +14,9 @@ import com.daml.ledger.api.v1.transaction_service.{
import com.google.protobuf.timestamp.Timestamp
import java.time.Clock
import scala.concurrent.Future
import scala.concurrent.duration._
object TransactionMetrics {
implicit val timeout: Timeout = Timeout(3.seconds)
import akka.actor.typed.scaladsl.AskPattern._
def transactionsMetricsManager(
streamName: String,
logInterval: FiniteDuration,
objectives: Objectives,
)(implicit
system: ActorSystem[SpawnProtocol.Command]
): Future[ActorRef[MetricsManager.Message]] = {
system.ask(
SpawnProtocol.Spawn(
behavior = MetricsManager(
streamName = streamName,
metrics = transactionMetrics(objectives),
logInterval = logInterval,
reporter = MetricReporter.Default,
),
name = s"${streamName}-manager",
props = Props.empty,
_,
)
)
}
def transactionTreesMetricsManager(
streamName: String,
logInterval: FiniteDuration,
objectives: Objectives,
)(implicit
system: ActorSystem[SpawnProtocol.Command]
): Future[ActorRef[MetricsManager.Message]] = {
system.ask(
SpawnProtocol.Spawn(
behavior = MetricsManager(
streamName = streamName,
metrics = transactionTreesMetrics(objectives),
logInterval = logInterval,
reporter = MetricReporter.Default,
),
name = s"${streamName}-manager",
props = Props.empty,
_,
)
)
}
private def transactionMetrics(objectives: Objectives): List[Metric[GetTransactionsResponse]] =
object MetricsSet {
def transactionMetrics(objectives: Objectives): List[Metric[GetTransactionsResponse]] =
all[GetTransactionsResponse](
countingFunction = _.transactions.length,
sizingFunction = _.serializedSize.toLong,
@ -76,7 +26,7 @@ object TransactionMetrics {
objectives = objectives,
)
private def transactionTreesMetrics(
def transactionTreesMetrics(
objectives: Objectives
): List[Metric[GetTransactionTreesResponse]] =
all[GetTransactionTreesResponse](
@ -88,6 +38,32 @@ object TransactionMetrics {
objectives = objectives,
)
def activeContractsMetrics: List[Metric[GetActiveContractsResponse]] =
List[Metric[GetActiveContractsResponse]](
CountRateMetric.empty[GetActiveContractsResponse](
countingFunction = _.activeContracts.length
),
TotalCountMetric.empty[GetActiveContractsResponse](
countingFunction = _.activeContracts.length
),
SizeMetric.empty[GetActiveContractsResponse](
sizingFunction = _.serializedSize.toLong
),
)
def completionsMetrics: List[Metric[CompletionStreamResponse]] =
List[Metric[CompletionStreamResponse]](
CountRateMetric.empty(
countingFunction = _.completions.length
),
TotalCountMetric.empty(
countingFunction = _.completions.length
),
SizeMetric.empty(
sizingFunction = _.serializedSize.toLong
),
)
private def all[T](
countingFunction: T => Int,
sizingFunction: T => Long,

View File

@ -0,0 +1,27 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.metrics
import akka.actor.typed.{ActorSystem, SpawnProtocol}
import org.slf4j.Logger
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
object StreamMetrics {
def observer[StreamElem](
streamName: String,
logInterval: FiniteDuration,
metrics: List[Metric[StreamElem]],
logger: Logger,
)(implicit
system: ActorSystem[SpawnProtocol.Command],
ec: ExecutionContext,
): Future[MeteredStreamObserver[StreamElem]] =
MetricsManager(streamName, logInterval, metrics).map { manager =>
new MeteredStreamObserver[StreamElem](streamName, logger, manager)
}
}

View File

@ -0,0 +1,58 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.services
import com.daml.ledger.api.benchtool.Config
import com.daml.ledger.api.benchtool.util.ObserverWithResult
import com.daml.ledger.api.v1.active_contracts_service._
import com.daml.ledger.api.v1.transaction_filter.{Filters, InclusiveFilters, TransactionFilter}
import io.grpc.Channel
import org.slf4j.LoggerFactory
import scala.concurrent.Future
final class ActiveContractsService(
channel: Channel,
ledgerId: String,
) {
private val logger = LoggerFactory.getLogger(getClass)
private val service: ActiveContractsServiceGrpc.ActiveContractsServiceStub =
ActiveContractsServiceGrpc.stub(channel)
def getActiveContracts[Result](
config: Config.StreamConfig.ActiveContractsStreamConfig,
observer: ObserverWithResult[GetActiveContractsResponse, Result],
): Future[Result] = {
service.getActiveContracts(getActiveContractsRequest(ledgerId, config), observer)
logger.info("Started fetching active contracts")
observer.result
}
private def getActiveContractsRequest(
ledgerId: String,
config: Config.StreamConfig.ActiveContractsStreamConfig,
) = {
val templatesFilter = config.templateIds match {
case Some(ids) =>
Filters.defaultInstance.withInclusive(
InclusiveFilters.defaultInstance.addAllTemplateIds(ids)
)
case None =>
Filters.defaultInstance
}
GetActiveContractsRequest.defaultInstance
.withLedgerId(ledgerId)
.withFilter(
TransactionFilter.defaultInstance
.withFiltersByParty(
Map(
config.party -> templatesFilter
)
)
)
}
}

View File

@ -0,0 +1,51 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.services
import com.daml.ledger.api.benchtool.Config
import com.daml.ledger.api.benchtool.util.ObserverWithResult
import com.daml.ledger.api.v1.command_completion_service.{
CommandCompletionServiceGrpc,
CompletionStreamRequest,
CompletionStreamResponse,
}
import io.grpc.Channel
import org.slf4j.LoggerFactory
import scala.concurrent.Future
class CommandCompletionService(
channel: Channel,
ledgerId: String,
) {
private val logger = LoggerFactory.getLogger(getClass)
private val service: CommandCompletionServiceGrpc.CommandCompletionServiceStub =
CommandCompletionServiceGrpc.stub(channel)
def completions[Result](
config: Config.StreamConfig.CompletionsStreamConfig,
observer: ObserverWithResult[CompletionStreamResponse, Result],
): Future[Result] = {
val request = completionsRequest(ledgerId, config)
service.completionStream(request, observer)
logger.info(s"Started fetching completions")
observer.result
}
private def completionsRequest(
ledgerId: String,
config: Config.StreamConfig.CompletionsStreamConfig,
): CompletionStreamRequest = {
val request = CompletionStreamRequest.defaultInstance
.withLedgerId(ledgerId)
.withParties(List(config.party))
.withApplicationId(config.applicationId)
config.beginOffset match {
case Some(offset) => request.withOffset(offset)
case None => request
}
}
}

View File

@ -28,23 +28,35 @@ final class TransactionService(
TransactionServiceGrpc.stub(channel)
def transactions[Result](
config: Config.StreamConfig,
config: Config.StreamConfig.TransactionsStreamConfig,
observer: ObserverWithResult[GetTransactionsResponse, Result],
): Future[Result] = {
val request = getTransactionsRequest(ledgerId, config)
val request = getTransactionsRequest(
ledgerId = ledgerId,
party = config.party,
templateIds = config.templateIds,
beginOffset = config.beginOffset,
endOffset = config.endOffset,
)
service.getTransactions(request, observer)
logger.info("Started fetching transactions")
observer.result
}
def transactionTrees[Result](
config: Config.StreamConfig,
config: Config.StreamConfig.TransactionTreesStreamConfig,
observer: ObserverWithResult[
GetTransactionTreesResponse,
Result,
],
): Future[Result] = {
val request = getTransactionsRequest(ledgerId, config)
val request = getTransactionsRequest(
ledgerId = ledgerId,
party = config.party,
templateIds = config.templateIds,
beginOffset = config.beginOffset,
endOffset = config.endOffset,
)
service.getTransactionTrees(request, observer)
logger.info("Started fetching transaction trees")
observer.result
@ -52,13 +64,16 @@ final class TransactionService(
private def getTransactionsRequest(
ledgerId: String,
config: Config.StreamConfig,
party: String,
templateIds: Option[List[Identifier]],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
): GetTransactionsRequest = {
GetTransactionsRequest.defaultInstance
.withLedgerId(ledgerId)
.withBegin(config.beginOffset.getOrElse(ledgerBeginOffset))
.withEnd(config.endOffset.getOrElse(ledgerEndOffset))
.withFilter(partyFilter(config.party, config.templateIds))
.withBegin(beginOffset.getOrElse(ledgerBeginOffset))
.withEnd(endOffset.getOrElse(ledgerEndOffset))
.withFilter(partyFilter(party, templateIds))
}
private def partyFilter(

View File

@ -3,8 +3,9 @@
package com.daml.ledger.api.benchtool.util
import akka.actor.typed.ActorSystem
import com.daml.ledger.resources.ResourceContext
import akka.actor.typed.{ActorSystem, Behavior, SpawnProtocol}
import akka.actor.typed.scaladsl.Behaviors
import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
import com.daml.resources.{AbstractResourceOwner, ReleasableResource, Resource}
import scala.concurrent.Future
@ -17,3 +18,18 @@ class TypedActorSystemResourceOwner[BehaviorType](
): Resource[ResourceContext, ActorSystem[BehaviorType]] =
ReleasableResource(Future(acquireActorSystem()))(system => Future(system.terminate()))
}
object TypedActorSystemResourceOwner {
def owner(): ResourceOwner[ActorSystem[SpawnProtocol.Command]] =
new TypedActorSystemResourceOwner[SpawnProtocol.Command](() =>
ActorSystem(Creator(), "Creator")
)
object Creator {
def apply(): Behavior[SpawnProtocol.Command] =
Behaviors.setup { context =>
context.log.debug(s"Starting Creator actor")
SpawnProtocol()
}
}
}

View File

@ -11,9 +11,9 @@ import akka.actor.testkit.typed.scaladsl.{
TestProbe,
}
import akka.actor.typed.{ActorRef, Behavior}
import com.daml.ledger.api.benchtool.metrics.MetricsManager.Message
import com.daml.ledger.api.benchtool.metrics.MetricsCollector.Message
import com.daml.ledger.api.benchtool.metrics.objectives.ServiceLevelObjective
import com.daml.ledger.api.benchtool.metrics.{Metric, MetricValue, MetricsManager}
import com.daml.ledger.api.benchtool.metrics.{Metric, MetricValue, MetricsCollector}
import com.daml.ledger.api.benchtool.util.MetricReporter
import org.scalatest.wordspec.AnyWordSpecLike
@ -21,21 +21,23 @@ import java.time.Duration
import scala.concurrent.duration._
import scala.util.Random
class MetricsManagerSpec extends ScalaTestWithActorTestKit(ManualTime.config) with AnyWordSpecLike {
class MetricsCollectorSpec
extends ScalaTestWithActorTestKit(ManualTime.config)
with AnyWordSpecLike {
"The MetricsManager" should {
"The MetricsCollector" should {
val manualTime: ManualTime = ManualTime()
"log periodic report" in {
val logInterval = 100.millis
val manager = spawnManager(logInterval)
val collector = spawn(logInterval)
val first = "first"
val second = "second"
manager ! MetricsManager.Message.NewValue(first)
manager ! MetricsManager.Message.NewValue(second)
collector ! MetricsCollector.Message.NewValue(first)
collector ! MetricsCollector.Message.NewValue(second)
manualTime.timePasses(logInterval - 1.milli)
@ -48,51 +50,51 @@ class MetricsManagerSpec extends ScalaTestWithActorTestKit(ManualTime.config) wi
}
"respond with metrics result on StreamCompleted message" in {
val manager = spawnManager()
val collector = spawn()
val probe = aTestProbe()
manager ! MetricsManager.Message.StreamCompleted(probe.ref)
probe.expectMessage(MetricsManager.Message.MetricsResult.Ok)
collector ! MetricsCollector.Message.StreamCompleted(probe.ref)
probe.expectMessage(MetricsCollector.Message.MetricsResult.Ok)
}
"respond with information about violated objectives" in {
val manager = spawnManager()
val collector = spawn()
val probe = aTestProbe()
manager ! MetricsManager.Message.NewValue("a value")
manager ! MetricsManager.Message.NewValue(TestObjective.TestViolatingValue)
manager ! MetricsManager.Message.StreamCompleted(probe.ref)
collector ! MetricsCollector.Message.NewValue("a value")
collector ! MetricsCollector.Message.NewValue(TestObjective.TestViolatingValue)
collector ! MetricsCollector.Message.StreamCompleted(probe.ref)
probe.expectMessage(MetricsManager.Message.MetricsResult.ObjectivesViolated)
probe.expectMessage(MetricsCollector.Message.MetricsResult.ObjectivesViolated)
}
"stop when the stream completes" in {
val probe = aTestProbe()
val behaviorTestKit = BehaviorTestKit(managerBehavior())
val behaviorTestKit = BehaviorTestKit(behavior())
behaviorTestKit.isAlive shouldBe true
behaviorTestKit.run(MetricsManager.Message.StreamCompleted(probe.ref))
behaviorTestKit.run(MetricsCollector.Message.StreamCompleted(probe.ref))
behaviorTestKit.isAlive shouldBe false
}
}
private def aTestProbe(): TestProbe[Message.MetricsResult] =
testKit.createTestProbe[MetricsManager.Message.MetricsResult]()
testKit.createTestProbe[MetricsCollector.Message.MetricsResult]()
private def spawnManager(
private def spawn(
logInterval: FiniteDuration = 100.millis
): ActorRef[MetricsManager.Message] =
): ActorRef[MetricsCollector.Message] =
testKit.spawn(
behavior = managerBehavior(logInterval),
behavior = behavior(logInterval),
name = Random.alphanumeric.take(10).mkString,
)
private def managerBehavior(
private def behavior(
logInterval: FiniteDuration = 100.millis
): Behavior[MetricsManager.Message] =
MetricsManager[String](
): Behavior[MetricsCollector.Message] =
MetricsCollector[String](
streamName = "testStream",
metrics = List(TestMetric()),
logInterval = logInterval,