[ETQ-Testing] Prepare Benchtool for extending it to support huge party filters [DPP-1079] (#14246)

This a preparation before the adding support for huge party filters.

Some of the changes:
1. Scan for existing parties even if when there is not submission step configured so that we can enrich stream configs fully.
NOTE: ConfigEnricher will now never fail on missing party.
2. Extract party selection for contract creation to a separate class.
3. Expose `commandGenerationParallelism` and a couple of randomness providers to allow for reproducible party allotment to contracts - useful in tests.
4. Extract `benchtoolFixture` to reduce boilerplate in IT tests


changelog_begin
changelog_end
This commit is contained in:
pbatko-da 2022-06-24 11:33:05 +02:00 committed by GitHub
parent 698ddc6372
commit 7cd677e367
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 503 additions and 273 deletions

View File

@ -78,6 +78,8 @@ da_scala_library(
scala_deps = [
"@maven//:org_scalatest_scalatest_core",
"@maven//:org_scalactic_scalactic",
"@maven//:com_typesafe_akka_akka_actor",
"@maven//:com_typesafe_akka_akka_stream",
],
visibility = ["//visibility:public"],
deps = [
@ -87,9 +89,11 @@ da_scala_library(
"//libs-scala/ports",
"//ledger/test-common:dar-files-%s-lib" % "1.14",
"//language-support/scala/bindings",
"//ledger-api/rs-grpc-bridge",
"//ledger/sandbox-on-x",
"//ledger/sandbox-on-x:sandbox-on-x-test-lib",
"//ledger/test-common:dar-files-default-lib",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:org_slf4j_slf4j_api",
],
)

View File

@ -4,61 +4,71 @@
package com.daml.ledger.api.benchtool
import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig
import com.daml.ledger.api.v1.value.Identifier
import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig.{
ActiveContractsStreamConfig,
CompletionsStreamConfig,
TransactionTreesStreamConfig,
TransactionsStreamConfig,
}
import com.daml.ledger.api.benchtool.submission.AllocatedParties
import com.daml.ledger.test.benchtool.Foo.{Foo1, Foo2, Foo3}
import scalaz.syntax.tag._
object ConfigEnricher {
class ConfigEnricher(allocatedParties: AllocatedParties) {
private val templateNameToFullyQualifiedNameMap: Map[String, String] = List(
Foo1.id,
Foo2.id,
Foo3.id,
).map { templateId =>
val id = templateId.unwrap
id.entityName -> s"${id.packageId}:${id.moduleName}:${id.entityName}"
}.toMap
def enrichStreamConfig(
streamConfig: StreamConfig,
submissionResult: Option[SubmissionStepResult],
streamConfig: StreamConfig
): StreamConfig = {
streamConfig match {
case config: StreamConfig.TransactionsStreamConfig =>
config.copy(filters = enrichFilters(config.filters, submissionResult))
case config: StreamConfig.TransactionTreesStreamConfig =>
config.copy(filters = enrichFilters(config.filters, submissionResult))
case config: StreamConfig.ActiveContractsStreamConfig =>
config.copy(filters = enrichFilters(config.filters, submissionResult))
case config: StreamConfig.CompletionsStreamConfig =>
config.copy(parties = config.parties.map(party => convertParty(party, submissionResult)))
case config: TransactionsStreamConfig =>
config
.copy(
filters = enrichFilters(config.filters)
)
case config: TransactionTreesStreamConfig =>
config
.copy(
filters = enrichFilters(config.filters)
)
case config: ActiveContractsStreamConfig =>
config
.copy(
filters = enrichFilters(config.filters)
)
case config: CompletionsStreamConfig =>
config.copy(parties = config.parties.map(party => convertParty(party)))
}
}
private def convertParty(
party: String,
submissionResult: Option[SubmissionStepResult],
partyShortName: String
): String =
submissionResult match {
case None => party
case Some(summary) =>
summary.allocatedParties.allAllocatedParties
.map(_.unwrap)
.find(_.contains(party))
.getOrElse(throw new RuntimeException(s"Observer not found: $party"))
}
allocatedParties.allAllocatedParties
.map(_.unwrap)
.find(_.contains(partyShortName))
.getOrElse(partyShortName)
private def enrichFilters(
filters: List[StreamConfig.PartyFilter],
submissionResult: Option[SubmissionStepResult],
filters: List[StreamConfig.PartyFilter]
): List[StreamConfig.PartyFilter] = {
def identifierToFullyQualifiedString(id: Identifier) =
s"${id.packageId}:${id.moduleName}:${id.entityName}"
def fullyQualifiedTemplateId(template: String): String =
template match {
case "Foo1" => identifierToFullyQualifiedString(Foo1.id.unwrap)
case "Foo2" => identifierToFullyQualifiedString(Foo2.id.unwrap)
case "Foo3" => identifierToFullyQualifiedString(Foo3.id.unwrap)
case other => other
}
filters.map { filter =>
StreamConfig.PartyFilter(
party = convertParty(filter.party, submissionResult),
templates = filter.templates.map(fullyQualifiedTemplateId),
party = convertParty(filter.party),
templates = filter.templates.map(convertTemplate),
)
}
}
def convertTemplate(shortTemplateName: String): String =
templateNameToFullyQualifiedNameMap.getOrElse(shortTemplateName, shortTemplateName)
}

View File

@ -21,6 +21,7 @@ import com.daml.ledger.api.benchtool.metrics.{
}
import com.daml.ledger.api.benchtool.services.LedgerApiServices
import com.daml.ledger.api.benchtool.submission._
import com.daml.ledger.api.benchtool.submission.foo.RandomPartySelecting
import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.participant.state.index.v2.UserManagementStore
@ -79,8 +80,6 @@ object LedgerApiBenchTool {
}
case class SubmissionStepResult(allocatedParties: AllocatedParties)
class LedgerApiBenchTool(
names: Names,
authorizationHelper: Option[AuthorizationHelper],
@ -112,25 +111,33 @@ class LedgerApiBenchTool(
val adminServices = servicesForUserId(UserManagementStore.DefaultParticipantAdminUserId)
val regularUserServices = servicesForUserId(names.benchtoolUserId)
val partyAllocating = new PartyAllocating(
names = names,
adminServices = adminServices,
)
for {
_ <- regularUserSetupStep(adminServices)
submissionStepResultO: Option[SubmissionStepResult] <- {
allocatedParties <- {
config.workflow.submission match {
case None =>
logger.info(s"No submission defined. Skipping.")
Future.successful(None)
for {
existingParties <- partyAllocating.lookupExistingParties()
} yield AllocatedParties.forExistingParties(existingParties.toList)
case Some(submissionConfig) =>
submissionStep(
regularUserServices = regularUserServices,
adminServices = adminServices,
submissionConfig = submissionConfig,
metricRegistry = metricRegistry,
).map(Some(_))
partyAllocating = partyAllocating,
)
}
}
configEnricher = new ConfigEnricher(allocatedParties)
updatedStreamConfigs = config.workflow.streams.map(streamsConfig =>
ConfigEnricher.enrichStreamConfig(streamsConfig, submissionStepResultO)
configEnricher.enrichStreamConfig(streamsConfig)
)
_ = logger.info(
@ -138,13 +145,6 @@ class LedgerApiBenchTool(
)
benchmarkResult <-
if (config.latencyTest) {
val allocatedParties = submissionStepResultO
.map(_.allocatedParties)
.getOrElse(
throw new RuntimeException(
"Signatory (which is part of allocated parties) cannot be empty for latency benchmark"
)
)
benchmarkLatency(
regularUserServices = regularUserServices,
adminServices = adminServices,
@ -223,11 +223,13 @@ class LedgerApiBenchTool(
submissionConfigO match {
case Some(submissionConfig: FooSubmissionConfig) =>
val generator: CommandGenerator = new FooCommandGenerator(
randomnessProvider = RandomnessProvider.Default,
config = submissionConfig,
divulgeesToDivulgerKeyMap = Map.empty,
names = names,
allocatedParties = allocatedParties,
partySelecting = new RandomPartySelecting(
allocatedParties = allocatedParties
),
)
for {
metricsManager <- MetricsManager(
@ -243,6 +245,10 @@ class LedgerApiBenchTool(
metricRegistry = metricRegistry,
metricsManager = metricsManager,
waitForSubmission = true,
partyAllocating = new PartyAllocating(
names = names,
adminServices = adminServices,
),
)
result <- submitter
.generateAndSubmit(
@ -278,9 +284,10 @@ class LedgerApiBenchTool(
adminServices: LedgerApiServices,
submissionConfig: WorkflowConfig.SubmissionConfig,
metricRegistry: MetricRegistry,
partyAllocating: PartyAllocating,
)(implicit
ec: ExecutionContext
): Future[SubmissionStepResult] = {
): Future[AllocatedParties] = {
val submitter = CommandSubmitter(
names = names,
@ -289,6 +296,7 @@ class LedgerApiBenchTool(
metricRegistry = metricRegistry,
metricsManager = NoOpMetricsManager(),
waitForSubmission = submissionConfig.waitForSubmission,
partyAllocating = partyAllocating,
)
for {
allocatedParties <- submitter.prepare(
@ -322,9 +330,7 @@ class LedgerApiBenchTool(
)
} yield ()
}
} yield SubmissionStepResult(
allocatedParties = allocatedParties
)
} yield allocatedParties
}
private def apiServicesOwner(

View File

@ -39,13 +39,13 @@ object WorkflowConfig {
final case class FooSubmissionConfig(
numberOfInstances: Int,
numberOfObservers: Int,
numberOfDivulgees: Int,
numberOfExtraSubmitters: Int,
uniqueParties: Boolean,
instanceDistribution: List[FooSubmissionConfig.ContractDescription],
nonConsumingExercises: Option[NonconsumingExercises],
consumingExercises: Option[ConsumingExercises],
applicationIds: List[FooSubmissionConfig.ApplicationId],
numberOfDivulgees: Int = 0,
numberOfExtraSubmitters: Int = 0,
nonConsumingExercises: Option[NonconsumingExercises] = None,
consumingExercises: Option[ConsumingExercises] = None,
applicationIds: List[FooSubmissionConfig.ApplicationId] = List.empty,
maybeWaitForSubmission: Option[Boolean] = None,
) extends SubmissionConfig {
def waitForSubmission: Boolean = maybeWaitForSubmission.getOrElse(true)
@ -88,32 +88,35 @@ object WorkflowConfig {
}
object StreamConfig {
final case class PartyFilter(party: String, templates: List[String] = List.empty)
final case class TransactionsStreamConfig(
name: String,
filters: List[PartyFilter],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
objectives: Option[StreamConfig.TransactionObjectives],
override val maxItemCount: Option[Long],
override val timeoutInSecondsO: Option[Long],
beginOffset: Option[LedgerOffset] = None,
endOffset: Option[LedgerOffset] = None,
objectives: Option[StreamConfig.TransactionObjectives] = None,
override val maxItemCount: Option[Long] = None,
override val timeoutInSecondsO: Option[Long] = None,
) extends StreamConfig
final case class TransactionTreesStreamConfig(
name: String,
filters: List[PartyFilter],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
objectives: Option[StreamConfig.TransactionObjectives],
override val maxItemCount: Option[Long],
override val timeoutInSecondsO: Option[Long],
beginOffset: Option[LedgerOffset] = None,
endOffset: Option[LedgerOffset] = None,
objectives: Option[StreamConfig.TransactionObjectives] = None,
override val maxItemCount: Option[Long] = None,
override val timeoutInSecondsO: Option[Long] = None,
) extends StreamConfig
final case class ActiveContractsStreamConfig(
name: String,
filters: List[PartyFilter],
objectives: Option[StreamConfig.RateObjectives],
override val maxItemCount: Option[Long],
override val timeoutInSecondsO: Option[Long],
objectives: Option[StreamConfig.RateObjectives] = None,
override val maxItemCount: Option[Long] = None,
override val timeoutInSecondsO: Option[Long] = None,
) extends StreamConfig
final case class CompletionsStreamConfig(
@ -126,8 +129,6 @@ object WorkflowConfig {
override val timeoutInSecondsO: Option[Long],
) extends StreamConfig
final case class PartyFilter(party: String, templates: List[String])
case class TransactionObjectives(
maxDelaySeconds: Option[Long],
minConsumptionSpeed: Option[Double],

View File

@ -134,10 +134,10 @@ object WorkflowConfigParser {
Decoder.forProduct10(
"num_instances",
"num_observers",
"num_divulgees",
"num_extra_submitters",
"unique_parties",
"instance_distribution",
"num_divulgees",
"num_extra_submitters",
"nonconsuming_exercises",
"consuming_exercises",
"application_ids",

View File

@ -0,0 +1,37 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.submission
import com.daml.ledger.client.binding.Primitive
case class AllocatedParties(
signatoryO: Option[Primitive.Party],
observers: List[Primitive.Party],
divulgees: List[Primitive.Party],
extraSubmitters: List[Primitive.Party],
) {
val allAllocatedParties: List[Primitive.Party] =
signatoryO.toList ++ observers ++ divulgees ++ extraSubmitters
/** NOTE: This is guaranteed to be safe only for runs with synthetic data generated by Benchtool
*/
def signatory: Primitive.Party = signatoryO.getOrElse(sys.error("Signatory party not found!"))
}
object AllocatedParties {
def forExistingParties(parties: List[String]): AllocatedParties = {
val partiesPrefixMap: Map[String, List[Primitive.Party]] = parties
.groupBy(Names.getPartyNamePrefix)
.view
.mapValues(_.map(Primitive.Party(_)))
.toMap
AllocatedParties(
// NOTE: For synthetic streams signatory is always present
signatoryO = partiesPrefixMap.getOrElse(Names.SignatoryPrefix, List.empty).headOption,
observers = partiesPrefixMap.getOrElse(Names.ObserverPrefix, List.empty),
divulgees = partiesPrefixMap.getOrElse(Names.DivulgeePrefix, List.empty),
extraSubmitters = partiesPrefixMap.getOrElse(Names.ExtraSubmitterPrefix, List.empty),
)
}
}

View File

@ -25,23 +25,15 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.chaining._
import scala.util.control.NonFatal
case class AllocatedParties(
signatory: client.binding.Primitive.Party,
observers: List[client.binding.Primitive.Party],
divulgees: List[client.binding.Primitive.Party],
extraSubmitters: List[client.binding.Primitive.Party],
) {
val allAllocatedParties: List[Primitive.Party] =
List(signatory) ++ observers ++ divulgees ++ extraSubmitters
}
case class CommandSubmitter(
names: Names,
benchtoolUserServices: LedgerApiServices,
adminServices: LedgerApiServices,
partyAllocating: PartyAllocating,
metricRegistry: MetricRegistry,
metricsManager: MetricsManager[LatencyNanos],
waitForSubmission: Boolean,
commandGenerationParallelism: Int = 8,
) {
private val logger = LoggerFactory.getLogger(getClass)
private val submitLatencyTimer = if (waitForSubmission) {
@ -53,31 +45,11 @@ case class CommandSubmitter(
def prepare(config: SubmissionConfig)(implicit
ec: ExecutionContext
): Future[AllocatedParties] = {
val observerPartyNames =
names.observerPartyNames(config.numberOfObservers, config.uniqueParties)
val divulgeePartyNames =
names.divulgeePartyNames(config.numberOfDivulgees, config.uniqueParties)
val extraSubmittersPartyNames =
names.extraSubmitterPartyNames(config.numberOfExtraSubmitters, config.uniqueParties)
logger.info("Generating contracts...")
logger.info(s"Identifier suffix: ${names.identifierSuffix}")
(for {
known <- lookupExistingParties()
signatory <- allocateSignatoryParty(known)
observers <- allocateParties(observerPartyNames, known)
divulgees <- allocateParties(divulgeePartyNames, known)
extraSubmitters <- allocateParties(extraSubmittersPartyNames, known)
allocatedParties <- partyAllocating.allocateParties(config)
_ <- uploadTestDars()
} yield {
logger.info("Prepared command submission.")
AllocatedParties(
signatory = signatory,
observers = observers,
divulgees = divulgees,
extraSubmitters = extraSubmitters,
)
})
} yield allocatedParties)
.recoverWith { case NonFatal(ex) =>
logger.error(
s"Command submission preparation failed. Details: ${ex.getLocalizedMessage}",
@ -129,31 +101,6 @@ case class CommandSubmitter(
}
}
private def allocateSignatoryParty(known: Set[String])(implicit
ec: ExecutionContext
): Future[Primitive.Party] =
lookupOrAllocateParty(names.signatoryPartyName, known)
private def allocateParties(partyNames: Seq[String], known: Set[String])(implicit
ec: ExecutionContext
): Future[List[Primitive.Party]] = {
Future.traverse(partyNames.toList)(lookupOrAllocateParty(_, known))
}
private def lookupExistingParties()(implicit ec: ExecutionContext): Future[Set[String]] = {
adminServices.partyManagementService.listKnownParties()
}
private def lookupOrAllocateParty(party: String, known: Set[String])(implicit
ec: ExecutionContext
): Future[Primitive.Party] = {
if (known.contains(party)) {
logger.info(s"Found known party: $party")
Future.successful(Primitive.Party(party))
} else
adminServices.partyManagementService.allocateParty(party)
}
private def uploadDar(dar: TestDars.DarFile, submissionId: String)(implicit
ec: ExecutionContext
): Future[Unit] =
@ -162,7 +109,8 @@ case class CommandSubmitter(
submissionId = submissionId,
)
private def uploadTestDars()(implicit ec: ExecutionContext): Future[Unit] =
private def uploadTestDars()(implicit ec: ExecutionContext): Future[Unit] = {
logger.info("Uploading dars...")
for {
dars <- Future.fromTry(TestDars.readAll())
_ <- Future.sequence {
@ -171,7 +119,10 @@ case class CommandSubmitter(
uploadDar(dar, names.darId(index))
}
}
} yield ()
} yield {
logger.info("Uplading dars completed")
}
}
private def submit(
id: String,
@ -232,7 +183,7 @@ case class CommandSubmitter(
_ <- Source
.fromIterator(() => (1 to config.numberOfInstances).iterator)
.wireTap(i => if (i == 1) progressMeter.start())
.mapAsync(8)(index =>
.mapAsync(commandGenerationParallelism)(index =>
Future.fromTry(
generator.next().map(cmd => index -> cmd)
)

View File

@ -3,16 +3,15 @@
package com.daml.ledger.api.benchtool.submission
import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig
import com.daml.ledger.api.v1.commands.Command
import com.daml.ledger.api.v1.commands.ExerciseByKeyCommand
import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value}
import com.daml.ledger.client.binding.Primitive
import com.daml.ledger.test.benchtool.Foo._
import java.util.concurrent.atomic.AtomicLong
import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig
import com.daml.ledger.api.benchtool.submission.foo.RandomPartySelecting
import com.daml.ledger.api.v1.commands.{Command, ExerciseByKeyCommand}
import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value}
import com.daml.ledger.client.binding
import com.daml.ledger.client.binding.Primitive
import com.daml.ledger.test.benchtool.Foo._
import scala.util.control.NonFatal
import scala.util.{Failure, Try}
@ -20,11 +19,13 @@ import scala.util.{Failure, Try}
/** @param divulgeesToDivulgerKeyMap map whose keys are sorted divulgees lists
*/
final class FooCommandGenerator(
randomnessProvider: RandomnessProvider,
config: FooSubmissionConfig,
allocatedParties: AllocatedParties,
divulgeesToDivulgerKeyMap: Map[Set[Primitive.Party], Value],
names: Names,
partySelecting: RandomPartySelecting,
defaultRandomnessProvider: RandomnessProvider = RandomnessProvider.Default,
consumingEventsRandomnessProvider: RandomnessProvider = RandomnessProvider.Default,
) extends CommandGenerator {
private val contractDescriptions = new Distribution[FooSubmissionConfig.ContractDescription](
weights = config.instanceDistribution.map(_.weight),
@ -39,30 +40,20 @@ final class FooCommandGenerator(
)
)
private val observersWithUnlikelihood: List[(Primitive.Party, Int)] = unlikelihoods(
allocatedParties.observers
)
private val divulgeesWithUnlikelihood: List[(Primitive.Party, Int)] = unlikelihoods(
allocatedParties.divulgees
)
private val extraSubmittersWithUnlikelihood: List[(Primitive.Party, Int)] = unlikelihoods(
allocatedParties.extraSubmitters
)
override def next(): Try[Seq[Command]] =
(for {
(contractDescription, observers, divulgees) <- Try(
(contractDescription, partySelection) <- Try(
(
pickContractDescription(),
pickParties(observersWithUnlikelihood),
pickParties(divulgeesWithUnlikelihood).toSet,
partySelecting.nextPartiesForContracts(),
)
)
divulgees = partySelection.divulgees.toSet
createContractPayload <- Try(randomPayload(contractDescription.payloadSizeBytes))
command = createCommands(
templateDescriptor = FooTemplateDescriptor.forName(contractDescription.template),
signatory = allocatedParties.signatory,
observers = observers,
observers = partySelection.observers,
divulgerContractKeyO =
if (divulgees.isEmpty) None else divulgeesToDivulgerKeyMap.get(divulgees),
payload = createContractPayload,
@ -80,23 +71,16 @@ final class FooCommandGenerator(
applicationIdsDistributionO.fold(
names.benchtoolApplicationId
)(applicationIdsDistribution =>
applicationIdsDistribution.choose(randomnessProvider.randomDouble()).applicationId
applicationIdsDistribution.choose(defaultRandomnessProvider.randomDouble()).applicationId
)
}
override def nextExtraCommandSubmitters(): List[Primitive.Party] = {
pickParties(extraSubmittersWithUnlikelihood)
partySelecting.nextExtraSubmitter()
}
private def pickContractDescription(): FooSubmissionConfig.ContractDescription =
contractDescriptions.choose(randomnessProvider.randomDouble())
private def pickParties(unlikelihoods: List[(Primitive.Party, Int)]): List[Primitive.Party] =
unlikelihoods
.collect { case (party, unlikelihood) if randomDraw(unlikelihood) => party }
private def randomDraw(unlikelihood: Int): Boolean =
randomnessProvider.randomNatural(unlikelihood) == 0
contractDescriptions.choose(defaultRandomnessProvider.randomDouble())
private def createCommands(
templateDescriptor: FooTemplateDescriptor,
@ -133,7 +117,7 @@ final class FooCommandGenerator(
// Consuming events
val consumingPayloadO: Option[String] = config.consumingExercises
.flatMap(config =>
if (randomnessProvider.randomDouble() <= config.probability) {
if (consumingEventsRandomnessProvider.randomDouble() <= config.probability) {
Some(randomPayload(config.payloadSizeBytes))
} else None
)
@ -196,7 +180,7 @@ final class FooCommandGenerator(
val nonconsumingExercisePayloads: Seq[String] =
config.nonConsumingExercises.fold(Seq.empty[String]) { config =>
var f = config.probability.toInt
if (randomnessProvider.randomDouble() <= config.probability - f) {
if (defaultRandomnessProvider.randomDouble() <= config.probability - f) {
f += 1
}
Seq.fill[String](f)(randomPayload(config.payloadSizeBytes))
@ -282,14 +266,8 @@ final class FooCommandGenerator(
}
private def randomPayload(sizeBytes: Int): String =
FooCommandGenerator.randomPayload(randomnessProvider, sizeBytes)
FooCommandGenerator.randomPayload(defaultRandomnessProvider, sizeBytes)
private def unlikelihoods(orderedParties: List[Primitive.Party]): List[(Primitive.Party, Int)] =
orderedParties.zipWithIndex.toMap.view.mapValues(unlikelihood).toList
/** @return denominator of a 1/(10**i) likelihood
*/
private def unlikelihood(i: Int): Int = math.pow(10.0, i.toDouble).toInt
}
object FooCommandGenerator {

View File

@ -4,6 +4,7 @@
package com.daml.ledger.api.benchtool.submission
import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig
import com.daml.ledger.api.benchtool.submission.foo.RandomPartySelecting
import scala.concurrent.{ExecutionContext, Future}
@ -14,6 +15,8 @@ class FooSubmission(
submissionConfig: FooSubmissionConfig,
allocatedParties: AllocatedParties,
names: Names,
partySelectingRandomnessProvider: RandomnessProvider = RandomnessProvider.Default,
consumingEventsRandomnessProvider: RandomnessProvider = RandomnessProvider.Default,
) {
def performSubmission()(implicit
@ -24,7 +27,11 @@ class FooSubmission(
divulgingParty = allocatedParties.signatory,
allDivulgees = allocatedParties.divulgees,
)
val partySelecting =
new RandomPartySelecting(
allocatedParties = allocatedParties,
randomnessProvider = partySelectingRandomnessProvider,
)
for {
_ <-
if (divulgerCmds.nonEmpty) {
@ -41,11 +48,13 @@ class FooSubmission(
Future.unit
}
generator: CommandGenerator = new FooCommandGenerator(
randomnessProvider = RandomnessProvider.Default,
defaultRandomnessProvider = RandomnessProvider.Default,
config = submissionConfig,
divulgeesToDivulgerKeyMap = divulgeesToDivulgerKeyMap,
names = names,
allocatedParties = allocatedParties,
partySelecting = partySelecting,
consumingEventsRandomnessProvider = consumingEventsRandomnessProvider,
)
_ <- submitter
.generateAndSubmit(

View File

@ -7,33 +7,54 @@ package com.daml.ledger.api.benchtool.submission
*/
class Names {
import Names.{
SignatoryPrefix,
PartyPrefixSeparatorChar,
ObserverPrefix,
DivulgeePrefix,
ExtraSubmitterPrefix,
}
val identifierSuffix = f"${System.nanoTime}%x"
val benchtoolApplicationId = "benchtool"
val benchtoolUserId: String = benchtoolApplicationId
val workflowId = s"$benchtoolApplicationId-$identifierSuffix"
val signatoryPartyName = s"signatory-$identifierSuffix"
val signatoryPartyName = s"$SignatoryPrefix$PartyPrefixSeparatorChar$identifierSuffix"
def observerPartyNames(numberOfObservers: Int, uniqueParties: Boolean): Seq[String] =
partyNames("Obs", numberOfObservers, uniqueParties)
partyNames(ObserverPrefix, numberOfObservers, uniqueParties)
def divulgeePartyNames(numberOfDivulgees: Int, uniqueParties: Boolean): Seq[String] =
partyNames("Div", numberOfDivulgees, uniqueParties)
partyNames(DivulgeePrefix, numberOfDivulgees, uniqueParties)
def extraSubmitterPartyNames(numberOfExtraSubmitters: Int, uniqueParties: Boolean): Seq[String] =
partyNames("Sub", numberOfExtraSubmitters, uniqueParties)
partyNames(ExtraSubmitterPrefix, numberOfExtraSubmitters, uniqueParties)
def commandId(index: Int): String = s"command-$index-$identifierSuffix"
def darId(index: Int) = s"submission-dars-$index-$identifierSuffix"
private def partyNames(
baseName: String,
def partyNames(
prefix: String,
numberOfParties: Int,
uniqueParties: Boolean,
): Seq[String] =
(0 until numberOfParties).map(i => partyName(baseName, i, uniqueParties))
(0 until numberOfParties).map(i => partyName(prefix, i, uniqueParties))
private def partyName(baseName: String, index: Int, uniqueParties: Boolean): String =
s"$baseName-$index" + (if (uniqueParties) identifierSuffix else "")
s"$baseName$PartyPrefixSeparatorChar$index" + (if (uniqueParties) identifierSuffix else "")
}
object Names {
protected val PartyPrefixSeparatorChar: Char = '-'
val SignatoryPrefix = "signatory"
val ObserverPrefix = "Obs"
val DivulgeePrefix = "Div"
val ExtraSubmitterPrefix = "Sub"
def getPartyNamePrefix(partyName: String): String = {
partyName.split(Names.PartyPrefixSeparatorChar)(0)
}
}

View File

@ -0,0 +1,72 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.submission
import com.daml.ledger.api.benchtool.config.WorkflowConfig.SubmissionConfig
import com.daml.ledger.api.benchtool.services.LedgerApiServices
import com.daml.ledger.client.binding.Primitive
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, Future}
class PartyAllocating(
names: Names,
adminServices: LedgerApiServices,
) {
private val logger = LoggerFactory.getLogger(getClass)
def allocateParties(config: SubmissionConfig)(implicit
ec: ExecutionContext
): Future[AllocatedParties] = {
val observerPartyNames =
names.observerPartyNames(config.numberOfObservers, config.uniqueParties)
val divulgeePartyNames =
names.divulgeePartyNames(config.numberOfDivulgees, config.uniqueParties)
val extraSubmittersPartyNames =
names.extraSubmitterPartyNames(config.numberOfExtraSubmitters, config.uniqueParties)
logger.info("Allocating parties...")
for {
known <- lookupExistingParties()
signatory <- allocateSignatoryParty(known)
observers <- allocateParties(observerPartyNames, known)
divulgees <- allocateParties(divulgeePartyNames, known)
extraSubmitters <- allocateParties(extraSubmittersPartyNames, known)
} yield {
logger.info("Allocating parties completed")
AllocatedParties(
signatoryO = Some(signatory),
observers = observers,
divulgees = divulgees,
extraSubmitters = extraSubmitters,
)
}
}
def lookupExistingParties()(implicit ec: ExecutionContext): Future[Set[String]] = {
adminServices.partyManagementService.listKnownParties()
}
private def allocateSignatoryParty(known: Set[String])(implicit
ec: ExecutionContext
): Future[Primitive.Party] =
lookupOrAllocateParty(names.signatoryPartyName, known)
private def allocateParties(partyNames: Seq[String], known: Set[String])(implicit
ec: ExecutionContext
): Future[List[Primitive.Party]] = {
Future.traverse(partyNames.toList)(lookupOrAllocateParty(_, known))
}
private def lookupOrAllocateParty(party: String, known: Set[String])(implicit
ec: ExecutionContext
): Future[Primitive.Party] = {
if (known.contains(party)) {
logger.info(s"Found known party: $party")
Future.successful(Primitive.Party(party))
} else
adminServices.partyManagementService.allocateParty(party)
}
}

View File

@ -12,8 +12,12 @@ trait RandomnessProvider {
}
object RandomnessProvider {
object Default extends RandomnessProvider {
private val r = new scala.util.Random(System.currentTimeMillis())
object Default extends Seeded(System.currentTimeMillis())
def forSeed(seed: Long) = new Seeded(seed = seed)
class Seeded(seed: Long) extends RandomnessProvider {
private val r = new scala.util.Random(seed)
override def randomDouble(): Double = r.nextDouble()
override def randomNatural(n: Int): Int = r.nextInt(n)
override def randomAsciiString(n: Int): String = {

View File

@ -0,0 +1,11 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.submission.foo
import com.daml.ledger.client.binding.Primitive
case class PartiesSelection(
observers: List[Primitive.Party],
divulgees: List[Primitive.Party],
)

View File

@ -0,0 +1,45 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.submission.foo
import com.daml.ledger.api.benchtool.submission.{AllocatedParties, RandomnessProvider}
import com.daml.ledger.client.binding.Primitive
class RandomPartySelecting(
allocatedParties: AllocatedParties,
randomnessProvider: RandomnessProvider = RandomnessProvider.Default,
) {
private val observersProbability = probabilitiesByPartyIndex(allocatedParties.observers)
private val divulgeesProbability = probabilitiesByPartyIndex(allocatedParties.divulgees)
private val extraSubmittersProbability = probabilitiesByPartyIndex(
allocatedParties.extraSubmitters
)
def nextPartiesForContracts(): PartiesSelection = {
PartiesSelection(
observers = pickParties(observersProbability),
divulgees = pickParties(divulgeesProbability),
)
}
def nextExtraSubmitter(): List[Primitive.Party] = pickParties(extraSubmittersProbability)
private def pickParties(probabilities: List[(Primitive.Party, Double)]): List[Primitive.Party] =
probabilities
.collect { case (party, probability) if randomBoolean(probability) => party }
private def randomBoolean(truthProbability: Double): Boolean =
randomnessProvider.randomDouble() <= truthProbability
private def probabilitiesByPartyIndex(
orderedParties: List[Primitive.Party]
): List[(Primitive.Party, Double)] =
orderedParties.zipWithIndex.toMap.view.mapValues(probabilityBaseTen).toList
/** @return probability of a 1/(10**i)
*/
private def probabilityBaseTen(i: Int): Double = math.pow(10.0, -i.toDouble)
}

View File

@ -5,11 +5,17 @@ package com.daml.ledger.api.benchtool
import java.io.File
import com.codahale.metrics.MetricRegistry
import com.daml.bazeltools.BazelRunfiles.rlocation
import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager
import com.daml.ledger.api.benchtool.services.LedgerApiServices
import com.daml.ledger.api.benchtool.submission.{CommandSubmitter, Names, PartyAllocating}
import com.daml.ledger.test.BenchtoolTestDar
import com.daml.platform.sandbox.fixture.SandboxFixture
import org.scalatest.Suite
import scala.concurrent.{ExecutionContext, Future}
trait BenchtoolSandboxFixture extends SandboxFixture {
self: Suite =>
@ -17,4 +23,33 @@ trait BenchtoolSandboxFixture extends SandboxFixture {
new File(rlocation(BenchtoolTestDar.path))
)
def benchtoolFixture()(implicit
ec: ExecutionContext
): Future[(LedgerApiServices, Names, CommandSubmitter)] = {
for {
ledgerApiServicesF <- LedgerApiServices.forChannel(
channel = channel,
authorizationHelper = None,
)
apiServices: LedgerApiServices = ledgerApiServicesF("someUser")
names = new Names()
submitter = CommandSubmitter(
names = names,
benchtoolUserServices = apiServices,
adminServices = apiServices,
metricRegistry = new MetricRegistry,
metricsManager = NoOpMetricsManager(),
waitForSubmission = true,
partyAllocating = new PartyAllocating(
names = names,
adminServices = apiServices,
),
)
} yield (
apiServices,
names,
submitter,
)
}
}

View File

@ -80,6 +80,11 @@ case class ObservedEvents(
expectedTemplateNames.map(name => name -> groups.get(name).fold(0)(_.size)).toMap
}
val numberOfNonConsumingExercisesPerTemplateName: Map[String, Int] = {
val groups = nonConsumingExercises.groupBy(_.templateName)
expectedTemplateNames.map(name => name -> groups.get(name).fold(0)(_.size)).toMap
}
val avgSizeOfCreateEventPerTemplateName: Map[String, Int] = {
val groups = createEvents.groupBy(_.templateName)
expectedTemplateNames.map { name =>

View File

@ -0,0 +1,73 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool
import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig.{
PartyFilter,
TransactionsStreamConfig,
}
import com.daml.ledger.api.benchtool.submission.AllocatedParties
import com.daml.ledger.client.binding.Primitive
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import scalaz.syntax.tag._
class ConfigEnricherSpec extends AnyFlatSpec with Matchers {
it should "expand party-set filter into a sequence of party filters" in {
val partySuffix = "-foo-123"
def makePartyName(shortName: String): String = s"$shortName$partySuffix"
def makeParty(shortName: String): Primitive.Party = Primitive.Party(makePartyName(shortName))
val desugaring = new ConfigEnricher(
allocatedParties = AllocatedParties(
signatoryO = Some(makeParty("Sig-0")),
observers = List(makeParty("Obs-0")),
divulgees = List(makeParty("Div-0")),
extraSubmitters = List(makeParty("Sub-0")),
)
)
val templates: List[String] = List("otherTemplate", "Foo1")
val foo1Id = com.daml.ledger.test.benchtool.Foo.Foo1.id.unwrap
val enrichedTemplates: List[String] =
List("otherTemplate", s"${foo1Id.packageId}:${foo1Id.moduleName}:${foo1Id.entityName}")
desugaring.enrichStreamConfig(
TransactionsStreamConfig(
name = "flat",
filters = List(
PartyFilter(
party = "Obs-0",
templates = templates,
),
PartyFilter(
party = "Sig-0",
templates = templates,
),
PartyFilter(
party = "UnknownParty-0",
templates = templates,
),
),
)
) shouldBe TransactionsStreamConfig(
name = "flat",
filters = List(
PartyFilter(
party = "Obs-0-foo-123",
templates = enrichedTemplates,
),
PartyFilter(
party = "Sig-0-foo-123",
templates = enrichedTemplates,
),
PartyFilter(
party = "UnknownParty-0",
templates = enrichedTemplates,
),
),
)
}
}

View File

@ -0,0 +1,45 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.submission
import com.daml.ledger.client.binding.Primitive
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
class AllocatedPartiesSpec extends AnyFlatSpec with Matchers {
it should "apportion parties appropriately" in {
AllocatedParties.forExistingParties(
parties = List(
"signatory-123",
"Obs-0",
"Obs-1",
"Div-0",
"Sub-0",
)
) shouldBe AllocatedParties(
signatoryO = Some(Primitive.Party("signatory-123")),
observers = List(
Primitive.Party("Obs-0"),
Primitive.Party("Obs-1"),
),
divulgees = List(Primitive.Party("Div-0")),
extraSubmitters = List(Primitive.Party("Sub-0")),
)
}
it should "apportion parties appropriately - minimal" in {
AllocatedParties.forExistingParties(
parties = List(
"signatory-123"
)
) shouldBe AllocatedParties(
signatoryO = Some(Primitive.Party("signatory-123")),
observers = List.empty,
divulgees = List.empty,
extraSubmitters = List.empty,
)
}
}

View File

@ -3,11 +3,8 @@
package com.daml.ledger.api.benchtool.submission
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.benchtool.BenchtoolSandboxFixture
import com.daml.ledger.api.benchtool.config.WorkflowConfig
import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager
import com.daml.ledger.api.benchtool.services.LedgerApiServices
import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import org.scalatest.AppendedClues
@ -31,28 +28,15 @@ class FibonacciCommandSubmitterITSpec
)
for {
ledgerApiServicesF <- LedgerApiServices.forChannel(
channel = channel,
authorizationHelper = None,
)
apiServices = ledgerApiServicesF("someUser")
names = new Names()
tested = CommandSubmitter(
names = names,
benchtoolUserServices = apiServices,
adminServices = apiServices,
metricRegistry = new MetricRegistry,
metricsManager = NoOpMetricsManager(),
waitForSubmission = config.waitForSubmission,
)
allocatedParties <- tested.prepare(config)
(apiServices, names, submitter) <- benchtoolFixture()
allocatedParties <- submitter.prepare(config)
_ = allocatedParties.divulgees shouldBe empty
generator = new FibonacciCommandGenerator(
signatory = allocatedParties.signatory,
config = config,
names = names,
)
_ <- tested.generateAndSubmit(
_ <- submitter.generateAndSubmit(
generator = generator,
config = config,
baseActAs = List(allocatedParties.signatory) ++ allocatedParties.divulgees,

View File

@ -3,14 +3,12 @@
package com.daml.ledger.api.benchtool.submission
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.benchtool.BenchtoolSandboxFixture
import com.daml.ledger.api.benchtool.config.WorkflowConfig
import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig.{
ConsumingExercises,
NonconsumingExercises,
}
import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager
import com.daml.ledger.api.benchtool.services.LedgerApiServices
import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
@ -64,20 +62,7 @@ class FooCommandSubmitterITSpec
)
for {
ledgerApiServicesF <- LedgerApiServices.forChannel(
channel = channel,
authorizationHelper = None,
)
apiServices = ledgerApiServicesF("someUser")
names = new Names()
submitter = CommandSubmitter(
names = names,
benchtoolUserServices = apiServices,
adminServices = apiServices,
metricRegistry = new MetricRegistry,
metricsManager = NoOpMetricsManager(),
waitForSubmission = false,
)
(apiServices, names, submitter) <- benchtoolFixture()
allocatedParties <- submitter.prepare(config)
_ = allocatedParties.divulgees shouldBe empty
tested = new FooSubmission(

View File

@ -3,11 +3,9 @@
package com.daml.ledger.api.benchtool.submission
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.benchtool.BenchtoolSandboxFixture
import com.daml.ledger.api.benchtool.config.WorkflowConfig
import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig.ConsumingExercises
import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager
import com.daml.ledger.api.benchtool.services.LedgerApiServices
import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
@ -51,20 +49,7 @@ class NonStakeholderInformeesITSpec
applicationIds = List.empty,
)
for {
ledgerApiServicesF <- LedgerApiServices.forChannel(
channel = channel,
authorizationHelper = None,
)
apiServices: LedgerApiServices = ledgerApiServicesF("someUser")
names = new Names()
submitter = CommandSubmitter(
names = names,
benchtoolUserServices = apiServices,
adminServices = apiServices,
metricRegistry = new MetricRegistry,
metricsManager = NoOpMetricsManager(),
waitForSubmission = true,
)
(apiServices, names, submitter) <- benchtoolFixture()
allocatedParties <- submitter.prepare(submissionConfig)
tested = new FooSubmission(
submitter = submitter,

View File

@ -3,11 +3,8 @@
package com.daml.ledger.api.benchtool.submission
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.benchtool.BenchtoolSandboxFixture
import com.daml.ledger.api.benchtool.config.WorkflowConfig
import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager
import com.daml.ledger.api.benchtool.services.LedgerApiServices
import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll
import org.scalatest.AppendedClues
import org.scalatest.flatspec.AsyncFlatSpec
@ -35,19 +32,7 @@ class PartyAllocationITSpec
)
for {
ledgerApiServicesF <- LedgerApiServices.forChannel(
authorizationHelper = None,
channel = channel,
)
apiServices = ledgerApiServicesF("someUser")
submitter = CommandSubmitter(
names = new Names(),
benchtoolUserServices = apiServices,
adminServices = apiServices,
metricRegistry = new MetricRegistry,
metricsManager = NoOpMetricsManager(),
waitForSubmission = false,
)
(_, _, submitter) <- benchtoolFixture()
parties1 <- submitter.prepare(config)
parties2 <- submitter.prepare(config)
} yield {

View File

@ -5,11 +5,9 @@ package com.daml.ledger.api.benchtool.submission
import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.benchtool.BenchtoolSandboxFixture
import com.daml.ledger.api.benchtool.config.WorkflowConfig
import com.daml.ledger.api.benchtool.config.WorkflowConfig.FooSubmissionConfig.ApplicationId
import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager
import com.daml.ledger.api.benchtool.services.LedgerApiServices
import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
@ -34,7 +32,6 @@ class WeightedApplicationIdsAndSubmittersITSpec
val submissionConfig = WorkflowConfig.FooSubmissionConfig(
numberOfInstances = 100,
numberOfObservers = 1,
numberOfDivulgees = 0,
numberOfExtraSubmitters = 3,
uniqueParties = false,
instanceDistribution = List(
@ -58,20 +55,7 @@ class WeightedApplicationIdsAndSubmittersITSpec
),
)
for {
ledgerApiServicesF <- LedgerApiServices.forChannel(
channel = channel,
authorizationHelper = None,
)
apiServices: LedgerApiServices = ledgerApiServicesF("someUser")
names = new Names()
submitter = CommandSubmitter(
names = names,
benchtoolUserServices = apiServices,
adminServices = apiServices,
metricRegistry = new MetricRegistry,
metricsManager = NoOpMetricsManager(),
waitForSubmission = true,
)
(apiServices, names, submitter) <- benchtoolFixture()
allocatedParties <- submitter.prepare(submissionConfig)
tested = new FooSubmission(
submitter = submitter,