mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Benchtool: Separate command generation and submission. Also other clean-ups [DPP-978] (#13833)
This is setting ground for generating contracts with non-empty non-stakeholder informees in #13808 changelog_begin changelog_end
This commit is contained in:
parent
ad123f9b22
commit
751e0f78a6
@ -4,7 +4,6 @@
|
||||
package com.daml.ledger.api.benchtool
|
||||
|
||||
import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig
|
||||
import com.daml.ledger.api.benchtool.submission.CommandSubmitter
|
||||
import com.daml.ledger.api.v1.value.Identifier
|
||||
import com.daml.ledger.test.model.Foo.{Foo1, Foo2, Foo3}
|
||||
import scalaz.syntax.tag._
|
||||
@ -13,25 +12,25 @@ object ConfigEnricher {
|
||||
|
||||
def enrichStreamConfig(
|
||||
streamConfig: StreamConfig,
|
||||
submissionSummary: Option[CommandSubmitter.SubmissionSummary],
|
||||
submissionResult: Option[SubmissionStepResult],
|
||||
): StreamConfig = {
|
||||
streamConfig match {
|
||||
case config: StreamConfig.TransactionsStreamConfig =>
|
||||
config.copy(filters = enrichFilters(config.filters, submissionSummary))
|
||||
config.copy(filters = enrichFilters(config.filters, submissionResult))
|
||||
case config: StreamConfig.TransactionTreesStreamConfig =>
|
||||
config.copy(filters = enrichFilters(config.filters, submissionSummary))
|
||||
config.copy(filters = enrichFilters(config.filters, submissionResult))
|
||||
case config: StreamConfig.ActiveContractsStreamConfig =>
|
||||
config.copy(filters = enrichFilters(config.filters, submissionSummary))
|
||||
config.copy(filters = enrichFilters(config.filters, submissionResult))
|
||||
case config: StreamConfig.CompletionsStreamConfig =>
|
||||
config.copy(party = convertParty(config.party, submissionSummary))
|
||||
config.copy(party = convertParty(config.party, submissionResult))
|
||||
}
|
||||
}
|
||||
|
||||
private def convertParty(
|
||||
party: String,
|
||||
submissionSummary: Option[CommandSubmitter.SubmissionSummary],
|
||||
submissionResult: Option[SubmissionStepResult],
|
||||
): String =
|
||||
submissionSummary match {
|
||||
submissionResult match {
|
||||
case None => party
|
||||
case Some(summary) =>
|
||||
summary.observers
|
||||
@ -42,7 +41,7 @@ object ConfigEnricher {
|
||||
|
||||
private def enrichFilters(
|
||||
filters: List[StreamConfig.PartyFilter],
|
||||
submissionSummary: Option[CommandSubmitter.SubmissionSummary],
|
||||
submissionResult: Option[SubmissionStepResult],
|
||||
): List[StreamConfig.PartyFilter] = {
|
||||
def identifierToFullyQualifiedString(id: Identifier) =
|
||||
s"${id.packageId}:${id.moduleName}:${id.entityName}"
|
||||
@ -56,7 +55,7 @@ object ConfigEnricher {
|
||||
|
||||
filters.map { filter =>
|
||||
StreamConfig.PartyFilter(
|
||||
party = convertParty(filter.party, submissionSummary),
|
||||
party = convertParty(filter.party, submissionResult),
|
||||
templates = filter.templates.map(fullyQualifiedTemplateId),
|
||||
)
|
||||
}
|
||||
|
@ -3,8 +3,14 @@
|
||||
|
||||
package com.daml.ledger.api.benchtool
|
||||
|
||||
import java.util.concurrent._
|
||||
|
||||
import akka.actor.typed.{ActorSystem, SpawnProtocol}
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.api.benchtool.config.WorkflowConfig.{
|
||||
FibonacciSubmissionConfig,
|
||||
FooSubmissionConfig,
|
||||
}
|
||||
import com.daml.ledger.api.benchtool.config.{Config, ConfigMaker, WorkflowConfig}
|
||||
import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager
|
||||
import com.daml.ledger.api.benchtool.metrics.{
|
||||
@ -14,7 +20,7 @@ import com.daml.ledger.api.benchtool.metrics.{
|
||||
MetricsManager,
|
||||
}
|
||||
import com.daml.ledger.api.benchtool.services.LedgerApiServices
|
||||
import com.daml.ledger.api.benchtool.submission.{CommandSubmitter, Names}
|
||||
import com.daml.ledger.api.benchtool.submission._
|
||||
import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner
|
||||
import com.daml.ledger.api.tls.TlsConfiguration
|
||||
import com.daml.ledger.client
|
||||
@ -25,7 +31,6 @@ import io.grpc.Channel
|
||||
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import java.util.concurrent._
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{Await, ExecutionContext, Future}
|
||||
import scala.util.control.NonFatal
|
||||
@ -36,6 +41,11 @@ import scala.util.control.NonFatal
|
||||
* Uses "benchtool" ([[Names.benchtoolApplicationId]]) applicationId for both steps.
|
||||
*/
|
||||
object LedgerApiBenchTool {
|
||||
private val printer = pprint.PPrinter.BlackWhite
|
||||
|
||||
private[benchtool] val logger: Logger = LoggerFactory.getLogger(getClass)
|
||||
private[benchtool] def prettyPrint(x: Any): String = printer(x).toString()
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
ConfigMaker.make(args) match {
|
||||
@ -43,7 +53,8 @@ object LedgerApiBenchTool {
|
||||
logger.error(s"Configuration error: ${error.details}")
|
||||
case Right(config) =>
|
||||
logger.info(s"Starting benchmark with configuration:\n${prettyPrint(config)}")
|
||||
val result = run(config)(ExecutionContext.Implicits.global)
|
||||
val result = LedgerApiBenchTool(config)
|
||||
.run()(ExecutionContext.Implicits.global)
|
||||
.map {
|
||||
case Right(()) =>
|
||||
logger.info(s"Benchmark finished successfully.")
|
||||
@ -59,139 +70,39 @@ object LedgerApiBenchTool {
|
||||
}
|
||||
}
|
||||
|
||||
private def run(config: Config)(implicit ec: ExecutionContext): Future[Either[String, Unit]] = {
|
||||
def apply(config: Config): LedgerApiBenchTool = {
|
||||
new LedgerApiBenchTool(
|
||||
names = new Names,
|
||||
authorizationHelper = config.authorizationTokenSecret.map(new AuthorizationHelper(_)),
|
||||
config = config,
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
case class SubmissionStepResult(
|
||||
signatory: Primitive.Party,
|
||||
observers: List[Primitive.Party],
|
||||
)
|
||||
|
||||
class LedgerApiBenchTool(
|
||||
names: Names,
|
||||
authorizationHelper: Option[AuthorizationHelper],
|
||||
config: Config,
|
||||
) {
|
||||
|
||||
import LedgerApiBenchTool.{logger, prettyPrint}
|
||||
|
||||
def run()(implicit ec: ExecutionContext): Future[Either[String, Unit]] = {
|
||||
implicit val resourceContext: ResourceContext = ResourceContext(ec)
|
||||
|
||||
val names = new Names
|
||||
val authorizationHelper = config.authorizationTokenSecret.map(new AuthorizationHelper(_))
|
||||
|
||||
def regularUserSetupStep(adminServices: LedgerApiServices): Future[Unit] =
|
||||
(config.authorizationTokenSecret, config.workflow.submission) match {
|
||||
case (Some(_), Some(submissionConfig)) =>
|
||||
// We only need to setup the user when the UserManagementService is used and we're going to submit transactions
|
||||
// The submission config is necessary to establish a set of rights that will be granted to the user.
|
||||
logger.info(
|
||||
s"Setting up the regular '${names.benchtoolUserId}' user prior to the submission phase."
|
||||
)
|
||||
adminServices.userManagementService.createUserOrGrantRightsToExisting(
|
||||
userId = names.benchtoolUserId,
|
||||
observerPartyNames = names.observerPartyNames(
|
||||
submissionConfig.numberOfObservers,
|
||||
submissionConfig.uniqueParties,
|
||||
),
|
||||
signatoryPartyName = names.signatoryPartyName,
|
||||
)
|
||||
case _ =>
|
||||
Future.successful(
|
||||
logger.info(
|
||||
s"The '${names.benchtoolUserId}' user is going to be used for authentication."
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
def benchmarkStreams(
|
||||
regularUserServices: LedgerApiServices,
|
||||
streamConfigs: List[WorkflowConfig.StreamConfig],
|
||||
metricRegistry: MetricRegistry,
|
||||
actorSystem: ActorSystem[SpawnProtocol.Command],
|
||||
): Future[Either[String, Unit]] =
|
||||
if (streamConfigs.isEmpty) {
|
||||
logger.info(s"No streams defined. Skipping the benchmark step.")
|
||||
Future.successful(Right(()))
|
||||
} else
|
||||
Benchmark
|
||||
.run(
|
||||
streamConfigs = streamConfigs,
|
||||
reportingPeriod = config.reportingPeriod,
|
||||
apiServices = regularUserServices,
|
||||
metricRegistry = metricRegistry,
|
||||
system = actorSystem,
|
||||
)
|
||||
|
||||
def benchmarkLatency(
|
||||
regularUserServices: LedgerApiServices,
|
||||
adminServices: LedgerApiServices,
|
||||
submissionConfigO: Option[WorkflowConfig.SubmissionConfig],
|
||||
metricRegistry: MetricRegistry,
|
||||
signatory: client.binding.Primitive.Party,
|
||||
actorSystem: ActorSystem[SpawnProtocol.Command],
|
||||
maxLatencyObjectiveMillis: Long,
|
||||
): Future[Either[String, Unit]] =
|
||||
submissionConfigO match {
|
||||
case Some(submissionConfig) =>
|
||||
for {
|
||||
metricsManager <- MetricsManager(
|
||||
observedMetric = "submit-and-wait-latency",
|
||||
logInterval = config.reportingPeriod,
|
||||
metrics = List(LatencyMetric.empty(maxLatencyObjectiveMillis)),
|
||||
exposedMetrics = None,
|
||||
)(actorSystem, ec)
|
||||
submitter = CommandSubmitter(
|
||||
names = names,
|
||||
benchtoolUserServices = regularUserServices,
|
||||
adminServices = adminServices,
|
||||
metricRegistry = metricRegistry,
|
||||
metricsManager = metricsManager,
|
||||
)
|
||||
result <- submitter
|
||||
.submit(
|
||||
config = submissionConfig,
|
||||
signatory = signatory,
|
||||
observers = Nil,
|
||||
maxInFlightCommands = config.maxInFlightCommands,
|
||||
submissionBatchSize = config.submissionBatchSize,
|
||||
)
|
||||
.flatMap(_ => metricsManager.result())
|
||||
.map {
|
||||
case BenchmarkResult.ObjectivesViolated =>
|
||||
Left("Metrics objectives not met.")
|
||||
case BenchmarkResult.Ok =>
|
||||
Right(())
|
||||
}
|
||||
.recoverWith { case NonFatal(e) =>
|
||||
Future.successful(Left(e.getMessage))
|
||||
}
|
||||
} yield result
|
||||
case None =>
|
||||
Future.failed(
|
||||
new RuntimeException("Submission config cannot be empty for latency benchmarking")
|
||||
)
|
||||
}
|
||||
|
||||
def submissionStep(
|
||||
regularUserServices: LedgerApiServices,
|
||||
adminServices: LedgerApiServices,
|
||||
submissionConfig: Option[WorkflowConfig.SubmissionConfig],
|
||||
metricRegistry: MetricRegistry,
|
||||
)(implicit ec: ExecutionContext): Future[
|
||||
Option[(Primitive.Party, CommandSubmitter.SubmissionSummary)]
|
||||
] =
|
||||
submissionConfig match {
|
||||
case None =>
|
||||
logger.info(s"No submission defined. Skipping.")
|
||||
Future.successful(None)
|
||||
case Some(submissionConfig) =>
|
||||
val submitter = CommandSubmitter(
|
||||
names = names,
|
||||
benchtoolUserServices = regularUserServices,
|
||||
adminServices = adminServices,
|
||||
metricRegistry = metricRegistry,
|
||||
metricsManager = NoOpMetricsManager(),
|
||||
)
|
||||
for {
|
||||
(signatory, observers) <- submitter.prepare(submissionConfig)
|
||||
result <- submitter
|
||||
.submit(
|
||||
config = submissionConfig,
|
||||
signatory = signatory,
|
||||
observers = observers,
|
||||
maxInFlightCommands = config.maxInFlightCommands,
|
||||
submissionBatchSize = config.submissionBatchSize,
|
||||
)
|
||||
} yield Some((signatory, result))
|
||||
}
|
||||
|
||||
val resources = for {
|
||||
val resources: ResourceOwner[
|
||||
(
|
||||
String => LedgerApiServices,
|
||||
ActorSystem[SpawnProtocol.Command],
|
||||
MetricRegistry,
|
||||
)
|
||||
] = for {
|
||||
servicesForUserId <- apiServicesOwner(config, authorizationHelper)
|
||||
system <- TypedActorSystemResourceOwner.owner()
|
||||
metricRegistry <- new MetricRegistryOwner(
|
||||
@ -207,39 +118,214 @@ object LedgerApiBenchTool {
|
||||
|
||||
for {
|
||||
_ <- regularUserSetupStep(adminServices)
|
||||
submissionStepProducts <- submissionStep(
|
||||
regularUserServices = regularUserServices,
|
||||
adminServices = adminServices,
|
||||
submissionConfig = config.workflow.submission,
|
||||
metricRegistry = metricRegistry,
|
||||
)
|
||||
streams = config.workflow.streams.map(
|
||||
ConfigEnricher.enrichStreamConfig(_, submissionStepProducts.map(_._2))
|
||||
submissionStepResultO: Option[SubmissionStepResult] <- {
|
||||
config.workflow.submission match {
|
||||
case None =>
|
||||
logger.info(s"No submission defined. Skipping.")
|
||||
Future.successful(None)
|
||||
case Some(submissionConfig) =>
|
||||
submissionStep(
|
||||
regularUserServices = regularUserServices,
|
||||
adminServices = adminServices,
|
||||
submissionConfig = submissionConfig,
|
||||
metricRegistry = metricRegistry,
|
||||
).map(Some(_))
|
||||
}
|
||||
}
|
||||
|
||||
updatedStreamConfigs = config.workflow.streams.map(streamsConfig =>
|
||||
ConfigEnricher.enrichStreamConfig(streamsConfig, submissionStepResultO)
|
||||
)
|
||||
|
||||
_ = logger.info(
|
||||
s"Stream configs adapted after the submission step: ${prettyPrint(streams)}"
|
||||
s"Stream configs adapted after the submission step: ${prettyPrint(updatedStreamConfigs)}"
|
||||
)
|
||||
benchmarkResult <-
|
||||
if (config.latencyTest) {
|
||||
val signatory = submissionStepProducts
|
||||
.map(_._1)
|
||||
val signatory = submissionStepResultO
|
||||
.map(_.signatory)
|
||||
.getOrElse(
|
||||
throw new RuntimeException("Signatory cannot be empty for latency benchmark")
|
||||
)
|
||||
benchmarkLatency(
|
||||
regularUserServices,
|
||||
adminServices,
|
||||
config.workflow.submission,
|
||||
metricRegistry,
|
||||
signatory,
|
||||
actorSystem,
|
||||
config.maxLatencyObjectiveMillis,
|
||||
regularUserServices = regularUserServices,
|
||||
adminServices = adminServices,
|
||||
submissionConfigO = config.workflow.submission,
|
||||
metricRegistry = metricRegistry,
|
||||
signatory = signatory,
|
||||
actorSystem = actorSystem,
|
||||
maxLatencyObjectiveMillis = config.maxLatencyObjectiveMillis,
|
||||
)
|
||||
} else benchmarkStreams(regularUserServices, streams, metricRegistry, actorSystem)
|
||||
} else {
|
||||
benchmarkStreams(
|
||||
regularUserServices = regularUserServices,
|
||||
streamConfigs = updatedStreamConfigs,
|
||||
metricRegistry = metricRegistry,
|
||||
actorSystem = actorSystem,
|
||||
)
|
||||
}
|
||||
} yield benchmarkResult
|
||||
}
|
||||
}
|
||||
|
||||
private def regularUserSetupStep(
|
||||
adminServices: LedgerApiServices
|
||||
)(implicit ec: ExecutionContext): Future[Unit] =
|
||||
(config.authorizationTokenSecret, config.workflow.submission) match {
|
||||
case (Some(_), Some(submissionConfig)) =>
|
||||
// We only need to setup the user when the UserManagementService is used and we're going to submit transactions
|
||||
// The submission config is necessary to establish a set of rights that will be granted to the user.
|
||||
logger.info(
|
||||
s"Setting up the regular '${names.benchtoolUserId}' user prior to the submission phase."
|
||||
)
|
||||
adminServices.userManagementService.createUserOrGrantRightsToExisting(
|
||||
userId = names.benchtoolUserId,
|
||||
observerPartyNames = names.observerPartyNames(
|
||||
submissionConfig.numberOfObservers,
|
||||
submissionConfig.uniqueParties,
|
||||
),
|
||||
signatoryPartyName = names.signatoryPartyName,
|
||||
)
|
||||
case _ =>
|
||||
Future.successful(
|
||||
logger.info(
|
||||
s"The '${names.benchtoolUserId}' user is going to be used for authentication."
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
private def benchmarkStreams(
|
||||
regularUserServices: LedgerApiServices,
|
||||
streamConfigs: List[WorkflowConfig.StreamConfig],
|
||||
metricRegistry: MetricRegistry,
|
||||
actorSystem: ActorSystem[SpawnProtocol.Command],
|
||||
)(implicit ec: ExecutionContext): Future[Either[String, Unit]] =
|
||||
if (streamConfigs.isEmpty) {
|
||||
logger.info(s"No streams defined. Skipping the benchmark step.")
|
||||
Future.successful(Right(()))
|
||||
} else
|
||||
Benchmark
|
||||
.run(
|
||||
streamConfigs = streamConfigs,
|
||||
reportingPeriod = config.reportingPeriod,
|
||||
apiServices = regularUserServices,
|
||||
metricRegistry = metricRegistry,
|
||||
system = actorSystem,
|
||||
)
|
||||
|
||||
private def benchmarkLatency(
|
||||
regularUserServices: LedgerApiServices,
|
||||
adminServices: LedgerApiServices,
|
||||
submissionConfigO: Option[WorkflowConfig.SubmissionConfig],
|
||||
metricRegistry: MetricRegistry,
|
||||
signatory: client.binding.Primitive.Party,
|
||||
actorSystem: ActorSystem[SpawnProtocol.Command],
|
||||
maxLatencyObjectiveMillis: Long,
|
||||
)(implicit ec: ExecutionContext): Future[Either[String, Unit]] =
|
||||
submissionConfigO match {
|
||||
case Some(submissionConfig: FooSubmissionConfig) =>
|
||||
val generator: CommandGenerator = new FooCommandGenerator(
|
||||
randomnessProvider = RandomnessProvider.Default,
|
||||
signatory = signatory,
|
||||
config = submissionConfig,
|
||||
allObservers = List.empty,
|
||||
)
|
||||
for {
|
||||
metricsManager <- MetricsManager(
|
||||
observedMetric = "submit-and-wait-latency",
|
||||
logInterval = config.reportingPeriod,
|
||||
metrics = List(LatencyMetric.empty(maxLatencyObjectiveMillis)),
|
||||
exposedMetrics = None,
|
||||
)(actorSystem, ec)
|
||||
submitter = CommandSubmitter(
|
||||
names = names,
|
||||
benchtoolUserServices = regularUserServices,
|
||||
adminServices = adminServices,
|
||||
metricRegistry = metricRegistry,
|
||||
metricsManager = metricsManager,
|
||||
)
|
||||
result <- submitter
|
||||
.generateAndSubmit(
|
||||
generator = generator,
|
||||
config = submissionConfig,
|
||||
signatory = signatory,
|
||||
maxInFlightCommands = config.maxInFlightCommands,
|
||||
submissionBatchSize = config.submissionBatchSize,
|
||||
)
|
||||
.flatMap(_ => metricsManager.result())
|
||||
.map {
|
||||
case BenchmarkResult.ObjectivesViolated =>
|
||||
Left("Metrics objectives not met.")
|
||||
case BenchmarkResult.Ok =>
|
||||
Right(())
|
||||
}
|
||||
.recoverWith { case NonFatal(e) =>
|
||||
Future.successful(Left(e.getMessage))
|
||||
}
|
||||
} yield result
|
||||
case Some(other) =>
|
||||
Future.failed(
|
||||
new RuntimeException(s"Unsupported submission config for latency benchmarking: $other")
|
||||
)
|
||||
case None =>
|
||||
Future.failed(
|
||||
new RuntimeException("Submission config cannot be empty for latency benchmarking")
|
||||
)
|
||||
}
|
||||
|
||||
def submissionStep(
|
||||
regularUserServices: LedgerApiServices,
|
||||
adminServices: LedgerApiServices,
|
||||
submissionConfig: WorkflowConfig.SubmissionConfig,
|
||||
metricRegistry: MetricRegistry,
|
||||
)(implicit
|
||||
ec: ExecutionContext
|
||||
): Future[SubmissionStepResult] = {
|
||||
|
||||
val submitter = CommandSubmitter(
|
||||
names = names,
|
||||
benchtoolUserServices = regularUserServices,
|
||||
adminServices = adminServices,
|
||||
metricRegistry = metricRegistry,
|
||||
metricsManager = NoOpMetricsManager(),
|
||||
)
|
||||
for {
|
||||
(signatory, allObservers) <- submitter.prepare(
|
||||
submissionConfig
|
||||
)
|
||||
_ <-
|
||||
submissionConfig match {
|
||||
case submissionConfig: FooSubmissionConfig =>
|
||||
new FooSubmission(
|
||||
submitter = submitter,
|
||||
maxInFlightCommands = config.maxInFlightCommands,
|
||||
submissionBatchSize = config.submissionBatchSize,
|
||||
submissionConfig = submissionConfig,
|
||||
signatory = signatory,
|
||||
allObservers = allObservers,
|
||||
).performSubmission()
|
||||
case submissionConfig: FibonacciSubmissionConfig =>
|
||||
val generator: CommandGenerator = new FibonacciCommandGenerator(
|
||||
signatory = signatory,
|
||||
config = submissionConfig,
|
||||
)
|
||||
for {
|
||||
_ <- submitter
|
||||
.generateAndSubmit(
|
||||
generator = generator,
|
||||
config = submissionConfig,
|
||||
signatory = signatory,
|
||||
maxInFlightCommands = config.maxInFlightCommands,
|
||||
submissionBatchSize = config.submissionBatchSize,
|
||||
)
|
||||
} yield ()
|
||||
}
|
||||
} yield SubmissionStepResult(
|
||||
signatory = signatory,
|
||||
observers = allObservers,
|
||||
)
|
||||
}
|
||||
|
||||
private def apiServicesOwner(
|
||||
config: Config,
|
||||
authorizationHelper: Option[AuthorizationHelper],
|
||||
@ -298,8 +384,4 @@ object LedgerApiBenchTool {
|
||||
else new ArrayBlockingQueue[Runnable](config.maxQueueLength),
|
||||
)
|
||||
)
|
||||
|
||||
private val logger: Logger = LoggerFactory.getLogger(getClass)
|
||||
private val printer = pprint.PPrinter.BlackWhite
|
||||
private def prettyPrint(x: Any): String = printer(x).toString()
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ object WorkflowConfig {
|
||||
uniqueParties: Boolean,
|
||||
value: Int,
|
||||
) extends SubmissionConfig {
|
||||
def numberOfObservers = 0
|
||||
override val numberOfObservers = 0
|
||||
}
|
||||
|
||||
final case class FooSubmissionConfig(
|
||||
|
@ -3,30 +3,10 @@
|
||||
|
||||
package com.daml.ledger.api.benchtool.submission
|
||||
|
||||
import com.daml.ledger.api.benchtool.config.WorkflowConfig.{
|
||||
FibonacciSubmissionConfig,
|
||||
FooSubmissionConfig,
|
||||
SubmissionConfig,
|
||||
}
|
||||
import com.daml.ledger.api.v1.commands.Command
|
||||
import com.daml.ledger.client.binding.Primitive
|
||||
|
||||
import scala.util.Try
|
||||
|
||||
trait CommandGenerator {
|
||||
def next(): Try[Seq[Command]]
|
||||
}
|
||||
|
||||
object CommandGenerator {
|
||||
def apply(
|
||||
randomnessProvider: RandomnessProvider,
|
||||
config: SubmissionConfig,
|
||||
signatory: Primitive.Party,
|
||||
observers: List[Primitive.Party],
|
||||
): CommandGenerator = config match {
|
||||
case c: FooSubmissionConfig =>
|
||||
new FooCommandGenerator(randomnessProvider, c, signatory, observers)
|
||||
case c: FibonacciSubmissionConfig =>
|
||||
new FibonacciCommandGenerator(c, signatory)
|
||||
}
|
||||
}
|
||||
|
@ -36,7 +36,12 @@ case class CommandSubmitter(
|
||||
|
||||
def prepare(config: SubmissionConfig)(implicit
|
||||
ec: ExecutionContext
|
||||
): Future[(client.binding.Primitive.Party, List[client.binding.Primitive.Party])] = {
|
||||
): Future[
|
||||
(
|
||||
client.binding.Primitive.Party,
|
||||
List[client.binding.Primitive.Party],
|
||||
)
|
||||
] = {
|
||||
val observerPartyNames =
|
||||
names.observerPartyNames(config.numberOfObservers, config.uniqueParties)
|
||||
|
||||
@ -48,7 +53,7 @@ case class CommandSubmitter(
|
||||
_ <- uploadTestDars()
|
||||
} yield {
|
||||
logger.info("Prepared command submission.")
|
||||
signatory -> observers
|
||||
(signatory, observers)
|
||||
})
|
||||
.recoverWith { case NonFatal(ex) =>
|
||||
logger.error(
|
||||
@ -59,26 +64,25 @@ case class CommandSubmitter(
|
||||
}
|
||||
}
|
||||
|
||||
def submit(
|
||||
def generateAndSubmit(
|
||||
generator: CommandGenerator,
|
||||
config: SubmissionConfig,
|
||||
signatory: client.binding.Primitive.Party,
|
||||
observers: List[client.binding.Primitive.Party],
|
||||
maxInFlightCommands: Int,
|
||||
submissionBatchSize: Int,
|
||||
)(implicit ec: ExecutionContext): Future[CommandSubmitter.SubmissionSummary] = {
|
||||
)(implicit ec: ExecutionContext): Future[Unit] = {
|
||||
logger.info("Generating contracts...")
|
||||
(for {
|
||||
_ <- submitCommands(
|
||||
config,
|
||||
signatory,
|
||||
observers,
|
||||
maxInFlightCommands,
|
||||
submissionBatchSize,
|
||||
generator = generator,
|
||||
config = config,
|
||||
signatory = signatory,
|
||||
maxInFlightCommands = maxInFlightCommands,
|
||||
submissionBatchSize = submissionBatchSize,
|
||||
)
|
||||
} yield {
|
||||
logger.info("Commands submitted successfully.")
|
||||
// TODO Refactor: Extract the SubmissionSummery construction out of this method
|
||||
CommandSubmitter.SubmissionSummary(observers = observers)
|
||||
()
|
||||
})
|
||||
.recoverWith { case NonFatal(ex) =>
|
||||
logger.error(s"Command submission failed. Details: ${ex.getLocalizedMessage}", ex)
|
||||
@ -118,7 +122,7 @@ case class CommandSubmitter(
|
||||
|
||||
private def submitAndWait(
|
||||
id: String,
|
||||
party: Primitive.Party,
|
||||
actAs: Seq[Primitive.Party],
|
||||
commands: Seq[Command],
|
||||
)(implicit
|
||||
ec: ExecutionContext
|
||||
@ -127,7 +131,7 @@ case class CommandSubmitter(
|
||||
ledgerId = benchtoolUserServices.ledgerId,
|
||||
applicationId = names.benchtoolApplicationId,
|
||||
commandId = id,
|
||||
party = party.unwrap,
|
||||
actAs = actAs.map(_.unwrap),
|
||||
commands = commands,
|
||||
workflowId = names.workflowId,
|
||||
)
|
||||
@ -139,9 +143,9 @@ case class CommandSubmitter(
|
||||
}
|
||||
|
||||
private def submitCommands(
|
||||
generator: CommandGenerator,
|
||||
config: SubmissionConfig,
|
||||
signatory: Primitive.Party,
|
||||
observers: List[Primitive.Party],
|
||||
maxInFlightCommands: Int,
|
||||
submissionBatchSize: Int,
|
||||
)(implicit
|
||||
@ -163,14 +167,6 @@ case class CommandSubmitter(
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
val generator = CommandGenerator(
|
||||
randomnessProvider = RandomnessProvider.Default,
|
||||
signatory = signatory,
|
||||
config = config,
|
||||
observers = observers,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
s"Submitting commands ($numBatches commands, $submissionBatchSize contracts per command)..."
|
||||
)
|
||||
@ -192,7 +188,7 @@ case class CommandSubmitter(
|
||||
timed(submitAndWaitTimer, metricsManager)(
|
||||
submitAndWait(
|
||||
id = names.commandId(index),
|
||||
party = signatory,
|
||||
actAs = Seq(signatory),
|
||||
commands = commands.flatten,
|
||||
)
|
||||
)
|
||||
|
@ -1,129 +0,0 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.benchtool.submission
|
||||
|
||||
import com.daml.ledger.api.benchtool.submission.EventsObserver._
|
||||
import com.daml.ledger.api.benchtool.util.ObserverWithResult
|
||||
import com.daml.ledger.api.v1.event.CreatedEvent
|
||||
import com.daml.ledger.api.v1.transaction.{TransactionTree, TreeEvent}
|
||||
import com.daml.ledger.api.v1.transaction_service.GetTransactionTreesResponse
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
object EventsObserver {
|
||||
|
||||
def apply(expectedTemplateNames: Set[String]): EventsObserver = new EventsObserver(
|
||||
logger = LoggerFactory.getLogger(getClass),
|
||||
expectedTemplateNames = expectedTemplateNames,
|
||||
)
|
||||
|
||||
case class ObservedExerciseEvent(
|
||||
templateName: String,
|
||||
choiceName: String,
|
||||
choiceArgumentsSerializedSize: Int,
|
||||
consuming: Boolean,
|
||||
)
|
||||
|
||||
case class ObservedCreateEvent(templateName: String, createArgumentsSerializedSize: Int)
|
||||
|
||||
case class ObservedEvents(
|
||||
expectedTemplateNames: Set[String],
|
||||
createEvents: Seq[ObservedCreateEvent],
|
||||
exerciseEvents: Seq[ObservedExerciseEvent],
|
||||
) {
|
||||
private val _actualTemplateNames =
|
||||
(createEvents.map(_.templateName) ++ exerciseEvents.map(_.templateName)).toSet
|
||||
require(
|
||||
_actualTemplateNames.subsetOf(expectedTemplateNames),
|
||||
s"${_actualTemplateNames} must be a subset of $expectedTemplateNames",
|
||||
)
|
||||
|
||||
val consumingExercises: Seq[ObservedExerciseEvent] = exerciseEvents.filter(_.consuming)
|
||||
val nonConsumingExercises: Seq[ObservedExerciseEvent] = exerciseEvents.filterNot(_.consuming)
|
||||
|
||||
val avgSizeOfConsumingExercise: Int = {
|
||||
if (consumingExercises.isEmpty) 0
|
||||
else consumingExercises.map(_.choiceArgumentsSerializedSize).sum / consumingExercises.size
|
||||
}
|
||||
|
||||
val avgSizeOfNonconsumingExercise: Int = {
|
||||
if (nonConsumingExercises.isEmpty) 0
|
||||
else
|
||||
nonConsumingExercises.map(_.choiceArgumentsSerializedSize).sum / nonConsumingExercises.size
|
||||
}
|
||||
|
||||
val numberOfCreatesPerTemplateName: Map[String, Int] = {
|
||||
val groups = createEvents.groupBy(_.templateName)
|
||||
expectedTemplateNames.map(name => name -> groups.get(name).fold(0)(_.size)).toMap
|
||||
}
|
||||
|
||||
val avgSizeOfCreateEventPerTemplateName: Map[String, Int] = {
|
||||
val groups = createEvents.groupBy(_.templateName)
|
||||
expectedTemplateNames.map { name =>
|
||||
val avgSize = groups
|
||||
.get(name)
|
||||
.fold(0)(events =>
|
||||
if (events.isEmpty) 0 else events.map(_.createArgumentsSerializedSize).sum / events.size
|
||||
)
|
||||
name -> avgSize
|
||||
}.toMap
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** Collects information about create and exercise events.
|
||||
*/
|
||||
class EventsObserver(expectedTemplateNames: Set[String], logger: Logger)
|
||||
extends ObserverWithResult[GetTransactionTreesResponse, ObservedEvents](logger) {
|
||||
|
||||
private val createEvents = collection.mutable.ArrayBuffer[ObservedCreateEvent]()
|
||||
private val exerciseEvents = collection.mutable.ArrayBuffer[ObservedExerciseEvent]()
|
||||
|
||||
override def streamName: String = "dummy-stream-name"
|
||||
|
||||
override def onNext(value: GetTransactionTreesResponse): Unit = {
|
||||
value.transactions.foreach { transaction: TransactionTree =>
|
||||
val rootEvents = transaction.rootEventIds.map(transaction.eventsById)
|
||||
rootEvents.foreach { event: TreeEvent =>
|
||||
event.kind.created.foreach { created: CreatedEvent =>
|
||||
val argsSize = created.createArguments.fold(0)(_.serializedSize)
|
||||
val templateName =
|
||||
created.templateId.getOrElse(sys.error(s"Expected templateId in $created")).entityName
|
||||
createEvents.addOne(
|
||||
ObservedCreateEvent(
|
||||
templateName = templateName,
|
||||
createArgumentsSerializedSize = argsSize,
|
||||
)
|
||||
)
|
||||
}
|
||||
event.kind.exercised.foreach { exercised =>
|
||||
val argsSize = exercised.choiceArgument.fold(0)(_.serializedSize)
|
||||
val templateName = exercised.templateId
|
||||
.getOrElse(sys.error(s"Expected templateId in $exercised"))
|
||||
.entityName
|
||||
val choiceName = exercised.choice
|
||||
exerciseEvents.addOne(
|
||||
ObservedExerciseEvent(
|
||||
templateName = templateName,
|
||||
choiceName = choiceName,
|
||||
choiceArgumentsSerializedSize = argsSize,
|
||||
consuming = exercised.consuming,
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def completeWith(): Future[ObservedEvents] = Future.successful(
|
||||
ObservedEvents(
|
||||
expectedTemplateNames = expectedTemplateNames,
|
||||
createEvents = createEvents.toList,
|
||||
exerciseEvents = exerciseEvents.toList,
|
||||
)
|
||||
)
|
||||
}
|
@ -0,0 +1,45 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.benchtool.submission
|
||||
|
||||
import com.daml.ledger.api.benchtool.util.ObserverWithResult
|
||||
import com.daml.ledger.api.v1.transaction_service.GetTransactionsResponse
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
object FlatEventsObserver {
|
||||
def apply(expectedTemplateNames: Set[String]): FlatEventsObserver = new FlatEventsObserver(
|
||||
logger = LoggerFactory.getLogger(getClass),
|
||||
expectedTemplateNames = expectedTemplateNames,
|
||||
)
|
||||
}
|
||||
|
||||
/** Collects information about create and exercise events.
|
||||
*/
|
||||
class FlatEventsObserver(expectedTemplateNames: Set[String], logger: Logger)
|
||||
extends ObserverWithResult[GetTransactionsResponse, ObservedEvents](logger) {
|
||||
|
||||
private val createEvents = collection.mutable.ArrayBuffer[ObservedCreateEvent]()
|
||||
private val exerciseEvents = collection.mutable.ArrayBuffer[ObservedExerciseEvent]()
|
||||
|
||||
override def streamName: String = "dummy-stream-name"
|
||||
|
||||
override def onNext(value: GetTransactionsResponse): Unit =
|
||||
for {
|
||||
transaction <- value.transactions
|
||||
event <- transaction.events
|
||||
created <- event.event.created.toList
|
||||
} {
|
||||
createEvents.addOne(ObservedCreateEvent(created))
|
||||
}
|
||||
|
||||
override def completeWith(): Future[ObservedEvents] = Future.successful(
|
||||
ObservedEvents(
|
||||
expectedTemplateNames = expectedTemplateNames,
|
||||
createEvents = createEvents.toList,
|
||||
exerciseEvents = exerciseEvents.toList,
|
||||
)
|
||||
)
|
||||
}
|
@ -4,63 +4,45 @@
|
||||
package com.daml.ledger.api.benchtool.submission
|
||||
|
||||
import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig
|
||||
import com.daml.ledger.api.v1.commands.{Command, ExerciseByKeyCommand}
|
||||
import com.daml.ledger.api.v1.commands.Command
|
||||
import com.daml.ledger.api.v1.commands.ExerciseByKeyCommand
|
||||
import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value}
|
||||
import com.daml.ledger.client.binding.Primitive
|
||||
import com.daml.ledger.test.model.Foo._
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
import com.daml.ledger.client.binding
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
import scala.util.{Failure, Try}
|
||||
|
||||
case class FooTemplateDescriptor(
|
||||
templateId: Identifier,
|
||||
consumingChoiceName: String,
|
||||
nonconsumingChoiceName: String,
|
||||
)
|
||||
|
||||
/** NOTE: Keep me in sync with `Foo.daml`
|
||||
*/
|
||||
object FooTemplateDescriptor {
|
||||
|
||||
val Foo1: FooTemplateDescriptor = FooTemplateDescriptor(
|
||||
templateId = com.daml.ledger.test.model.Foo.Foo1.id.asInstanceOf[Identifier],
|
||||
consumingChoiceName = "Foo1_ConsumingChoice",
|
||||
nonconsumingChoiceName = "Foo1_NonconsumingChoice",
|
||||
)
|
||||
val Foo2: FooTemplateDescriptor = FooTemplateDescriptor(
|
||||
templateId = com.daml.ledger.test.model.Foo.Foo2.id.asInstanceOf[Identifier],
|
||||
consumingChoiceName = "Foo2_ConsumingChoice",
|
||||
nonconsumingChoiceName = "Foo2_NonconsumingChoice",
|
||||
)
|
||||
val Foo3: FooTemplateDescriptor = FooTemplateDescriptor(
|
||||
templateId = com.daml.ledger.test.model.Foo.Foo3.id.asInstanceOf[Identifier],
|
||||
consumingChoiceName = "Foo3_ConsumingChoice",
|
||||
nonconsumingChoiceName = "Foo3_NonconsumingChoice",
|
||||
)
|
||||
}
|
||||
|
||||
final class FooCommandGenerator(
|
||||
randomnessProvider: RandomnessProvider,
|
||||
config: FooSubmissionConfig,
|
||||
signatory: Primitive.Party,
|
||||
observers: List[Primitive.Party],
|
||||
allObservers: List[Primitive.Party],
|
||||
) extends CommandGenerator {
|
||||
private val distribution = new Distribution(config.instanceDistribution.map(_.weight))
|
||||
private val descriptionMapping: Map[Int, FooSubmissionConfig.ContractDescription] =
|
||||
config.instanceDistribution.zipWithIndex
|
||||
.map(_.swap)
|
||||
.toMap
|
||||
private val observersWithIndices: List[(Primitive.Party, Int)] = observers.zipWithIndex
|
||||
private val nextCommandId = new AtomicLong(0)
|
||||
private val observersWithUnlikelihood: List[(Primitive.Party, Int)] =
|
||||
allObservers.zipWithIndex.toMap.view.mapValues(unlikelihood).toList
|
||||
|
||||
/** @return denominator of a 1/(10**i) likelihood
|
||||
*/
|
||||
private def unlikelihood(i: Int): Int = math.pow(10.0, i.toDouble).toInt
|
||||
|
||||
def next(): Try[Seq[Command]] =
|
||||
(for {
|
||||
(description, observers) <- Try((pickDescription(), pickObservers()))
|
||||
(description, observers) <- Try(
|
||||
(pickDescription(), pickObservers())
|
||||
)
|
||||
createContractPayload <- Try(randomPayload(description.payloadSizeBytes))
|
||||
command = createContractCommand(
|
||||
templateName = description.template,
|
||||
command = createCommands(
|
||||
templateDescriptor = FooTemplateDescriptor.forName(description.template),
|
||||
signatory = signatory,
|
||||
observers = observers,
|
||||
payload = createContractPayload,
|
||||
@ -78,87 +60,70 @@ final class FooCommandGenerator(
|
||||
descriptionMapping(distribution.index(randomnessProvider.randomDouble()))
|
||||
|
||||
private def pickObservers(): List[Primitive.Party] =
|
||||
observersWithIndices
|
||||
.filter { case (_, index) => isObserverUsed(index) }
|
||||
observersWithUnlikelihood
|
||||
.filter { case (_, unlikelihood) => randomDraw(unlikelihood) }
|
||||
.map(_._1)
|
||||
|
||||
private def isObserverUsed(i: Int): Boolean =
|
||||
randomnessProvider.randomNatural(math.pow(10.0, i.toDouble).toInt) == 0
|
||||
private def randomDraw(unlikelihood: Int): Boolean =
|
||||
randomnessProvider.randomNatural(unlikelihood) == 0
|
||||
|
||||
private def createContractCommand(
|
||||
templateName: String,
|
||||
private def createCommands(
|
||||
templateDescriptor: FooTemplateDescriptor,
|
||||
signatory: Primitive.Party,
|
||||
observers: List[Primitive.Party],
|
||||
payload: String,
|
||||
): Seq[Command] = {
|
||||
val commandId = nextCommandId.getAndIncrement()
|
||||
val consumingExercisePayload: Option[String] = config.consumingExercises
|
||||
.flatMap(c =>
|
||||
Option.when(randomnessProvider.randomDouble() <= c.probability)(c.payloadSizeBytes)
|
||||
)
|
||||
.map(randomPayload)
|
||||
val nonconsumingExercisePayload: Seq[String] =
|
||||
config.nonConsumingExercises.fold(Seq.empty[String]) { c =>
|
||||
var f = c.probability.toInt
|
||||
if (randomnessProvider.randomDouble() <= c.probability - f) {
|
||||
val contractNumber = FooCommandGenerator.nextContractNumber.getAndIncrement()
|
||||
val fooKeyId = "foo-" + contractNumber
|
||||
val fooContractKey = FooCommandGenerator.makeContractKeyValue(signatory, fooKeyId)
|
||||
val createFooCmd = templateDescriptor.name match {
|
||||
case "Foo1" => Foo1(signatory, observers, payload, keyId = fooKeyId).create.command
|
||||
case "Foo2" => Foo2(signatory, observers, payload, keyId = fooKeyId).create.command
|
||||
case "Foo3" => Foo3(signatory, observers, payload, keyId = fooKeyId).create.command
|
||||
}
|
||||
val nonconsumingExercisePayloads: Seq[String] =
|
||||
config.nonConsumingExercises.fold(Seq.empty[String]) { config =>
|
||||
var f = config.probability.toInt
|
||||
if (randomnessProvider.randomDouble() <= config.probability - f) {
|
||||
f += 1
|
||||
}
|
||||
Seq.fill[String](f)(randomPayload(c.payloadSizeBytes))
|
||||
Seq.fill[String](f)(randomPayload(config.payloadSizeBytes))
|
||||
}
|
||||
val (templateDesc, createCmd) = templateName match {
|
||||
case "Foo1" =>
|
||||
(
|
||||
FooTemplateDescriptor.Foo1,
|
||||
Foo1(signatory, observers, payload, id = commandId).create.command,
|
||||
)
|
||||
case "Foo2" =>
|
||||
(
|
||||
FooTemplateDescriptor.Foo2,
|
||||
Foo2(signatory, observers, payload, id = commandId).create.command,
|
||||
)
|
||||
case "Foo3" =>
|
||||
(
|
||||
FooTemplateDescriptor.Foo3,
|
||||
Foo3(signatory, observers, payload, id = commandId).create.command,
|
||||
)
|
||||
case invalid => sys.error(s"Invalid template: $invalid")
|
||||
val nonconsumingExercises = nonconsumingExercisePayloads.map { payload =>
|
||||
makeExerciseByKeyCommand(
|
||||
templateId = templateDescriptor.templateId,
|
||||
choiceName = templateDescriptor.nonconsumingChoiceName,
|
||||
args = Seq(
|
||||
RecordField(
|
||||
label = "exercisePayload",
|
||||
value = Some(Value(Value.Sum.Text(payload))),
|
||||
)
|
||||
),
|
||||
)(contractKey = fooContractKey)
|
||||
}
|
||||
val consumingExerciseO: Option[Command] = config.consumingExercises
|
||||
.flatMap(config =>
|
||||
if (randomnessProvider.randomDouble() <= config.probability) {
|
||||
val payload = randomPayload(config.payloadSizeBytes)
|
||||
Some(
|
||||
makeExerciseByKeyCommand(
|
||||
templateId = templateDescriptor.templateId,
|
||||
choiceName = templateDescriptor.consumingChoiceName,
|
||||
args = Seq(
|
||||
RecordField(
|
||||
label = "exercisePayload",
|
||||
value = Some(Value(Value.Sum.Text(payload))),
|
||||
)
|
||||
),
|
||||
)(contractKey = fooContractKey)
|
||||
)
|
||||
|
||||
val contractKey = Value(
|
||||
Value.Sum.Record(
|
||||
Record(
|
||||
None,
|
||||
Seq(
|
||||
RecordField(
|
||||
value = Some(Value(Value.Sum.Party(signatory.toString)))
|
||||
),
|
||||
RecordField(
|
||||
value = Some(Value(Value.Sum.Int64(commandId)))
|
||||
),
|
||||
),
|
||||
)
|
||||
} else None
|
||||
)
|
||||
)
|
||||
val nonconsumingExercises = nonconsumingExercisePayload.map { payload =>
|
||||
createExerciseByKeyCmd(
|
||||
templateId = templateDesc.templateId,
|
||||
choiceName = templateDesc.nonconsumingChoiceName,
|
||||
argValue = payload,
|
||||
)(contractKey = contractKey)
|
||||
}
|
||||
val consumingExerciseO = consumingExercisePayload.fold[Option[Command]](None)(payload =>
|
||||
Some(
|
||||
createExerciseByKeyCmd(
|
||||
templateId = templateDesc.templateId,
|
||||
choiceName = templateDesc.consumingChoiceName,
|
||||
argValue = payload,
|
||||
)(contractKey = contractKey)
|
||||
)
|
||||
)
|
||||
Seq(createCmd) ++ nonconsumingExercises ++ consumingExerciseO.toList
|
||||
Seq(createFooCmd) ++ nonconsumingExercises ++ consumingExerciseO.toList
|
||||
}
|
||||
|
||||
def createExerciseByKeyCmd(templateId: Identifier, choiceName: String, argValue: String)(
|
||||
def makeExerciseByKeyCommand(templateId: Identifier, choiceName: String, args: Seq[RecordField])(
|
||||
contractKey: Value
|
||||
): Command = {
|
||||
val choiceArgument = Some(
|
||||
@ -166,12 +131,7 @@ final class FooCommandGenerator(
|
||||
Value.Sum.Record(
|
||||
Record(
|
||||
None,
|
||||
Seq(
|
||||
RecordField(
|
||||
label = "exercisePayload",
|
||||
value = Some(Value(Value.Sum.Text(argValue))),
|
||||
)
|
||||
),
|
||||
args,
|
||||
)
|
||||
)
|
||||
)
|
||||
@ -195,6 +155,32 @@ final class FooCommandGenerator(
|
||||
}
|
||||
|
||||
object FooCommandGenerator {
|
||||
|
||||
private[submission] val nextContractNumber = new AtomicLong(0)
|
||||
|
||||
/** @return A DAML tuple of type `(Party, Text)`
|
||||
*/
|
||||
private[submission] def makeContractKeyValue(
|
||||
party: binding.Primitive.Party,
|
||||
keyId: String,
|
||||
): Value = {
|
||||
Value(
|
||||
Value.Sum.Record(
|
||||
Record(
|
||||
None,
|
||||
Seq(
|
||||
RecordField(
|
||||
value = Some(Value(Value.Sum.Party(party.toString)))
|
||||
),
|
||||
RecordField(
|
||||
value = Some(Value(Value.Sum.Text(keyId)))
|
||||
),
|
||||
),
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
case class CommandGeneratorError(msg: String, cause: Throwable)
|
||||
extends RuntimeException(msg, cause)
|
||||
|
||||
|
@ -0,0 +1,43 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.benchtool.submission
|
||||
|
||||
import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig
|
||||
import com.daml.ledger.client.binding
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
/** Generates and submits Foo and related commands
|
||||
*/
|
||||
class FooSubmission(
|
||||
submitter: CommandSubmitter,
|
||||
maxInFlightCommands: Int,
|
||||
submissionBatchSize: Int,
|
||||
submissionConfig: FooSubmissionConfig,
|
||||
signatory: binding.Primitive.Party,
|
||||
allObservers: List[binding.Primitive.Party],
|
||||
) {
|
||||
|
||||
def performSubmission()(implicit
|
||||
ec: ExecutionContext
|
||||
): Future[Unit] = {
|
||||
val generator = new FooCommandGenerator(
|
||||
randomnessProvider = RandomnessProvider.Default,
|
||||
signatory = signatory,
|
||||
config = submissionConfig,
|
||||
allObservers = allObservers,
|
||||
)
|
||||
for {
|
||||
_ <- submitter
|
||||
.generateAndSubmit(
|
||||
generator = generator,
|
||||
config = submissionConfig,
|
||||
signatory = signatory,
|
||||
maxInFlightCommands = maxInFlightCommands,
|
||||
submissionBatchSize = submissionBatchSize,
|
||||
)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.benchtool.submission
|
||||
|
||||
import com.daml.ledger.api.v1.value.Identifier
|
||||
|
||||
case class FooTemplateDescriptor(
|
||||
name: String,
|
||||
templateId: Identifier,
|
||||
consumingChoiceName: String,
|
||||
nonconsumingChoiceName: String,
|
||||
)
|
||||
|
||||
/** NOTE: Keep me in sync with `Foo.daml`
|
||||
*/
|
||||
object FooTemplateDescriptor {
|
||||
|
||||
val Foo1: FooTemplateDescriptor = FooTemplateDescriptor(
|
||||
name = "Foo1",
|
||||
templateId = com.daml.ledger.test.model.Foo.Foo1.id.asInstanceOf[Identifier],
|
||||
consumingChoiceName = "Foo1_ConsumingChoice",
|
||||
nonconsumingChoiceName = "Foo1_NonconsumingChoice",
|
||||
)
|
||||
val Foo2: FooTemplateDescriptor = FooTemplateDescriptor(
|
||||
name = "Foo2",
|
||||
templateId = com.daml.ledger.test.model.Foo.Foo2.id.asInstanceOf[Identifier],
|
||||
consumingChoiceName = "Foo2_ConsumingChoice",
|
||||
nonconsumingChoiceName = "Foo2_NonconsumingChoice",
|
||||
)
|
||||
val Foo3: FooTemplateDescriptor = FooTemplateDescriptor(
|
||||
name = "Foo3",
|
||||
templateId = com.daml.ledger.test.model.Foo.Foo3.id.asInstanceOf[Identifier],
|
||||
consumingChoiceName = "Foo3_ConsumingChoice",
|
||||
nonconsumingChoiceName = "Foo3_NonconsumingChoice",
|
||||
)
|
||||
|
||||
private val all: Map[String, FooTemplateDescriptor] =
|
||||
List(Foo1, Foo2, Foo3).map(foo => foo.name -> foo).toMap
|
||||
|
||||
def forName(templateName: String): FooTemplateDescriptor =
|
||||
all.getOrElse(templateName, sys.error(s"Invalid template: $templateName"))
|
||||
|
||||
}
|
@ -13,10 +13,9 @@ class Names {
|
||||
val workflowId = s"$benchtoolApplicationId-$identifierSuffix"
|
||||
val signatoryPartyName = s"signatory-$identifierSuffix"
|
||||
|
||||
def observerPartyName(index: Int, uniqueParties: Boolean): String = {
|
||||
def observerPartyName(index: Int, uniqueParties: Boolean): String =
|
||||
if (uniqueParties) s"Obs-$index-$identifierSuffix"
|
||||
else s"Obs-$index"
|
||||
}
|
||||
|
||||
def observerPartyNames(numberOfObservers: Int, uniqueParties: Boolean): Seq[String] =
|
||||
(0 until numberOfObservers).map(i => observerPartyName(i, uniqueParties))
|
||||
|
@ -0,0 +1,132 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.benchtool.submission
|
||||
|
||||
import com.daml.ledger.api.benchtool.util.ObserverWithResult
|
||||
import com.daml.ledger.api.v1.transaction_service.GetTransactionTreesResponse
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
case class ObservedExerciseEvent(
|
||||
templateName: String,
|
||||
choiceName: String,
|
||||
choiceArgumentsSerializedSize: Int,
|
||||
consuming: Boolean,
|
||||
)
|
||||
object ObservedExerciseEvent {
|
||||
def apply(exercised: com.daml.ledger.api.v1.event.ExercisedEvent): ObservedExerciseEvent = {
|
||||
val argsSize = exercised.choiceArgument.fold(0)(_.serializedSize)
|
||||
val templateName = exercised.templateId
|
||||
.getOrElse(sys.error(s"Expected templateId in $exercised"))
|
||||
.entityName
|
||||
val choiceName = exercised.choice
|
||||
ObservedExerciseEvent(
|
||||
templateName = templateName,
|
||||
choiceName = choiceName,
|
||||
choiceArgumentsSerializedSize = argsSize,
|
||||
consuming = exercised.consuming,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
case class ObservedCreateEvent(templateName: String, createArgumentsSerializedSize: Int)
|
||||
object ObservedCreateEvent {
|
||||
def apply(created: com.daml.ledger.api.v1.event.CreatedEvent): ObservedCreateEvent = {
|
||||
val argsSize = created.createArguments.fold(0)(_.serializedSize)
|
||||
val templateName =
|
||||
created.templateId.getOrElse(sys.error(s"Expected templateId in $created")).entityName
|
||||
ObservedCreateEvent(
|
||||
templateName = templateName,
|
||||
createArgumentsSerializedSize = argsSize,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
case class ObservedEvents(
|
||||
expectedTemplateNames: Set[String],
|
||||
createEvents: Seq[ObservedCreateEvent],
|
||||
exerciseEvents: Seq[ObservedExerciseEvent],
|
||||
) {
|
||||
private val _actualTemplateNames =
|
||||
(createEvents.map(_.templateName) ++ exerciseEvents.map(_.templateName)).toSet
|
||||
require(
|
||||
_actualTemplateNames.subsetOf(expectedTemplateNames),
|
||||
s"${_actualTemplateNames} must be a subset of $expectedTemplateNames",
|
||||
)
|
||||
|
||||
val consumingExercises: Seq[ObservedExerciseEvent] = exerciseEvents.filter(_.consuming)
|
||||
val nonConsumingExercises: Seq[ObservedExerciseEvent] = exerciseEvents.filterNot(_.consuming)
|
||||
|
||||
val avgSizeOfConsumingExercise: Int = {
|
||||
if (consumingExercises.isEmpty) 0
|
||||
else consumingExercises.map(_.choiceArgumentsSerializedSize).sum / consumingExercises.size
|
||||
}
|
||||
|
||||
val avgSizeOfNonconsumingExercise: Int = {
|
||||
if (nonConsumingExercises.isEmpty) 0
|
||||
else
|
||||
nonConsumingExercises.map(_.choiceArgumentsSerializedSize).sum / nonConsumingExercises.size
|
||||
}
|
||||
|
||||
val numberOfCreatesPerTemplateName: Map[String, Int] = {
|
||||
val groups = createEvents.groupBy(_.templateName)
|
||||
expectedTemplateNames.map(name => name -> groups.get(name).fold(0)(_.size)).toMap
|
||||
}
|
||||
|
||||
val avgSizeOfCreateEventPerTemplateName: Map[String, Int] = {
|
||||
val groups = createEvents.groupBy(_.templateName)
|
||||
expectedTemplateNames.map { name =>
|
||||
val avgSize = groups
|
||||
.get(name)
|
||||
.fold(0)(events =>
|
||||
if (events.isEmpty) 0 else events.map(_.createArgumentsSerializedSize).sum / events.size
|
||||
)
|
||||
name -> avgSize
|
||||
}.toMap
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object TreeEventsObserver {
|
||||
|
||||
def apply(expectedTemplateNames: Set[String]): TreeEventsObserver = new TreeEventsObserver(
|
||||
logger = LoggerFactory.getLogger(getClass),
|
||||
expectedTemplateNames = expectedTemplateNames,
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
/** Collects information about create and exercise events.
|
||||
*/
|
||||
class TreeEventsObserver(expectedTemplateNames: Set[String], logger: Logger)
|
||||
extends ObserverWithResult[GetTransactionTreesResponse, ObservedEvents](logger) {
|
||||
|
||||
private val createEvents = collection.mutable.ArrayBuffer[ObservedCreateEvent]()
|
||||
private val exerciseEvents = collection.mutable.ArrayBuffer[ObservedExerciseEvent]()
|
||||
|
||||
override def streamName: String = "dummy-stream-name"
|
||||
|
||||
override def onNext(value: GetTransactionTreesResponse): Unit = {
|
||||
for {
|
||||
transaction <- value.transactions
|
||||
allEvents = transaction.eventsById.values
|
||||
event <- allEvents
|
||||
} {
|
||||
event.kind.created.foreach(created => createEvents.addOne(ObservedCreateEvent(created)))
|
||||
event.kind.exercised.foreach(exercised =>
|
||||
exerciseEvents.addOne(ObservedExerciseEvent(exercised))
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override def completeWith(): Future[ObservedEvents] =
|
||||
Future.successful(
|
||||
ObservedEvents(
|
||||
expectedTemplateNames = expectedTemplateNames,
|
||||
createEvents = createEvents.toList,
|
||||
exerciseEvents = exerciseEvents.toList,
|
||||
)
|
||||
)
|
||||
}
|
@ -9,7 +9,6 @@ import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager
|
||||
import com.daml.ledger.api.benchtool.services.LedgerApiServices
|
||||
import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll
|
||||
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import com.daml.ledger.api.v1.value.Identifier
|
||||
import com.daml.platform.sandbox.fixture.SandboxFixture
|
||||
import org.scalatest.AppendedClues
|
||||
import org.scalatest.flatspec.AsyncFlatSpec
|
||||
@ -43,19 +42,22 @@ class FibonacciCommandSubmitterITSpec
|
||||
metricRegistry = new MetricRegistry,
|
||||
metricsManager = NoOpMetricsManager(),
|
||||
)
|
||||
(signatory, observers) <- tested.prepare(config)
|
||||
_ <- tested.submit(
|
||||
(signatory, _) <- tested.prepare(config)
|
||||
generator = new FibonacciCommandGenerator(
|
||||
signatory = signatory,
|
||||
config = config,
|
||||
)
|
||||
_ <- tested.generateAndSubmit(
|
||||
generator = generator,
|
||||
config = config,
|
||||
signatory = signatory,
|
||||
observers = observers,
|
||||
maxInFlightCommands = 1,
|
||||
submissionBatchSize = 5,
|
||||
)
|
||||
eventsObserver = EventsObserver(expectedTemplateNames =
|
||||
eventsObserver = TreeEventsObserver(expectedTemplateNames =
|
||||
Set(
|
||||
com.daml.ledger.test.model.Bench.InefficientFibonacci.id
|
||||
.asInstanceOf[Identifier]
|
||||
.entityName
|
||||
"InefficientFibonacci",
|
||||
"InefficientFibonacciResult",
|
||||
)
|
||||
)
|
||||
_ <- apiServices.transactionService.transactionTrees(
|
||||
@ -75,7 +77,9 @@ class FibonacciCommandSubmitterITSpec
|
||||
)
|
||||
observerResult <- eventsObserver.result
|
||||
} yield {
|
||||
observerResult.createEvents.size shouldBe config.numberOfInstances withClue ("number of create events")
|
||||
observerResult.numberOfCreatesPerTemplateName(
|
||||
"InefficientFibonacci"
|
||||
) shouldBe config.numberOfInstances withClue ("number of create events")
|
||||
succeed
|
||||
}
|
||||
}
|
||||
|
@ -11,7 +11,6 @@ import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig.{
|
||||
}
|
||||
import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager
|
||||
import com.daml.ledger.api.benchtool.services.LedgerApiServices
|
||||
import com.daml.ledger.api.benchtool.submission.EventsObserver.ObservedEvents
|
||||
import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll
|
||||
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import com.daml.platform.sandbox.fixture.SandboxFixture
|
||||
@ -73,14 +72,20 @@ class FooCommandSubmitterITSpec
|
||||
metricsManager = NoOpMetricsManager(),
|
||||
)
|
||||
(signatory, observers) <- tested.prepare(config)
|
||||
_ <- tested.submit(
|
||||
generator: CommandGenerator = new FooCommandGenerator(
|
||||
randomnessProvider = RandomnessProvider.Default,
|
||||
signatory = signatory,
|
||||
config = config,
|
||||
allObservers = observers,
|
||||
)
|
||||
_ <- tested.generateAndSubmit(
|
||||
generator = generator,
|
||||
config = config,
|
||||
signatory = signatory,
|
||||
observers = observers,
|
||||
maxInFlightCommands = 1,
|
||||
submissionBatchSize = 5,
|
||||
)
|
||||
eventsObserver = EventsObserver(expectedTemplateNames = Set("Foo1", "Foo2"))
|
||||
eventsObserver = TreeEventsObserver(expectedTemplateNames = Set("Foo1", "Foo2"))
|
||||
_ <- apiServices.transactionService.transactionTrees(
|
||||
config = WorkflowConfig.StreamConfig.TransactionTreesStreamConfig(
|
||||
name = "dummy-name",
|
||||
|
@ -8,11 +8,11 @@ template Foo1
|
||||
signatory : Party
|
||||
observers : [Party]
|
||||
payload : Text
|
||||
id: Int
|
||||
keyId: Text
|
||||
where
|
||||
signatory signatory
|
||||
observer observers
|
||||
key (signatory, id): (Party, Int)
|
||||
key (signatory, keyId): (Party, Text)
|
||||
maintainer key._1
|
||||
|
||||
nonconsuming choice Foo1_NonconsumingChoice: ()
|
||||
@ -34,11 +34,11 @@ template Foo2
|
||||
signatory : Party
|
||||
observers : [Party]
|
||||
payload : Text
|
||||
id: Int
|
||||
keyId: Text
|
||||
where
|
||||
signatory signatory
|
||||
observer observers
|
||||
key (signatory, id): (Party, Int)
|
||||
key (signatory, keyId): (Party, Text)
|
||||
maintainer key._1
|
||||
|
||||
nonconsuming choice Foo2_NonconsumingChoice: ()
|
||||
@ -60,11 +60,11 @@ template Foo3
|
||||
signatory : Party
|
||||
observers : [Party]
|
||||
payload : Text
|
||||
id: Int
|
||||
keyId: Text
|
||||
where
|
||||
signatory signatory
|
||||
observer observers
|
||||
key (signatory, id): (Party, Int)
|
||||
key (signatory, keyId): (Party, Text)
|
||||
maintainer key._1
|
||||
|
||||
nonconsuming choice Foo3_NonconsumingChoice: ()
|
||||
|
Loading…
Reference in New Issue
Block a user