From 7cd677e36761d802ed19793721de1be2fff017bc Mon Sep 17 00:00:00 2001 From: pbatko-da Date: Fri, 24 Jun 2022 11:33:05 +0200 Subject: [PATCH] [ETQ-Testing] Prepare Benchtool for extending it to support huge party filters [DPP-1079] (#14246) This a preparation before the adding support for huge party filters. Some of the changes: 1. Scan for existing parties even if when there is not submission step configured so that we can enrich stream configs fully. NOTE: ConfigEnricher will now never fail on missing party. 2. Extract party selection for contract creation to a separate class. 3. Expose `commandGenerationParallelism` and a couple of randomness providers to allow for reproducible party allotment to contracts - useful in tests. 4. Extract `benchtoolFixture` to reduce boilerplate in IT tests changelog_begin changelog_end --- ledger/ledger-api-bench-tool/BUILD.bazel | 4 + .../ledger/api/benchtool/ConfigEnricher.scala | 82 +++++++++++-------- .../api/benchtool/LedgerApiBenchTool.scala | 42 ++++++---- .../api/benchtool/config/WorkflowConfig.scala | 41 +++++----- .../config/WorkflowConfigParser.scala | 4 +- .../submission/AllocatedParties.scala | 37 +++++++++ .../submission/CommandSubmitter.scala | 71 +++------------- .../submission/FooCommandGenerator.scala | 60 +++++--------- .../benchtool/submission/FooSubmission.scala | 13 ++- .../api/benchtool/submission/Names.scala | 37 +++++++-- .../submission/PartyAllocating.scala | 72 ++++++++++++++++ .../submission/RandomnessProvider.scala | 8 +- .../submission/foo/PartiesSelection.scala | 11 +++ .../submission/foo/RandomPartySelecting.scala | 45 ++++++++++ .../benchtool/BenchtoolSandboxFixture.scala | 35 ++++++++ .../submission/TreeEventsObserver.scala | 5 ++ .../api/benchtool/ConfigEnricherSpec.scala | 73 +++++++++++++++++ .../submission/AllocatedPartiesSpec.scala | 45 ++++++++++ .../FibonacciCommandSubmitterITSpec.scala | 22 +---- .../FooCommandSubmitterITSpec.scala | 17 +--- .../NonStakeholderInformeesITSpec.scala | 17 +--- .../submission/PartyAllocationITSpec.scala | 17 +--- ...tedApplicationIdsAndSubmittersITSpec.scala | 18 +--- 23 files changed, 503 insertions(+), 273 deletions(-) create mode 100644 ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/AllocatedParties.scala create mode 100644 ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/PartyAllocating.scala create mode 100644 ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/foo/PartiesSelection.scala create mode 100644 ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/foo/RandomPartySelecting.scala create mode 100644 ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/ConfigEnricherSpec.scala create mode 100644 ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/AllocatedPartiesSpec.scala diff --git a/ledger/ledger-api-bench-tool/BUILD.bazel b/ledger/ledger-api-bench-tool/BUILD.bazel index 6778e91cb23..498b9946d78 100644 --- a/ledger/ledger-api-bench-tool/BUILD.bazel +++ b/ledger/ledger-api-bench-tool/BUILD.bazel @@ -78,6 +78,8 @@ da_scala_library( scala_deps = [ "@maven//:org_scalatest_scalatest_core", "@maven//:org_scalactic_scalactic", + "@maven//:com_typesafe_akka_akka_actor", + "@maven//:com_typesafe_akka_akka_stream", ], visibility = ["//visibility:public"], deps = [ @@ -87,9 +89,11 @@ da_scala_library( "//libs-scala/ports", "//ledger/test-common:dar-files-%s-lib" % "1.14", "//language-support/scala/bindings", + "//ledger-api/rs-grpc-bridge", "//ledger/sandbox-on-x", "//ledger/sandbox-on-x:sandbox-on-x-test-lib", "//ledger/test-common:dar-files-default-lib", + "@maven//:io_dropwizard_metrics_metrics_core", "@maven//:org_slf4j_slf4j_api", ], ) diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/ConfigEnricher.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/ConfigEnricher.scala index ebf525d9feb..5eca2ec5d4a 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/ConfigEnricher.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/ConfigEnricher.scala @@ -4,61 +4,71 @@ package com.daml.ledger.api.benchtool import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig -import com.daml.ledger.api.v1.value.Identifier +import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig.{ + ActiveContractsStreamConfig, + CompletionsStreamConfig, + TransactionTreesStreamConfig, + TransactionsStreamConfig, +} +import com.daml.ledger.api.benchtool.submission.AllocatedParties import com.daml.ledger.test.benchtool.Foo.{Foo1, Foo2, Foo3} import scalaz.syntax.tag._ -object ConfigEnricher { +class ConfigEnricher(allocatedParties: AllocatedParties) { + + private val templateNameToFullyQualifiedNameMap: Map[String, String] = List( + Foo1.id, + Foo2.id, + Foo3.id, + ).map { templateId => + val id = templateId.unwrap + id.entityName -> s"${id.packageId}:${id.moduleName}:${id.entityName}" + }.toMap def enrichStreamConfig( - streamConfig: StreamConfig, - submissionResult: Option[SubmissionStepResult], + streamConfig: StreamConfig ): StreamConfig = { streamConfig match { - case config: StreamConfig.TransactionsStreamConfig => - config.copy(filters = enrichFilters(config.filters, submissionResult)) - case config: StreamConfig.TransactionTreesStreamConfig => - config.copy(filters = enrichFilters(config.filters, submissionResult)) - case config: StreamConfig.ActiveContractsStreamConfig => - config.copy(filters = enrichFilters(config.filters, submissionResult)) - case config: StreamConfig.CompletionsStreamConfig => - config.copy(parties = config.parties.map(party => convertParty(party, submissionResult))) + case config: TransactionsStreamConfig => + config + .copy( + filters = enrichFilters(config.filters) + ) + case config: TransactionTreesStreamConfig => + config + .copy( + filters = enrichFilters(config.filters) + ) + case config: ActiveContractsStreamConfig => + config + .copy( + filters = enrichFilters(config.filters) + ) + case config: CompletionsStreamConfig => + config.copy(parties = config.parties.map(party => convertParty(party))) } } private def convertParty( - party: String, - submissionResult: Option[SubmissionStepResult], + partyShortName: String ): String = - submissionResult match { - case None => party - case Some(summary) => - summary.allocatedParties.allAllocatedParties - .map(_.unwrap) - .find(_.contains(party)) - .getOrElse(throw new RuntimeException(s"Observer not found: $party")) - } + allocatedParties.allAllocatedParties + .map(_.unwrap) + .find(_.contains(partyShortName)) + .getOrElse(partyShortName) private def enrichFilters( - filters: List[StreamConfig.PartyFilter], - submissionResult: Option[SubmissionStepResult], + filters: List[StreamConfig.PartyFilter] ): List[StreamConfig.PartyFilter] = { - def identifierToFullyQualifiedString(id: Identifier) = - s"${id.packageId}:${id.moduleName}:${id.entityName}" - def fullyQualifiedTemplateId(template: String): String = - template match { - case "Foo1" => identifierToFullyQualifiedString(Foo1.id.unwrap) - case "Foo2" => identifierToFullyQualifiedString(Foo2.id.unwrap) - case "Foo3" => identifierToFullyQualifiedString(Foo3.id.unwrap) - case other => other - } - filters.map { filter => StreamConfig.PartyFilter( - party = convertParty(filter.party, submissionResult), - templates = filter.templates.map(fullyQualifiedTemplateId), + party = convertParty(filter.party), + templates = filter.templates.map(convertTemplate), ) } } + def convertTemplate(shortTemplateName: String): String = + templateNameToFullyQualifiedNameMap.getOrElse(shortTemplateName, shortTemplateName) + } diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala index c45590691be..69f43b756e2 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala @@ -21,6 +21,7 @@ import com.daml.ledger.api.benchtool.metrics.{ } import com.daml.ledger.api.benchtool.services.LedgerApiServices import com.daml.ledger.api.benchtool.submission._ +import com.daml.ledger.api.benchtool.submission.foo.RandomPartySelecting import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner import com.daml.ledger.api.tls.TlsConfiguration import com.daml.ledger.participant.state.index.v2.UserManagementStore @@ -79,8 +80,6 @@ object LedgerApiBenchTool { } -case class SubmissionStepResult(allocatedParties: AllocatedParties) - class LedgerApiBenchTool( names: Names, authorizationHelper: Option[AuthorizationHelper], @@ -112,25 +111,33 @@ class LedgerApiBenchTool( val adminServices = servicesForUserId(UserManagementStore.DefaultParticipantAdminUserId) val regularUserServices = servicesForUserId(names.benchtoolUserId) + val partyAllocating = new PartyAllocating( + names = names, + adminServices = adminServices, + ) for { _ <- regularUserSetupStep(adminServices) - submissionStepResultO: Option[SubmissionStepResult] <- { + allocatedParties <- { config.workflow.submission match { case None => logger.info(s"No submission defined. Skipping.") - Future.successful(None) + for { + existingParties <- partyAllocating.lookupExistingParties() + } yield AllocatedParties.forExistingParties(existingParties.toList) case Some(submissionConfig) => submissionStep( regularUserServices = regularUserServices, adminServices = adminServices, submissionConfig = submissionConfig, metricRegistry = metricRegistry, - ).map(Some(_)) + partyAllocating = partyAllocating, + ) } } + configEnricher = new ConfigEnricher(allocatedParties) updatedStreamConfigs = config.workflow.streams.map(streamsConfig => - ConfigEnricher.enrichStreamConfig(streamsConfig, submissionStepResultO) + configEnricher.enrichStreamConfig(streamsConfig) ) _ = logger.info( @@ -138,13 +145,6 @@ class LedgerApiBenchTool( ) benchmarkResult <- if (config.latencyTest) { - val allocatedParties = submissionStepResultO - .map(_.allocatedParties) - .getOrElse( - throw new RuntimeException( - "Signatory (which is part of allocated parties) cannot be empty for latency benchmark" - ) - ) benchmarkLatency( regularUserServices = regularUserServices, adminServices = adminServices, @@ -223,11 +223,13 @@ class LedgerApiBenchTool( submissionConfigO match { case Some(submissionConfig: FooSubmissionConfig) => val generator: CommandGenerator = new FooCommandGenerator( - randomnessProvider = RandomnessProvider.Default, config = submissionConfig, divulgeesToDivulgerKeyMap = Map.empty, names = names, allocatedParties = allocatedParties, + partySelecting = new RandomPartySelecting( + allocatedParties = allocatedParties + ), ) for { metricsManager <- MetricsManager( @@ -243,6 +245,10 @@ class LedgerApiBenchTool( metricRegistry = metricRegistry, metricsManager = metricsManager, waitForSubmission = true, + partyAllocating = new PartyAllocating( + names = names, + adminServices = adminServices, + ), ) result <- submitter .generateAndSubmit( @@ -278,9 +284,10 @@ class LedgerApiBenchTool( adminServices: LedgerApiServices, submissionConfig: WorkflowConfig.SubmissionConfig, metricRegistry: MetricRegistry, + partyAllocating: PartyAllocating, )(implicit ec: ExecutionContext - ): Future[SubmissionStepResult] = { + ): Future[AllocatedParties] = { val submitter = CommandSubmitter( names = names, @@ -289,6 +296,7 @@ class LedgerApiBenchTool( metricRegistry = metricRegistry, metricsManager = NoOpMetricsManager(), waitForSubmission = submissionConfig.waitForSubmission, + partyAllocating = partyAllocating, ) for { allocatedParties <- submitter.prepare( @@ -322,9 +330,7 @@ class LedgerApiBenchTool( ) } yield () } - } yield SubmissionStepResult( - allocatedParties = allocatedParties - ) + } yield allocatedParties } private def apiServicesOwner( diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/config/WorkflowConfig.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/config/WorkflowConfig.scala index dfbff69d386..5c2e7e8b72c 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/config/WorkflowConfig.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/config/WorkflowConfig.scala @@ -39,13 +39,13 @@ object WorkflowConfig { final case class FooSubmissionConfig( numberOfInstances: Int, numberOfObservers: Int, - numberOfDivulgees: Int, - numberOfExtraSubmitters: Int, uniqueParties: Boolean, instanceDistribution: List[FooSubmissionConfig.ContractDescription], - nonConsumingExercises: Option[NonconsumingExercises], - consumingExercises: Option[ConsumingExercises], - applicationIds: List[FooSubmissionConfig.ApplicationId], + numberOfDivulgees: Int = 0, + numberOfExtraSubmitters: Int = 0, + nonConsumingExercises: Option[NonconsumingExercises] = None, + consumingExercises: Option[ConsumingExercises] = None, + applicationIds: List[FooSubmissionConfig.ApplicationId] = List.empty, maybeWaitForSubmission: Option[Boolean] = None, ) extends SubmissionConfig { def waitForSubmission: Boolean = maybeWaitForSubmission.getOrElse(true) @@ -88,32 +88,35 @@ object WorkflowConfig { } object StreamConfig { + + final case class PartyFilter(party: String, templates: List[String] = List.empty) + final case class TransactionsStreamConfig( name: String, filters: List[PartyFilter], - beginOffset: Option[LedgerOffset], - endOffset: Option[LedgerOffset], - objectives: Option[StreamConfig.TransactionObjectives], - override val maxItemCount: Option[Long], - override val timeoutInSecondsO: Option[Long], + beginOffset: Option[LedgerOffset] = None, + endOffset: Option[LedgerOffset] = None, + objectives: Option[StreamConfig.TransactionObjectives] = None, + override val maxItemCount: Option[Long] = None, + override val timeoutInSecondsO: Option[Long] = None, ) extends StreamConfig final case class TransactionTreesStreamConfig( name: String, filters: List[PartyFilter], - beginOffset: Option[LedgerOffset], - endOffset: Option[LedgerOffset], - objectives: Option[StreamConfig.TransactionObjectives], - override val maxItemCount: Option[Long], - override val timeoutInSecondsO: Option[Long], + beginOffset: Option[LedgerOffset] = None, + endOffset: Option[LedgerOffset] = None, + objectives: Option[StreamConfig.TransactionObjectives] = None, + override val maxItemCount: Option[Long] = None, + override val timeoutInSecondsO: Option[Long] = None, ) extends StreamConfig final case class ActiveContractsStreamConfig( name: String, filters: List[PartyFilter], - objectives: Option[StreamConfig.RateObjectives], - override val maxItemCount: Option[Long], - override val timeoutInSecondsO: Option[Long], + objectives: Option[StreamConfig.RateObjectives] = None, + override val maxItemCount: Option[Long] = None, + override val timeoutInSecondsO: Option[Long] = None, ) extends StreamConfig final case class CompletionsStreamConfig( @@ -126,8 +129,6 @@ object WorkflowConfig { override val timeoutInSecondsO: Option[Long], ) extends StreamConfig - final case class PartyFilter(party: String, templates: List[String]) - case class TransactionObjectives( maxDelaySeconds: Option[Long], minConsumptionSpeed: Option[Double], diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/config/WorkflowConfigParser.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/config/WorkflowConfigParser.scala index 3e9021cca7b..1beddb5fde1 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/config/WorkflowConfigParser.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/config/WorkflowConfigParser.scala @@ -134,10 +134,10 @@ object WorkflowConfigParser { Decoder.forProduct10( "num_instances", "num_observers", - "num_divulgees", - "num_extra_submitters", "unique_parties", "instance_distribution", + "num_divulgees", + "num_extra_submitters", "nonconsuming_exercises", "consuming_exercises", "application_ids", diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/AllocatedParties.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/AllocatedParties.scala new file mode 100644 index 00000000000..08dd15692fa --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/AllocatedParties.scala @@ -0,0 +1,37 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.submission + +import com.daml.ledger.client.binding.Primitive + +case class AllocatedParties( + signatoryO: Option[Primitive.Party], + observers: List[Primitive.Party], + divulgees: List[Primitive.Party], + extraSubmitters: List[Primitive.Party], +) { + val allAllocatedParties: List[Primitive.Party] = + signatoryO.toList ++ observers ++ divulgees ++ extraSubmitters + + /** NOTE: This is guaranteed to be safe only for runs with synthetic data generated by Benchtool + */ + def signatory: Primitive.Party = signatoryO.getOrElse(sys.error("Signatory party not found!")) +} + +object AllocatedParties { + def forExistingParties(parties: List[String]): AllocatedParties = { + val partiesPrefixMap: Map[String, List[Primitive.Party]] = parties + .groupBy(Names.getPartyNamePrefix) + .view + .mapValues(_.map(Primitive.Party(_))) + .toMap + AllocatedParties( + // NOTE: For synthetic streams signatory is always present + signatoryO = partiesPrefixMap.getOrElse(Names.SignatoryPrefix, List.empty).headOption, + observers = partiesPrefixMap.getOrElse(Names.ObserverPrefix, List.empty), + divulgees = partiesPrefixMap.getOrElse(Names.DivulgeePrefix, List.empty), + extraSubmitters = partiesPrefixMap.getOrElse(Names.ExtraSubmitterPrefix, List.empty), + ) + } +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/CommandSubmitter.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/CommandSubmitter.scala index 859d2ab1068..c6d6ffb4096 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/CommandSubmitter.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/CommandSubmitter.scala @@ -25,23 +25,15 @@ import scala.concurrent.{ExecutionContext, Future} import scala.util.chaining._ import scala.util.control.NonFatal -case class AllocatedParties( - signatory: client.binding.Primitive.Party, - observers: List[client.binding.Primitive.Party], - divulgees: List[client.binding.Primitive.Party], - extraSubmitters: List[client.binding.Primitive.Party], -) { - val allAllocatedParties: List[Primitive.Party] = - List(signatory) ++ observers ++ divulgees ++ extraSubmitters -} - case class CommandSubmitter( names: Names, benchtoolUserServices: LedgerApiServices, adminServices: LedgerApiServices, + partyAllocating: PartyAllocating, metricRegistry: MetricRegistry, metricsManager: MetricsManager[LatencyNanos], waitForSubmission: Boolean, + commandGenerationParallelism: Int = 8, ) { private val logger = LoggerFactory.getLogger(getClass) private val submitLatencyTimer = if (waitForSubmission) { @@ -53,31 +45,11 @@ case class CommandSubmitter( def prepare(config: SubmissionConfig)(implicit ec: ExecutionContext ): Future[AllocatedParties] = { - val observerPartyNames = - names.observerPartyNames(config.numberOfObservers, config.uniqueParties) - val divulgeePartyNames = - names.divulgeePartyNames(config.numberOfDivulgees, config.uniqueParties) - val extraSubmittersPartyNames = - names.extraSubmitterPartyNames(config.numberOfExtraSubmitters, config.uniqueParties) - - logger.info("Generating contracts...") logger.info(s"Identifier suffix: ${names.identifierSuffix}") (for { - known <- lookupExistingParties() - signatory <- allocateSignatoryParty(known) - observers <- allocateParties(observerPartyNames, known) - divulgees <- allocateParties(divulgeePartyNames, known) - extraSubmitters <- allocateParties(extraSubmittersPartyNames, known) + allocatedParties <- partyAllocating.allocateParties(config) _ <- uploadTestDars() - } yield { - logger.info("Prepared command submission.") - AllocatedParties( - signatory = signatory, - observers = observers, - divulgees = divulgees, - extraSubmitters = extraSubmitters, - ) - }) + } yield allocatedParties) .recoverWith { case NonFatal(ex) => logger.error( s"Command submission preparation failed. Details: ${ex.getLocalizedMessage}", @@ -129,31 +101,6 @@ case class CommandSubmitter( } } - private def allocateSignatoryParty(known: Set[String])(implicit - ec: ExecutionContext - ): Future[Primitive.Party] = - lookupOrAllocateParty(names.signatoryPartyName, known) - - private def allocateParties(partyNames: Seq[String], known: Set[String])(implicit - ec: ExecutionContext - ): Future[List[Primitive.Party]] = { - Future.traverse(partyNames.toList)(lookupOrAllocateParty(_, known)) - } - - private def lookupExistingParties()(implicit ec: ExecutionContext): Future[Set[String]] = { - adminServices.partyManagementService.listKnownParties() - } - - private def lookupOrAllocateParty(party: String, known: Set[String])(implicit - ec: ExecutionContext - ): Future[Primitive.Party] = { - if (known.contains(party)) { - logger.info(s"Found known party: $party") - Future.successful(Primitive.Party(party)) - } else - adminServices.partyManagementService.allocateParty(party) - } - private def uploadDar(dar: TestDars.DarFile, submissionId: String)(implicit ec: ExecutionContext ): Future[Unit] = @@ -162,7 +109,8 @@ case class CommandSubmitter( submissionId = submissionId, ) - private def uploadTestDars()(implicit ec: ExecutionContext): Future[Unit] = + private def uploadTestDars()(implicit ec: ExecutionContext): Future[Unit] = { + logger.info("Uploading dars...") for { dars <- Future.fromTry(TestDars.readAll()) _ <- Future.sequence { @@ -171,7 +119,10 @@ case class CommandSubmitter( uploadDar(dar, names.darId(index)) } } - } yield () + } yield { + logger.info("Uplading dars completed") + } + } private def submit( id: String, @@ -232,7 +183,7 @@ case class CommandSubmitter( _ <- Source .fromIterator(() => (1 to config.numberOfInstances).iterator) .wireTap(i => if (i == 1) progressMeter.start()) - .mapAsync(8)(index => + .mapAsync(commandGenerationParallelism)(index => Future.fromTry( generator.next().map(cmd => index -> cmd) ) diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/FooCommandGenerator.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/FooCommandGenerator.scala index c9d82b19b40..882f19645d3 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/FooCommandGenerator.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/FooCommandGenerator.scala @@ -3,16 +3,15 @@ package com.daml.ledger.api.benchtool.submission -import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig -import com.daml.ledger.api.v1.commands.Command -import com.daml.ledger.api.v1.commands.ExerciseByKeyCommand -import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value} -import com.daml.ledger.client.binding.Primitive -import com.daml.ledger.test.benchtool.Foo._ - import java.util.concurrent.atomic.AtomicLong +import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig +import com.daml.ledger.api.benchtool.submission.foo.RandomPartySelecting +import com.daml.ledger.api.v1.commands.{Command, ExerciseByKeyCommand} +import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value} import com.daml.ledger.client.binding +import com.daml.ledger.client.binding.Primitive +import com.daml.ledger.test.benchtool.Foo._ import scala.util.control.NonFatal import scala.util.{Failure, Try} @@ -20,11 +19,13 @@ import scala.util.{Failure, Try} /** @param divulgeesToDivulgerKeyMap map whose keys are sorted divulgees lists */ final class FooCommandGenerator( - randomnessProvider: RandomnessProvider, config: FooSubmissionConfig, allocatedParties: AllocatedParties, divulgeesToDivulgerKeyMap: Map[Set[Primitive.Party], Value], names: Names, + partySelecting: RandomPartySelecting, + defaultRandomnessProvider: RandomnessProvider = RandomnessProvider.Default, + consumingEventsRandomnessProvider: RandomnessProvider = RandomnessProvider.Default, ) extends CommandGenerator { private val contractDescriptions = new Distribution[FooSubmissionConfig.ContractDescription]( weights = config.instanceDistribution.map(_.weight), @@ -39,30 +40,20 @@ final class FooCommandGenerator( ) ) - private val observersWithUnlikelihood: List[(Primitive.Party, Int)] = unlikelihoods( - allocatedParties.observers - ) - private val divulgeesWithUnlikelihood: List[(Primitive.Party, Int)] = unlikelihoods( - allocatedParties.divulgees - ) - private val extraSubmittersWithUnlikelihood: List[(Primitive.Party, Int)] = unlikelihoods( - allocatedParties.extraSubmitters - ) - override def next(): Try[Seq[Command]] = (for { - (contractDescription, observers, divulgees) <- Try( + (contractDescription, partySelection) <- Try( ( pickContractDescription(), - pickParties(observersWithUnlikelihood), - pickParties(divulgeesWithUnlikelihood).toSet, + partySelecting.nextPartiesForContracts(), ) ) + divulgees = partySelection.divulgees.toSet createContractPayload <- Try(randomPayload(contractDescription.payloadSizeBytes)) command = createCommands( templateDescriptor = FooTemplateDescriptor.forName(contractDescription.template), signatory = allocatedParties.signatory, - observers = observers, + observers = partySelection.observers, divulgerContractKeyO = if (divulgees.isEmpty) None else divulgeesToDivulgerKeyMap.get(divulgees), payload = createContractPayload, @@ -80,23 +71,16 @@ final class FooCommandGenerator( applicationIdsDistributionO.fold( names.benchtoolApplicationId )(applicationIdsDistribution => - applicationIdsDistribution.choose(randomnessProvider.randomDouble()).applicationId + applicationIdsDistribution.choose(defaultRandomnessProvider.randomDouble()).applicationId ) } override def nextExtraCommandSubmitters(): List[Primitive.Party] = { - pickParties(extraSubmittersWithUnlikelihood) + partySelecting.nextExtraSubmitter() } private def pickContractDescription(): FooSubmissionConfig.ContractDescription = - contractDescriptions.choose(randomnessProvider.randomDouble()) - - private def pickParties(unlikelihoods: List[(Primitive.Party, Int)]): List[Primitive.Party] = - unlikelihoods - .collect { case (party, unlikelihood) if randomDraw(unlikelihood) => party } - - private def randomDraw(unlikelihood: Int): Boolean = - randomnessProvider.randomNatural(unlikelihood) == 0 + contractDescriptions.choose(defaultRandomnessProvider.randomDouble()) private def createCommands( templateDescriptor: FooTemplateDescriptor, @@ -133,7 +117,7 @@ final class FooCommandGenerator( // Consuming events val consumingPayloadO: Option[String] = config.consumingExercises .flatMap(config => - if (randomnessProvider.randomDouble() <= config.probability) { + if (consumingEventsRandomnessProvider.randomDouble() <= config.probability) { Some(randomPayload(config.payloadSizeBytes)) } else None ) @@ -196,7 +180,7 @@ final class FooCommandGenerator( val nonconsumingExercisePayloads: Seq[String] = config.nonConsumingExercises.fold(Seq.empty[String]) { config => var f = config.probability.toInt - if (randomnessProvider.randomDouble() <= config.probability - f) { + if (defaultRandomnessProvider.randomDouble() <= config.probability - f) { f += 1 } Seq.fill[String](f)(randomPayload(config.payloadSizeBytes)) @@ -282,14 +266,8 @@ final class FooCommandGenerator( } private def randomPayload(sizeBytes: Int): String = - FooCommandGenerator.randomPayload(randomnessProvider, sizeBytes) + FooCommandGenerator.randomPayload(defaultRandomnessProvider, sizeBytes) - private def unlikelihoods(orderedParties: List[Primitive.Party]): List[(Primitive.Party, Int)] = - orderedParties.zipWithIndex.toMap.view.mapValues(unlikelihood).toList - - /** @return denominator of a 1/(10**i) likelihood - */ - private def unlikelihood(i: Int): Int = math.pow(10.0, i.toDouble).toInt } object FooCommandGenerator { diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/FooSubmission.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/FooSubmission.scala index 361c8817955..188a4ef6275 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/FooSubmission.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/FooSubmission.scala @@ -4,6 +4,7 @@ package com.daml.ledger.api.benchtool.submission import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig +import com.daml.ledger.api.benchtool.submission.foo.RandomPartySelecting import scala.concurrent.{ExecutionContext, Future} @@ -14,6 +15,8 @@ class FooSubmission( submissionConfig: FooSubmissionConfig, allocatedParties: AllocatedParties, names: Names, + partySelectingRandomnessProvider: RandomnessProvider = RandomnessProvider.Default, + consumingEventsRandomnessProvider: RandomnessProvider = RandomnessProvider.Default, ) { def performSubmission()(implicit @@ -24,7 +27,11 @@ class FooSubmission( divulgingParty = allocatedParties.signatory, allDivulgees = allocatedParties.divulgees, ) - + val partySelecting = + new RandomPartySelecting( + allocatedParties = allocatedParties, + randomnessProvider = partySelectingRandomnessProvider, + ) for { _ <- if (divulgerCmds.nonEmpty) { @@ -41,11 +48,13 @@ class FooSubmission( Future.unit } generator: CommandGenerator = new FooCommandGenerator( - randomnessProvider = RandomnessProvider.Default, + defaultRandomnessProvider = RandomnessProvider.Default, config = submissionConfig, divulgeesToDivulgerKeyMap = divulgeesToDivulgerKeyMap, names = names, allocatedParties = allocatedParties, + partySelecting = partySelecting, + consumingEventsRandomnessProvider = consumingEventsRandomnessProvider, ) _ <- submitter .generateAndSubmit( diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/Names.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/Names.scala index 6e6d9d12390..0236c4deeee 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/Names.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/Names.scala @@ -7,33 +7,54 @@ package com.daml.ledger.api.benchtool.submission */ class Names { + import Names.{ + SignatoryPrefix, + PartyPrefixSeparatorChar, + ObserverPrefix, + DivulgeePrefix, + ExtraSubmitterPrefix, + } + val identifierSuffix = f"${System.nanoTime}%x" val benchtoolApplicationId = "benchtool" val benchtoolUserId: String = benchtoolApplicationId val workflowId = s"$benchtoolApplicationId-$identifierSuffix" - val signatoryPartyName = s"signatory-$identifierSuffix" + val signatoryPartyName = s"$SignatoryPrefix$PartyPrefixSeparatorChar$identifierSuffix" def observerPartyNames(numberOfObservers: Int, uniqueParties: Boolean): Seq[String] = - partyNames("Obs", numberOfObservers, uniqueParties) + partyNames(ObserverPrefix, numberOfObservers, uniqueParties) def divulgeePartyNames(numberOfDivulgees: Int, uniqueParties: Boolean): Seq[String] = - partyNames("Div", numberOfDivulgees, uniqueParties) + partyNames(DivulgeePrefix, numberOfDivulgees, uniqueParties) def extraSubmitterPartyNames(numberOfExtraSubmitters: Int, uniqueParties: Boolean): Seq[String] = - partyNames("Sub", numberOfExtraSubmitters, uniqueParties) + partyNames(ExtraSubmitterPrefix, numberOfExtraSubmitters, uniqueParties) def commandId(index: Int): String = s"command-$index-$identifierSuffix" def darId(index: Int) = s"submission-dars-$index-$identifierSuffix" - private def partyNames( - baseName: String, + def partyNames( + prefix: String, numberOfParties: Int, uniqueParties: Boolean, ): Seq[String] = - (0 until numberOfParties).map(i => partyName(baseName, i, uniqueParties)) + (0 until numberOfParties).map(i => partyName(prefix, i, uniqueParties)) private def partyName(baseName: String, index: Int, uniqueParties: Boolean): String = - s"$baseName-$index" + (if (uniqueParties) identifierSuffix else "") + s"$baseName$PartyPrefixSeparatorChar$index" + (if (uniqueParties) identifierSuffix else "") + +} + +object Names { + protected val PartyPrefixSeparatorChar: Char = '-' + val SignatoryPrefix = "signatory" + val ObserverPrefix = "Obs" + val DivulgeePrefix = "Div" + val ExtraSubmitterPrefix = "Sub" + + def getPartyNamePrefix(partyName: String): String = { + partyName.split(Names.PartyPrefixSeparatorChar)(0) + } } diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/PartyAllocating.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/PartyAllocating.scala new file mode 100644 index 00000000000..2a840f33251 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/PartyAllocating.scala @@ -0,0 +1,72 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.submission + +import com.daml.ledger.api.benchtool.config.WorkflowConfig.SubmissionConfig +import com.daml.ledger.api.benchtool.services.LedgerApiServices +import com.daml.ledger.client.binding.Primitive +import org.slf4j.LoggerFactory + +import scala.concurrent.{ExecutionContext, Future} + +class PartyAllocating( + names: Names, + adminServices: LedgerApiServices, +) { + + private val logger = LoggerFactory.getLogger(getClass) + + def allocateParties(config: SubmissionConfig)(implicit + ec: ExecutionContext + ): Future[AllocatedParties] = { + val observerPartyNames = + names.observerPartyNames(config.numberOfObservers, config.uniqueParties) + val divulgeePartyNames = + names.divulgeePartyNames(config.numberOfDivulgees, config.uniqueParties) + val extraSubmittersPartyNames = + names.extraSubmitterPartyNames(config.numberOfExtraSubmitters, config.uniqueParties) + logger.info("Allocating parties...") + for { + known <- lookupExistingParties() + signatory <- allocateSignatoryParty(known) + observers <- allocateParties(observerPartyNames, known) + divulgees <- allocateParties(divulgeePartyNames, known) + extraSubmitters <- allocateParties(extraSubmittersPartyNames, known) + } yield { + logger.info("Allocating parties completed") + AllocatedParties( + signatoryO = Some(signatory), + observers = observers, + divulgees = divulgees, + extraSubmitters = extraSubmitters, + ) + } + } + + def lookupExistingParties()(implicit ec: ExecutionContext): Future[Set[String]] = { + adminServices.partyManagementService.listKnownParties() + } + + private def allocateSignatoryParty(known: Set[String])(implicit + ec: ExecutionContext + ): Future[Primitive.Party] = + lookupOrAllocateParty(names.signatoryPartyName, known) + + private def allocateParties(partyNames: Seq[String], known: Set[String])(implicit + ec: ExecutionContext + ): Future[List[Primitive.Party]] = { + Future.traverse(partyNames.toList)(lookupOrAllocateParty(_, known)) + } + + private def lookupOrAllocateParty(party: String, known: Set[String])(implicit + ec: ExecutionContext + ): Future[Primitive.Party] = { + if (known.contains(party)) { + logger.info(s"Found known party: $party") + Future.successful(Primitive.Party(party)) + } else + adminServices.partyManagementService.allocateParty(party) + } + +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/RandomnessProvider.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/RandomnessProvider.scala index c3ba4555f68..bb5a6257d0b 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/RandomnessProvider.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/RandomnessProvider.scala @@ -12,8 +12,12 @@ trait RandomnessProvider { } object RandomnessProvider { - object Default extends RandomnessProvider { - private val r = new scala.util.Random(System.currentTimeMillis()) + object Default extends Seeded(System.currentTimeMillis()) + + def forSeed(seed: Long) = new Seeded(seed = seed) + + class Seeded(seed: Long) extends RandomnessProvider { + private val r = new scala.util.Random(seed) override def randomDouble(): Double = r.nextDouble() override def randomNatural(n: Int): Int = r.nextInt(n) override def randomAsciiString(n: Int): String = { diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/foo/PartiesSelection.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/foo/PartiesSelection.scala new file mode 100644 index 00000000000..62164d0e0bd --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/foo/PartiesSelection.scala @@ -0,0 +1,11 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.submission.foo + +import com.daml.ledger.client.binding.Primitive + +case class PartiesSelection( + observers: List[Primitive.Party], + divulgees: List[Primitive.Party], +) diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/foo/RandomPartySelecting.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/foo/RandomPartySelecting.scala new file mode 100644 index 00000000000..ebf4b3b5077 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/submission/foo/RandomPartySelecting.scala @@ -0,0 +1,45 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.submission.foo + +import com.daml.ledger.api.benchtool.submission.{AllocatedParties, RandomnessProvider} +import com.daml.ledger.client.binding.Primitive + +class RandomPartySelecting( + allocatedParties: AllocatedParties, + randomnessProvider: RandomnessProvider = RandomnessProvider.Default, +) { + + private val observersProbability = probabilitiesByPartyIndex(allocatedParties.observers) + private val divulgeesProbability = probabilitiesByPartyIndex(allocatedParties.divulgees) + private val extraSubmittersProbability = probabilitiesByPartyIndex( + allocatedParties.extraSubmitters + ) + + def nextPartiesForContracts(): PartiesSelection = { + PartiesSelection( + observers = pickParties(observersProbability), + divulgees = pickParties(divulgeesProbability), + ) + } + + def nextExtraSubmitter(): List[Primitive.Party] = pickParties(extraSubmittersProbability) + + private def pickParties(probabilities: List[(Primitive.Party, Double)]): List[Primitive.Party] = + probabilities + .collect { case (party, probability) if randomBoolean(probability) => party } + + private def randomBoolean(truthProbability: Double): Boolean = + randomnessProvider.randomDouble() <= truthProbability + + private def probabilitiesByPartyIndex( + orderedParties: List[Primitive.Party] + ): List[(Primitive.Party, Double)] = + orderedParties.zipWithIndex.toMap.view.mapValues(probabilityBaseTen).toList + + /** @return probability of a 1/(10**i) + */ + private def probabilityBaseTen(i: Int): Double = math.pow(10.0, -i.toDouble) + +} diff --git a/ledger/ledger-api-bench-tool/src/test/lib/scala/com/daml/ledger/api/benchtool/BenchtoolSandboxFixture.scala b/ledger/ledger-api-bench-tool/src/test/lib/scala/com/daml/ledger/api/benchtool/BenchtoolSandboxFixture.scala index 556686572c7..c5c398f5c74 100644 --- a/ledger/ledger-api-bench-tool/src/test/lib/scala/com/daml/ledger/api/benchtool/BenchtoolSandboxFixture.scala +++ b/ledger/ledger-api-bench-tool/src/test/lib/scala/com/daml/ledger/api/benchtool/BenchtoolSandboxFixture.scala @@ -5,11 +5,17 @@ package com.daml.ledger.api.benchtool import java.io.File +import com.codahale.metrics.MetricRegistry import com.daml.bazeltools.BazelRunfiles.rlocation +import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager +import com.daml.ledger.api.benchtool.services.LedgerApiServices +import com.daml.ledger.api.benchtool.submission.{CommandSubmitter, Names, PartyAllocating} import com.daml.ledger.test.BenchtoolTestDar import com.daml.platform.sandbox.fixture.SandboxFixture import org.scalatest.Suite +import scala.concurrent.{ExecutionContext, Future} + trait BenchtoolSandboxFixture extends SandboxFixture { self: Suite => @@ -17,4 +23,33 @@ trait BenchtoolSandboxFixture extends SandboxFixture { new File(rlocation(BenchtoolTestDar.path)) ) + def benchtoolFixture()(implicit + ec: ExecutionContext + ): Future[(LedgerApiServices, Names, CommandSubmitter)] = { + for { + ledgerApiServicesF <- LedgerApiServices.forChannel( + channel = channel, + authorizationHelper = None, + ) + apiServices: LedgerApiServices = ledgerApiServicesF("someUser") + names = new Names() + submitter = CommandSubmitter( + names = names, + benchtoolUserServices = apiServices, + adminServices = apiServices, + metricRegistry = new MetricRegistry, + metricsManager = NoOpMetricsManager(), + waitForSubmission = true, + partyAllocating = new PartyAllocating( + names = names, + adminServices = apiServices, + ), + ) + } yield ( + apiServices, + names, + submitter, + ) + } + } diff --git a/ledger/ledger-api-bench-tool/src/test/lib/scala/com/daml/ledger/api/benchtool/submission/TreeEventsObserver.scala b/ledger/ledger-api-bench-tool/src/test/lib/scala/com/daml/ledger/api/benchtool/submission/TreeEventsObserver.scala index b5e7caf65fd..29c908cf0b1 100644 --- a/ledger/ledger-api-bench-tool/src/test/lib/scala/com/daml/ledger/api/benchtool/submission/TreeEventsObserver.scala +++ b/ledger/ledger-api-bench-tool/src/test/lib/scala/com/daml/ledger/api/benchtool/submission/TreeEventsObserver.scala @@ -80,6 +80,11 @@ case class ObservedEvents( expectedTemplateNames.map(name => name -> groups.get(name).fold(0)(_.size)).toMap } + val numberOfNonConsumingExercisesPerTemplateName: Map[String, Int] = { + val groups = nonConsumingExercises.groupBy(_.templateName) + expectedTemplateNames.map(name => name -> groups.get(name).fold(0)(_.size)).toMap + } + val avgSizeOfCreateEventPerTemplateName: Map[String, Int] = { val groups = createEvents.groupBy(_.templateName) expectedTemplateNames.map { name => diff --git a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/ConfigEnricherSpec.scala b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/ConfigEnricherSpec.scala new file mode 100644 index 00000000000..df1e95929f6 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/ConfigEnricherSpec.scala @@ -0,0 +1,73 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool + +import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig.{ + PartyFilter, + TransactionsStreamConfig, +} +import com.daml.ledger.api.benchtool.submission.AllocatedParties +import com.daml.ledger.client.binding.Primitive +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import scalaz.syntax.tag._ + +class ConfigEnricherSpec extends AnyFlatSpec with Matchers { + + it should "expand party-set filter into a sequence of party filters" in { + val partySuffix = "-foo-123" + + def makePartyName(shortName: String): String = s"$shortName$partySuffix" + def makeParty(shortName: String): Primitive.Party = Primitive.Party(makePartyName(shortName)) + + val desugaring = new ConfigEnricher( + allocatedParties = AllocatedParties( + signatoryO = Some(makeParty("Sig-0")), + observers = List(makeParty("Obs-0")), + divulgees = List(makeParty("Div-0")), + extraSubmitters = List(makeParty("Sub-0")), + ) + ) + val templates: List[String] = List("otherTemplate", "Foo1") + val foo1Id = com.daml.ledger.test.benchtool.Foo.Foo1.id.unwrap + val enrichedTemplates: List[String] = + List("otherTemplate", s"${foo1Id.packageId}:${foo1Id.moduleName}:${foo1Id.entityName}") + + desugaring.enrichStreamConfig( + TransactionsStreamConfig( + name = "flat", + filters = List( + PartyFilter( + party = "Obs-0", + templates = templates, + ), + PartyFilter( + party = "Sig-0", + templates = templates, + ), + PartyFilter( + party = "UnknownParty-0", + templates = templates, + ), + ), + ) + ) shouldBe TransactionsStreamConfig( + name = "flat", + filters = List( + PartyFilter( + party = "Obs-0-foo-123", + templates = enrichedTemplates, + ), + PartyFilter( + party = "Sig-0-foo-123", + templates = enrichedTemplates, + ), + PartyFilter( + party = "UnknownParty-0", + templates = enrichedTemplates, + ), + ), + ) + } +} diff --git a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/AllocatedPartiesSpec.scala b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/AllocatedPartiesSpec.scala new file mode 100644 index 00000000000..4e36fb6865e --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/AllocatedPartiesSpec.scala @@ -0,0 +1,45 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.submission + +import com.daml.ledger.client.binding.Primitive +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class AllocatedPartiesSpec extends AnyFlatSpec with Matchers { + + it should "apportion parties appropriately" in { + AllocatedParties.forExistingParties( + parties = List( + "signatory-123", + "Obs-0", + "Obs-1", + "Div-0", + "Sub-0", + ) + ) shouldBe AllocatedParties( + signatoryO = Some(Primitive.Party("signatory-123")), + observers = List( + Primitive.Party("Obs-0"), + Primitive.Party("Obs-1"), + ), + divulgees = List(Primitive.Party("Div-0")), + extraSubmitters = List(Primitive.Party("Sub-0")), + ) + } + + it should "apportion parties appropriately - minimal" in { + AllocatedParties.forExistingParties( + parties = List( + "signatory-123" + ) + ) shouldBe AllocatedParties( + signatoryO = Some(Primitive.Party("signatory-123")), + observers = List.empty, + divulgees = List.empty, + extraSubmitters = List.empty, + ) + } + +} diff --git a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/FibonacciCommandSubmitterITSpec.scala b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/FibonacciCommandSubmitterITSpec.scala index b36f2f8395f..718b82e1c00 100644 --- a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/FibonacciCommandSubmitterITSpec.scala +++ b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/FibonacciCommandSubmitterITSpec.scala @@ -3,11 +3,8 @@ package com.daml.ledger.api.benchtool.submission -import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.benchtool.BenchtoolSandboxFixture import com.daml.ledger.api.benchtool.config.WorkflowConfig -import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager -import com.daml.ledger.api.benchtool.services.LedgerApiServices import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll import com.daml.ledger.api.v1.ledger_offset.LedgerOffset import org.scalatest.AppendedClues @@ -31,28 +28,15 @@ class FibonacciCommandSubmitterITSpec ) for { - ledgerApiServicesF <- LedgerApiServices.forChannel( - channel = channel, - authorizationHelper = None, - ) - apiServices = ledgerApiServicesF("someUser") - names = new Names() - tested = CommandSubmitter( - names = names, - benchtoolUserServices = apiServices, - adminServices = apiServices, - metricRegistry = new MetricRegistry, - metricsManager = NoOpMetricsManager(), - waitForSubmission = config.waitForSubmission, - ) - allocatedParties <- tested.prepare(config) + (apiServices, names, submitter) <- benchtoolFixture() + allocatedParties <- submitter.prepare(config) _ = allocatedParties.divulgees shouldBe empty generator = new FibonacciCommandGenerator( signatory = allocatedParties.signatory, config = config, names = names, ) - _ <- tested.generateAndSubmit( + _ <- submitter.generateAndSubmit( generator = generator, config = config, baseActAs = List(allocatedParties.signatory) ++ allocatedParties.divulgees, diff --git a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/FooCommandSubmitterITSpec.scala b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/FooCommandSubmitterITSpec.scala index 4f302bb6dcb..bc8820aee99 100644 --- a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/FooCommandSubmitterITSpec.scala +++ b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/FooCommandSubmitterITSpec.scala @@ -3,14 +3,12 @@ package com.daml.ledger.api.benchtool.submission -import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.benchtool.BenchtoolSandboxFixture import com.daml.ledger.api.benchtool.config.WorkflowConfig import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig.{ ConsumingExercises, NonconsumingExercises, } -import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager import com.daml.ledger.api.benchtool.services.LedgerApiServices import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll import com.daml.ledger.api.v1.ledger_offset.LedgerOffset @@ -64,20 +62,7 @@ class FooCommandSubmitterITSpec ) for { - ledgerApiServicesF <- LedgerApiServices.forChannel( - channel = channel, - authorizationHelper = None, - ) - apiServices = ledgerApiServicesF("someUser") - names = new Names() - submitter = CommandSubmitter( - names = names, - benchtoolUserServices = apiServices, - adminServices = apiServices, - metricRegistry = new MetricRegistry, - metricsManager = NoOpMetricsManager(), - waitForSubmission = false, - ) + (apiServices, names, submitter) <- benchtoolFixture() allocatedParties <- submitter.prepare(config) _ = allocatedParties.divulgees shouldBe empty tested = new FooSubmission( diff --git a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/NonStakeholderInformeesITSpec.scala b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/NonStakeholderInformeesITSpec.scala index 078402a783c..78074efbad1 100644 --- a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/NonStakeholderInformeesITSpec.scala +++ b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/NonStakeholderInformeesITSpec.scala @@ -3,11 +3,9 @@ package com.daml.ledger.api.benchtool.submission -import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.benchtool.BenchtoolSandboxFixture import com.daml.ledger.api.benchtool.config.WorkflowConfig import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig.ConsumingExercises -import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager import com.daml.ledger.api.benchtool.services.LedgerApiServices import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll import com.daml.ledger.api.v1.ledger_offset.LedgerOffset @@ -51,20 +49,7 @@ class NonStakeholderInformeesITSpec applicationIds = List.empty, ) for { - ledgerApiServicesF <- LedgerApiServices.forChannel( - channel = channel, - authorizationHelper = None, - ) - apiServices: LedgerApiServices = ledgerApiServicesF("someUser") - names = new Names() - submitter = CommandSubmitter( - names = names, - benchtoolUserServices = apiServices, - adminServices = apiServices, - metricRegistry = new MetricRegistry, - metricsManager = NoOpMetricsManager(), - waitForSubmission = true, - ) + (apiServices, names, submitter) <- benchtoolFixture() allocatedParties <- submitter.prepare(submissionConfig) tested = new FooSubmission( submitter = submitter, diff --git a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/PartyAllocationITSpec.scala b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/PartyAllocationITSpec.scala index 69c8bc79e76..6d00a28a622 100644 --- a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/PartyAllocationITSpec.scala +++ b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/PartyAllocationITSpec.scala @@ -3,11 +3,8 @@ package com.daml.ledger.api.benchtool.submission -import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.benchtool.BenchtoolSandboxFixture import com.daml.ledger.api.benchtool.config.WorkflowConfig -import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager -import com.daml.ledger.api.benchtool.services.LedgerApiServices import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll import org.scalatest.AppendedClues import org.scalatest.flatspec.AsyncFlatSpec @@ -35,19 +32,7 @@ class PartyAllocationITSpec ) for { - ledgerApiServicesF <- LedgerApiServices.forChannel( - authorizationHelper = None, - channel = channel, - ) - apiServices = ledgerApiServicesF("someUser") - submitter = CommandSubmitter( - names = new Names(), - benchtoolUserServices = apiServices, - adminServices = apiServices, - metricRegistry = new MetricRegistry, - metricsManager = NoOpMetricsManager(), - waitForSubmission = false, - ) + (_, _, submitter) <- benchtoolFixture() parties1 <- submitter.prepare(config) parties2 <- submitter.prepare(config) } yield { diff --git a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/WeightedApplicationIdsAndSubmittersITSpec.scala b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/WeightedApplicationIdsAndSubmittersITSpec.scala index 3f221784f44..ce31c4b5532 100644 --- a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/WeightedApplicationIdsAndSubmittersITSpec.scala +++ b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/submission/WeightedApplicationIdsAndSubmittersITSpec.scala @@ -5,11 +5,9 @@ package com.daml.ledger.api.benchtool.submission import java.util.concurrent.TimeUnit -import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.benchtool.BenchtoolSandboxFixture import com.daml.ledger.api.benchtool.config.WorkflowConfig import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig.ApplicationId -import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager import com.daml.ledger.api.benchtool.services.LedgerApiServices import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll import com.daml.ledger.api.v1.ledger_offset.LedgerOffset @@ -34,7 +32,6 @@ class WeightedApplicationIdsAndSubmittersITSpec val submissionConfig = WorkflowConfig.FooSubmissionConfig( numberOfInstances = 100, numberOfObservers = 1, - numberOfDivulgees = 0, numberOfExtraSubmitters = 3, uniqueParties = false, instanceDistribution = List( @@ -58,20 +55,7 @@ class WeightedApplicationIdsAndSubmittersITSpec ), ) for { - ledgerApiServicesF <- LedgerApiServices.forChannel( - channel = channel, - authorizationHelper = None, - ) - apiServices: LedgerApiServices = ledgerApiServicesF("someUser") - names = new Names() - submitter = CommandSubmitter( - names = names, - benchtoolUserServices = apiServices, - adminServices = apiServices, - metricRegistry = new MetricRegistry, - metricsManager = NoOpMetricsManager(), - waitForSubmission = true, - ) + (apiServices, names, submitter) <- benchtoolFixture() allocatedParties <- submitter.prepare(submissionConfig) tested = new FooSubmission( submitter = submitter,