Migrate LoadTesting to use Canton (#16652)

This commit is contained in:
Carl Pulley 2023-04-05 13:23:32 +01:00 committed by GitHub
parent 2e2bd3c8f1
commit 3026c29790
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 254 additions and 45 deletions

View File

@ -30,7 +30,7 @@ import scala.sys.process.Process
@scala.annotation.nowarn("msg=match may not be exhaustive")
object CantonFixture {
case class CompiledDar(
final case class CompiledDar(
mainPkg: Ref.PackageId,
compiledPackages: PureCompiledPackages,
)

View File

@ -85,6 +85,7 @@ da_scala_library(
srcs = [
"src/test/scala/com/digitalasset/daml/lf/engine/trigger/test/AbstractFuncTests.scala",
"src/test/scala/com/digitalasset/daml/lf/engine/trigger/test/AbstractTriggerTest.scala",
"src/test/scala/com/digitalasset/daml/lf/engine/trigger/test/AbstractTriggerTestWithCanton.scala",
],
scala_deps = [
"@maven//:com_typesafe_akka_akka_stream",
@ -99,6 +100,7 @@ da_scala_library(
"//bazel_tools/runfiles:scala_runfiles",
"//daml-lf/archive:daml_lf_archive_reader",
"//daml-lf/data",
"//daml-lf/integration-test-lib",
"//daml-lf/interpreter",
"//daml-lf/language",
"//daml-lf/transaction",
@ -194,6 +196,7 @@ da_scala_library(
data = [
":acs%s.dar" % suffix,
"//test-common/test-certificates",
"@canton//:lib",
],
resources = ["//triggers/runner:src/main/resources/logback.xml"],
scala_deps = [
@ -209,6 +212,7 @@ da_scala_library(
"//daml-lf/archive:daml_lf_archive_reader",
"//daml-lf/data",
"//daml-lf/engine",
"//daml-lf/integration-test-lib",
"//daml-lf/interpreter",
"//daml-lf/language",
"//language-support/scala/bindings",
@ -222,8 +226,6 @@ da_scala_library(
"//ledger/ledger-configuration",
"//ledger/ledger-runner-common",
"//ledger/participant-integration-api",
"//ledger/sandbox-on-x",
"//ledger/sandbox-on-x:sandbox-on-x-test-lib",
"//libs-scala/contextualized-logging",
"//libs-scala/ledger-resources",
"//libs-scala/logging-entries",

View File

@ -0,0 +1,237 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.lf
package engine
package trigger
package test
import java.util.UUID
import akka.stream.scaladsl.{Sink, Source}
import com.daml.bazeltools.BazelRunfiles
import com.daml.ledger.api.refinements.ApiTypes.{ApplicationId, Party}
import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll
import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.daml.ledger.api.v1.commands.{Command, CreateCommand, ExerciseCommand, _}
import com.daml.ledger.api.v1.event.CreatedEvent
import com.daml.ledger.api.v1.transaction_filter.{Filters, TransactionFilter}
import com.daml.ledger.api.v1.{value => LedgerApi}
import com.daml.ledger.client.LedgerClient
import com.daml.ledger.client.configuration.{
CommandClientConfiguration,
LedgerClientChannelConfiguration,
LedgerClientConfiguration,
LedgerIdRequirement,
}
import com.daml.lf.data.Ref._
import com.daml.lf.engine.trigger.TriggerRunnerConfig.DefaultTriggerRunnerConfig
import com.daml.lf.integrationtest.CantonFixture
import com.daml.lf.speedy.SValue
import com.daml.lf.speedy.SValue._
import com.daml.platform.services.time.TimeProviderType
import org.scalatest._
import scalaz.syntax.tag._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
// TODO: once test migration work has completed, rename this trait to AbstractTriggerTest
trait AbstractTriggerTestWithCanton extends CantonFixture with SuiteResourceManagementAroundAll {
self: Suite =>
import CantonFixture._
private[this] lazy val darFile =
Try(BazelRunfiles.requiredResource("triggers/tests/acs.dar"))
.getOrElse(BazelRunfiles.requiredResource("triggers/tests/acs-1.dev.dar"))
override protected def authSecret = None
override protected def darFiles = List(darFile.toPath)
override protected def devMode = true
override protected def nParticipants = 1
override protected def timeProviderType = TimeProviderType.Static
override protected def tlsEnable = false
protected def toHighLevelResult(s: SValue) = s match {
case SRecord(_, _, values) if values.size == 6 =>
AbstractTriggerTestWithCanton.HighLevelResult(
values.get(0),
values.get(1),
values.get(2),
values.get(3),
values.get(4),
values.get(5),
)
case _ => throw new IllegalArgumentException(s"Expected record with 6 fields but got $s")
}
protected val applicationId: ApplicationId = RunnerConfig.DefaultApplicationId
protected def ledgerClientConfiguration: LedgerClientConfiguration =
LedgerClientConfiguration(
applicationId = ApplicationId.unwrap(applicationId),
ledgerIdRequirement = LedgerIdRequirement.none,
commandClient = CommandClientConfiguration.default,
token = None,
)
protected def ledgerClientChannelConfiguration: LedgerClientChannelConfiguration =
LedgerClientChannelConfiguration.InsecureDefaults
protected def triggerRunnerConfiguration: TriggerRunnerConfig = DefaultTriggerRunnerConfig
protected val CompiledDar(packageId, compiledPackages) =
readDar(darFile.toPath, speedy.Compiler.Config.Dev)
protected def getRunner(
client: LedgerClient,
name: QualifiedName,
party: String,
readAs: Set[String] = Set.empty,
): Runner = {
val triggerId = Identifier(packageId, name)
Trigger.newTriggerLogContext(
triggerId,
Party(party),
Party.subst(readAs),
"test-trigger",
ApplicationId("test-trigger-app"),
) { implicit triggerContext: TriggerLogContext =>
val trigger = Trigger.fromIdentifier(compiledPackages, triggerId).toOption.get
Runner(
compiledPackages,
trigger,
triggerRunnerConfiguration,
client,
timeProviderType,
applicationId,
TriggerParties(
actAs = Party(party),
readAs = Party.subst(readAs),
),
)
}
}
protected def allocateParty(client: LedgerClient)(implicit ec: ExecutionContext): Future[String] =
client.partyManagementClient.allocateParty(None, None).map(_.party)
protected def create(client: LedgerClient, party: String, cmd: CreateCommand)(implicit
ec: ExecutionContext
): Future[String] = {
val commands = Seq(Command().withCreate(cmd))
val request = SubmitAndWaitRequest(
Some(
Commands(
party = party,
commands = commands,
ledgerId = client.ledgerId.unwrap,
applicationId = ApplicationId.unwrap(applicationId),
commandId = UUID.randomUUID.toString,
)
)
)
for {
response <- client.commandServiceClient.submitAndWaitForTransaction(request)
} yield response.getTransaction.events.head.getCreated.contractId
}
protected def create(
client: LedgerClient,
party: String,
commands: Seq[CreateCommand],
elements: Int = 50,
per: FiniteDuration = 1.second,
)(implicit ec: ExecutionContext): Future[Unit] = {
Source(commands)
.mapAsync(8) { cmd =>
create(client, party, cmd)
}
.throttle(elements, per)
.run()
.map(_ => ())
}
protected def exercise(
client: LedgerClient,
party: String,
templateId: LedgerApi.Identifier,
contractId: String,
choice: String,
choiceArgument: LedgerApi.Value,
)(implicit ec: ExecutionContext): Future[Unit] = {
val commands = Seq(
Command().withExercise(
ExerciseCommand(
templateId = Some(templateId),
contractId = contractId,
choice = choice,
choiceArgument = Some(choiceArgument),
)
)
)
val request = SubmitAndWaitRequest(
Some(
Commands(
party = party,
commands = commands,
ledgerId = client.ledgerId.unwrap,
applicationId = ApplicationId.unwrap(applicationId),
commandId = UUID.randomUUID.toString,
)
)
)
for {
_ <- client.commandServiceClient.submitAndWaitForTransaction(request)
} yield ()
}
protected def archive(
client: LedgerClient,
party: String,
templateId: LedgerApi.Identifier,
contractId: String,
)(implicit ec: ExecutionContext): Future[Unit] = {
exercise(
client,
party,
templateId,
contractId,
"Archive",
LedgerApi.Value().withRecord(LedgerApi.Record()),
)
}
protected def queryACS(client: LedgerClient, party: String)(implicit
ec: ExecutionContext
): Future[Map[LedgerApi.Identifier, Seq[LedgerApi.Record]]] = {
val filter = TransactionFilter(List((party, Filters.defaultInstance)).toMap)
val contractsF: Future[Seq[CreatedEvent]] = client.activeContractSetClient
.getActiveContracts(filter, verbose = true)
.runWith(Sink.seq)
.map(_.flatMap(x => x.activeContracts))
contractsF.map(contracts =>
contracts
.map(created => (created.getTemplateId, created.getCreateArguments))
.groupBy(_._1)
.view
.mapValues(cs => cs.map(_._2))
.toMap
)
}
}
object AbstractTriggerTestWithCanton {
final case class HighLevelResult(
acs: SValue,
party: SValue,
readAs: SValue,
state: SValue,
commandsInFlight: SValue,
config: SValue,
)
}

View File

@ -4,14 +4,11 @@
package com.daml.lf.engine.trigger.test
import akka.stream.scaladsl.Flow
import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll
import com.daml.ledger.api.v1.commands.CreateCommand
import com.daml.ledger.api.v1.event.Event.Event.Created
import com.daml.ledger.api.v1.event.{Event => ApiEvent}
import com.daml.ledger.api.v1.transaction.{Transaction => ApiTransaction}
import com.daml.ledger.api.v1.{value => LedgerApi}
import com.daml.ledger.runner.common.Config
import com.daml.ledger.sandbox.SandboxOnXForTest.{ApiServerConfig, singleParticipant}
import com.daml.lf.data.Ref.QualifiedName
import com.daml.lf.engine.trigger.Runner.TriggerContext
import com.daml.lf.engine.trigger.{
@ -21,21 +18,18 @@ import com.daml.lf.engine.trigger.{
TriggerRuleEvaluationTimeout,
TriggerRunnerConfig,
}
import com.daml.platform.services.time.TimeProviderType
import com.daml.util.Ctx
import org.scalatest.{Inside, TryValues}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import scala.concurrent.duration._
import scala.concurrent.Future
abstract class LoadTesting
extends AsyncWordSpec
with AbstractTriggerTest
with AbstractTriggerTestWithCanton
with Matchers
with Inside
with SuiteResourceManagementAroundAll
with TryValues {
// The following value should be kept in sync with the value of contractPairings in Cats.daml
@ -43,14 +37,6 @@ abstract class LoadTesting
// The following value should be kept in sync with the value of breedingRate in Cats.daml
val breedingRate: Int = 100
override protected def config: Config = super.config.copy(
participants = singleParticipant(
ApiServerConfig.copy(
timeProviderType = TimeProviderType.Static
)
)
)
def command(template: String, owner: String, i: Int): CreateCommand =
CreateCommand(
templateId = Some(LedgerApi.Identifier(packageId, "Cats", template)),
@ -96,18 +82,10 @@ final class BaseLoadTesting extends LoadTesting {
"Contracts are already created" should {
"Process all contract pairings successfully" in {
for {
client <- ledgerClient()
client <- defaultLedgerClient()
party <- allocateParty(client)
_ <- Future.sequence(
(0 until contractPairings).map { i =>
create(client, party, cat(party, i))
}
)
_ <- Future.sequence(
(0 until contractPairings).map { i =>
create(client, party, food(party, i))
}
)
_ <- create(client, party, (0 until contractPairings).map(i => cat(party, i)))
_ <- create(client, party, (0 until contractPairings).map(i => food(party, i)))
runner = getRunner(
client,
QualifiedName.assertFromString("Cats:feedingTrigger"),
@ -133,7 +111,7 @@ final class BaseLoadTesting extends LoadTesting {
"Contracts are created by the trigger" should {
"Process all contract pairings successfully" in {
for {
client <- ledgerClient()
client <- defaultLedgerClient()
party <- allocateParty(client)
runner = getRunner(
client,
@ -175,7 +153,7 @@ final class InFlightLoadTesting extends LoadTesting {
"Eventually cause a trigger overflow" in {
recoverToSucceededIf[InFlightCommandOverflowException] {
for {
client <- ledgerClient()
client <- defaultLedgerClient()
party <- allocateParty(client)
runner = getRunner(
client,
@ -218,18 +196,10 @@ final class ACSLoadTesting extends LoadTesting {
"Cause a trigger overflow" in {
recoverToSucceededIf[ACSOverflowException] {
for {
client <- ledgerClient()
client <- defaultLedgerClient()
party <- allocateParty(client)
_ <- Future.sequence(
(0 until breedingRate).map { i =>
create(client, party, cat(party, i))
}
)
_ <- Future.sequence(
(0 until breedingRate).map { i =>
create(client, party, food(party, i))
}
)
_ <- create(client, party, (0 until breedingRate).map(i => cat(party, i)))
_ <- create(client, party, (0 until breedingRate).map(i => food(party, i)))
runner = getRunner(
client,
QualifiedName.assertFromString("Cats:feedingTrigger"),
@ -248,7 +218,7 @@ final class ACSLoadTesting extends LoadTesting {
"Cause a trigger overflow" in {
recoverToSucceededIf[ACSOverflowException] {
for {
client <- ledgerClient()
client <- defaultLedgerClient()
party <- allocateParty(client)
runner = getRunner(
client,
@ -287,7 +257,7 @@ final class TriggerRuleEvaluationTimeoutTesting extends LoadTesting {
"Cause a trigger timeout" in {
recoverToSucceededIf[TriggerRuleEvaluationTimeout] {
for {
client <- ledgerClient()
client <- defaultLedgerClient()
party <- allocateParty(client)
runner = getRunner(
client,
@ -317,7 +287,7 @@ final class TriggerRuleEvaluationTimeoutTesting extends LoadTesting {
"Cause a trigger timeout" in {
recoverToSucceededIf[TriggerRuleEvaluationTimeout] {
for {
client <- ledgerClient()
client <- defaultLedgerClient()
party <- allocateParty(client)
runner = getRunner(
client,