mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-19 16:57:40 +03:00
ledger-api-tests: Remove the performance-testing functionality. (#12948)
* ledger-api-tests: Remove the performance-testing functionality. This has been superseded by the Ledger API Bench Tool. CHANGELOG_BEGIN - [Ledger API Test Tool] The performance tests have been removed from this tool, in favor of the Ledger API Bench Tool, which is much more capable and configurable. CHANGELOG_END * ledger-on-sql/sandbox-on-x: Stop running performance tests. We don't check the results anyway.
This commit is contained in:
parent
612bf97b05
commit
ac12112dad
@ -1,38 +0,0 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.testtool.infrastructure
|
||||
|
||||
import java.io.PrintStream
|
||||
import java.nio.file.{Files, Path, StandardOpenOption}
|
||||
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
trait BenchmarkReporter {
|
||||
|
||||
def addReport(key: String, value: Double): Unit
|
||||
|
||||
def forKey(parentKey: String)(key: String, value: Double): Unit =
|
||||
addReport(parentKey + "." + key, value)
|
||||
}
|
||||
|
||||
object BenchmarkReporter {
|
||||
def toFile(path: Path) = new FileOutputBenchmarkReporter(path)
|
||||
|
||||
def toStream(printStream: PrintStream) = new PrintStreamBenchmarkReporter(printStream)
|
||||
}
|
||||
|
||||
class FileOutputBenchmarkReporter(path: Path) extends BenchmarkReporter {
|
||||
|
||||
override def addReport(key: String, value: Double): Unit = synchronized {
|
||||
val _ = Files
|
||||
.write(path, Seq(s"$key=$value").asJava, StandardOpenOption.CREATE, StandardOpenOption.APPEND)
|
||||
}
|
||||
}
|
||||
|
||||
class PrintStreamBenchmarkReporter(printStream: PrintStream) extends BenchmarkReporter {
|
||||
|
||||
override def addReport(key: String, value: Double): Unit = synchronized {
|
||||
printStream.println(s"$key=$value")
|
||||
}
|
||||
}
|
@ -1,43 +0,0 @@
|
||||
# Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
load("//bazel_tools:scala.bzl", "da_scala_library")
|
||||
load(
|
||||
"//daml-lf/language:daml-lf.bzl",
|
||||
"lf_version_configuration",
|
||||
"lf_version_configuration_versions",
|
||||
)
|
||||
|
||||
[
|
||||
da_scala_library(
|
||||
name = "performance-%s" % lf_version,
|
||||
srcs = glob(["src/main/scala/**/*.scala"]),
|
||||
tags = ["maven_coordinates=com.daml:ledger-api-tests-performance-%s:__VERSION__" % lf_version],
|
||||
visibility = ["//:__subpackages__"],
|
||||
deps = [
|
||||
"//daml-lf/data",
|
||||
"//language-support/scala/bindings",
|
||||
"//ledger/ledger-api-tests/infrastructure:infrastructure-%s" % lf_version,
|
||||
"//ledger/test-common:performance-tests-%s.scala" % lf_version,
|
||||
"@maven//:io_grpc_grpc_api",
|
||||
"@maven//:io_grpc_grpc_context",
|
||||
"@maven//:org_slf4j_slf4j_api",
|
||||
],
|
||||
)
|
||||
for lf_version in lf_version_configuration_versions
|
||||
]
|
||||
|
||||
[
|
||||
alias(
|
||||
name = "performance-%s" % name,
|
||||
actual = ":performance-%s" % lf_target,
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
||||
for (name, lf_target) in lf_version_configuration.items()
|
||||
]
|
||||
|
||||
alias(
|
||||
name = "performance",
|
||||
actual = ":performance-default",
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
@ -1,107 +0,0 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.testtool.performance
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration.Duration
|
||||
|
||||
sealed abstract class Envelope(val name: String) extends Product with Serializable {
|
||||
def this(names: Vector[String]) = {
|
||||
this(names.mkString(Envelope.Separator))
|
||||
}
|
||||
}
|
||||
|
||||
object Envelope {
|
||||
|
||||
private val Separator = "."
|
||||
|
||||
private val Prefix = Vector("PerformanceEnvelope")
|
||||
|
||||
val All: immutable.Seq[Envelope] = Latency.All ++ Throughput.All ++ TransactionSize.All
|
||||
|
||||
sealed abstract class Latency(name: String, val latency: Duration, val numPings: Int)
|
||||
extends Envelope(Latency.Prefix :+ name)
|
||||
|
||||
object Latency {
|
||||
|
||||
private val Prefix = Envelope.Prefix :+ "Latency"
|
||||
|
||||
val All: immutable.Seq[Latency] =
|
||||
Vector(SixtySeconds, ThreeSeconds, OneSecond, HalfSecond)
|
||||
|
||||
case object SixtySeconds
|
||||
extends Latency("60000ms", latency = Duration(60, TimeUnit.SECONDS), numPings = 20)
|
||||
|
||||
case object ThreeSeconds
|
||||
extends Latency("3000ms", latency = Duration(3, TimeUnit.SECONDS), numPings = 20)
|
||||
|
||||
case object OneSecond
|
||||
extends Latency("1000ms", latency = Duration(1, TimeUnit.SECONDS), numPings = 20)
|
||||
|
||||
case object HalfSecond
|
||||
extends Latency("500ms", latency = Duration(500, TimeUnit.MILLISECONDS), numPings = 40)
|
||||
|
||||
case object QuarterSecond
|
||||
extends Latency("250ms", latency = Duration(250, TimeUnit.MILLISECONDS), numPings = 40)
|
||||
|
||||
}
|
||||
|
||||
sealed abstract class Throughput(name: String, val operationsPerSecond: Int)
|
||||
extends Envelope(Throughput.Prefix :+ name)
|
||||
|
||||
object Throughput {
|
||||
|
||||
private val Prefix = Envelope.Prefix :+ "Throughput"
|
||||
|
||||
val All: immutable.Seq[Envelope] =
|
||||
Vector(
|
||||
NoThroughput,
|
||||
FivePerSecond,
|
||||
TwentyPerSecond,
|
||||
FiftyPerSecond,
|
||||
HundredPerSecond,
|
||||
TwoFiftyPerSecond,
|
||||
FiveHundredPerSecond,
|
||||
)
|
||||
|
||||
case object NoThroughput extends Throughput("ZeroOPS", operationsPerSecond = 0)
|
||||
|
||||
case object FivePerSecond extends Throughput("FiveOPS", operationsPerSecond = 5)
|
||||
|
||||
case object TwentyPerSecond extends Throughput("TwentyOPS", operationsPerSecond = 20)
|
||||
|
||||
case object FiftyPerSecond extends Throughput("FiftyOPS", operationsPerSecond = 50)
|
||||
|
||||
case object HundredPerSecond extends Throughput("HundredOPS", operationsPerSecond = 100)
|
||||
|
||||
case object TwoFiftyPerSecond extends Throughput("TwoFiftyOPS", operationsPerSecond = 250)
|
||||
|
||||
case object FiveHundredPerSecond extends Throughput("FiveHundredOPS", operationsPerSecond = 500)
|
||||
|
||||
}
|
||||
|
||||
sealed abstract class TransactionSize(name: String, val kilobytes: Int)
|
||||
extends Envelope(TransactionSize.Prefix :+ name)
|
||||
|
||||
object TransactionSize {
|
||||
|
||||
private val Prefix = Envelope.Prefix :+ "TransactionSize"
|
||||
|
||||
val All: immutable.Seq[Envelope] =
|
||||
Vector(OneKilobyte, OneHundredKilobytes, OneMegabyte, FiveMegabytes, TwentyFiveMegabytes)
|
||||
|
||||
case object OneKilobyte extends TransactionSize("1KB", kilobytes = 1)
|
||||
|
||||
case object OneHundredKilobytes extends TransactionSize("100KB", kilobytes = 100)
|
||||
|
||||
case object OneMegabyte extends TransactionSize("1000KB", kilobytes = 1000)
|
||||
|
||||
case object FiveMegabytes extends TransactionSize("5000KB", kilobytes = 5000)
|
||||
|
||||
case object TwentyFiveMegabytes extends TransactionSize("25000KB", kilobytes = 25000)
|
||||
}
|
||||
|
||||
}
|
@ -1,445 +0,0 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.testtool.performance
|
||||
|
||||
import java.time.{Duration, Instant}
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import com.daml.ledger.api.testtool.infrastructure.Allocation._
|
||||
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
|
||||
import com.daml.ledger.api.testtool.infrastructure.{Allocation, Assertions, LedgerTestSuite}
|
||||
import com.daml.ledger.api.v1.command_completion_service.{
|
||||
CompletionEndRequest,
|
||||
CompletionStreamRequest,
|
||||
CompletionStreamResponse,
|
||||
}
|
||||
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
|
||||
import com.daml.ledger.api.v1.commands.{Command, Commands}
|
||||
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import com.daml.ledger.api.v1.transaction.Transaction
|
||||
import com.daml.ledger.api.v1.transaction_filter.{Filters, TransactionFilter}
|
||||
import com.daml.ledger.api.v1.transaction_service.{GetTransactionsRequest, GetTransactionsResponse}
|
||||
import com.daml.ledger.client.binding.{Primitive => P}
|
||||
import com.daml.ledger.test.performance.{PingPong => PingPongModule}
|
||||
import io.grpc.stub.StreamObserver
|
||||
import io.grpc.{Context, Status}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
import scalaz.syntax.tag._
|
||||
|
||||
import scala.collection.concurrent.TrieMap
|
||||
import scala.concurrent.{ExecutionContext, Future, Promise, blocking}
|
||||
import scala.util.{Failure, Random, Success, Try}
|
||||
|
||||
sealed trait PerformanceEnvelope[E <: Envelope] {
|
||||
|
||||
protected def logger: Logger
|
||||
|
||||
protected def envelope: E
|
||||
|
||||
protected def maxInflight: Int
|
||||
|
||||
protected def waitForParties(participants: Seq[Allocation.Participant]): Unit = {
|
||||
val (participantAlice, alice) = (participants.head.ledger, participants.head.parties.head)
|
||||
val (participantBob, bob) = (participants(1).ledger, participants(1).parties.head)
|
||||
val _ = participantAlice.waitForParties(Seq(participantBob), Set(alice, bob))
|
||||
}
|
||||
|
||||
/** swiss army knife for setting up envelope tests
|
||||
*
|
||||
* This function sends a series of pings from one participant to another,
|
||||
* using specific workflow ids to measure progress.
|
||||
*
|
||||
* The maxInflight parameter controls how many pings are on the way. In throughput tests,
|
||||
* we use it for back-pressure as most platforms don't support backpressue yet.
|
||||
*
|
||||
* The return value is the time it takes us to run all pings and the list of individual ping
|
||||
* times.
|
||||
*
|
||||
* The payload string is the string that we put on every ping.
|
||||
*
|
||||
* Therefore, this function allows us to test
|
||||
* - the max transaction size (1 ping with payload = 5kb string)
|
||||
* - throughput (e.g. 200 pings with 40 in-flight)
|
||||
* - latency (20 pings with 1 in-flight)
|
||||
*/
|
||||
protected def sendPings(
|
||||
from: Participant,
|
||||
to: Participant,
|
||||
workflowIds: List[String],
|
||||
payload: String,
|
||||
)(implicit ec: ExecutionContext): Future[(Duration, List[Duration])] = {
|
||||
|
||||
val (participantAlice, alice) = (from.ledger, from.parties.head)
|
||||
val (participantBob, bob) = (to.ledger, to.parties.head)
|
||||
val queued = new ConcurrentLinkedQueue[Promise[Unit]]()
|
||||
val inflight = new AtomicInteger(0)
|
||||
// used to track the duration of each ping (left is start time, right is elapsed once we know end-time)
|
||||
val timings = TrieMap[String, Either[Instant, Duration]]()
|
||||
val tracker = Promise[Either[String, Unit]]()
|
||||
|
||||
def sendPing(workflowId: String): Future[Unit] = {
|
||||
|
||||
val promise = Promise[Unit]()
|
||||
queued.add(promise)
|
||||
|
||||
// start one immediately (might be some other task we start)
|
||||
if (inflight.incrementAndGet() <= maxInflight) {
|
||||
Option(queued.poll()).foreach(_.success(()))
|
||||
}
|
||||
for {
|
||||
// wait for our turn
|
||||
_ <- blocking {
|
||||
promise.future
|
||||
}
|
||||
// build request
|
||||
request = submitRequest(
|
||||
participantAlice,
|
||||
alice,
|
||||
PingPongModule.Ping(payload, alice, List(bob)).create.command,
|
||||
workflowId,
|
||||
)
|
||||
_ = {
|
||||
logger.info(s"Submitting ping with workflowId=$workflowId")
|
||||
timings += workflowId -> Left(Instant.now)
|
||||
}
|
||||
// and submit it
|
||||
_ <- participantAlice.submit(request)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
val awaiter = waitForAllTransactions(
|
||||
tracker,
|
||||
participantBob,
|
||||
bob,
|
||||
workflowIds.length,
|
||||
queued,
|
||||
inflight,
|
||||
timings,
|
||||
)
|
||||
for {
|
||||
end <- participantAlice.completionEnd(CompletionEndRequest(participantAlice.ledgerId))
|
||||
_ = listenCompletions(tracker, participantAlice, alice, end.offset)
|
||||
started = Instant.now
|
||||
_ <- Future.traverse(workflowIds)(sendPing)
|
||||
res <- awaiter
|
||||
} yield {
|
||||
res match {
|
||||
case Left(err) => Assertions.fail(err)
|
||||
case Right(_) =>
|
||||
val finished = Instant.now
|
||||
(
|
||||
Duration.between(started, finished),
|
||||
timings.values.flatMap(_.toOption.toList).toList,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def submitRequest(
|
||||
participant: ParticipantTestContext,
|
||||
party: P.Party,
|
||||
command: Command,
|
||||
commandAndWorkflowId: String,
|
||||
) = {
|
||||
new SubmitRequest(
|
||||
Some(
|
||||
new Commands(
|
||||
ledgerId = participant.ledgerId,
|
||||
applicationId = participant.applicationId,
|
||||
commandId = commandAndWorkflowId,
|
||||
workflowId = commandAndWorkflowId,
|
||||
party = party.unwrap,
|
||||
commands = Seq(command),
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
private def waitForAllTransactions(
|
||||
observedAll: Promise[Either[String, Unit]],
|
||||
participant: ParticipantTestContext,
|
||||
party: P.Party,
|
||||
numPings: Int,
|
||||
queue: ConcurrentLinkedQueue[Promise[Unit]],
|
||||
inflight: AtomicInteger,
|
||||
timings: TrieMap[String, Either[Instant, Duration]],
|
||||
)(implicit ec: ExecutionContext): Future[Either[String, Unit]] = {
|
||||
|
||||
val observed = new AtomicInteger(0)
|
||||
val context = Context.ROOT.withCancellation()
|
||||
|
||||
for {
|
||||
offset <- participant.currentEnd()
|
||||
} yield {
|
||||
context.run(() =>
|
||||
participant.transactionStream(
|
||||
GetTransactionsRequest(
|
||||
ledgerId = participant.ledgerId,
|
||||
begin = Some(offset),
|
||||
end = None,
|
||||
verbose = false,
|
||||
filter = Some(
|
||||
TransactionFilter(filtersByParty = Map(party.unwrap -> Filters(inclusive = None)))
|
||||
),
|
||||
),
|
||||
new StreamObserver[GetTransactionsResponse] {
|
||||
// find workflow ids and signal if we observed all expected
|
||||
@SuppressWarnings(Array("org.wartremover.warts.AnyVal"))
|
||||
override def onNext(value: GetTransactionsResponse): Unit = {
|
||||
value.transactions.foreach { tr: Transaction =>
|
||||
timings.get(tr.workflowId) match {
|
||||
case Some(Left(started)) =>
|
||||
val finished = Instant.now()
|
||||
val inf = inflight.decrementAndGet()
|
||||
val obs = observed.incrementAndGet()
|
||||
// start next ping
|
||||
Option(queue.poll()).foreach(_.success(()))
|
||||
logger.info(s"Observed ping ${tr.workflowId} (observed=$obs, inflight=$inf)")
|
||||
timings.update(tr.workflowId, Right(Duration.between(started, finished)))
|
||||
// signal via future that we are done
|
||||
if (observed.get() == numPings && !observedAll.isCompleted)
|
||||
observedAll.trySuccess(Right(()))
|
||||
// there shouldn't be running anything concurrently
|
||||
case None =>
|
||||
logger.error(
|
||||
s"Observed transaction with un-expected workflowId ${tr.workflowId}"
|
||||
)
|
||||
case Some(Right(_)) =>
|
||||
logger.error(s"Observed transaction with workflowId ${tr.workflowId} twice!")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def onError(t: Throwable): Unit = t match {
|
||||
case ex: io.grpc.StatusRuntimeException
|
||||
if ex.getStatus.getCode == io.grpc.Status.CANCELLED.getCode =>
|
||||
case _ => logger.error("GetTransactionResponse stopped due to an error", t)
|
||||
}
|
||||
|
||||
override def onCompleted(): Unit = {
|
||||
if (observed.get() != numPings) {
|
||||
logger.error(
|
||||
s"Transaction stream closed before I've observed all transactions. Missing are ${numPings - observed
|
||||
.get()}."
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
},
|
||||
)
|
||||
)
|
||||
}
|
||||
// ensure we cancel the stream once we've observed everything
|
||||
observedAll.future.map { x =>
|
||||
Try(context.cancel(Status.CANCELLED.asException())) match {
|
||||
case Success(_) => ()
|
||||
case Failure(ex) =>
|
||||
logger.error("Cancelling transaction stream failed with an exception", ex)
|
||||
}
|
||||
x
|
||||
}
|
||||
}
|
||||
|
||||
private def listenCompletions(
|
||||
tracker: Promise[Either[String, Unit]],
|
||||
sender: ParticipantTestContext,
|
||||
party: P.Party,
|
||||
offset: Option[LedgerOffset],
|
||||
)(implicit ec: ExecutionContext): Unit = {
|
||||
val context = Context.ROOT.withCancellation()
|
||||
|
||||
context.run(() =>
|
||||
sender.completionStream(
|
||||
CompletionStreamRequest(
|
||||
ledgerId = sender.ledgerId,
|
||||
applicationId = sender.applicationId,
|
||||
parties = Seq(party.unwrap),
|
||||
offset = offset,
|
||||
),
|
||||
new StreamObserver[CompletionStreamResponse] {
|
||||
@SuppressWarnings(Array("org.wartremover.warts.AnyVal"))
|
||||
override def onNext(value: CompletionStreamResponse): Unit = {
|
||||
value.completions.foreach { completion =>
|
||||
completion.status.foreach { status =>
|
||||
// TODO(rv) maybe, add re-submission logic once systems are smart enough to back-pressure
|
||||
if (status.code != 0) {
|
||||
if (status.code == io.grpc.Status.DEADLINE_EXCEEDED.getCode.value()) {
|
||||
logger.error(
|
||||
s"Command ${completion.commandId} timed-out. You might want to reduce the number of in-flight commands. $status"
|
||||
)
|
||||
} else {
|
||||
logger.error(s"Command ${completion.commandId} failed with $status")
|
||||
}
|
||||
// for now, we kill the test if we hit an error
|
||||
tracker.trySuccess(Left(s"Command ${completion.commandId} failed with $status"))
|
||||
} else {
|
||||
logger.debug(s"Command ${completion.commandId} succeeded")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def onError(t: Throwable): Unit = {}
|
||||
|
||||
override def onCompleted(): Unit = {}
|
||||
},
|
||||
)
|
||||
)
|
||||
tracker.future.map(_ => Try(context.cancel(Status.CANCELLED.asException())))
|
||||
()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object PerformanceEnvelope {
|
||||
|
||||
def apply[E <: Envelope](
|
||||
envelope: E,
|
||||
reporter: (String, Double) => Unit,
|
||||
): LedgerTestSuite =
|
||||
envelope match {
|
||||
case e: Envelope.Latency =>
|
||||
new LatencyTest(e, numPings = e.numPings, numWarmupPings = e.numPings, reporter = reporter)
|
||||
case e: Envelope.Throughput =>
|
||||
val numPings =
|
||||
Math.max(20, e.operationsPerSecond * 15) // test should run at least 15 seconds
|
||||
new ThroughputTest(
|
||||
envelope = e,
|
||||
maxInflight = e.operationsPerSecond * 5, // aiming for a latency of 5 seconds
|
||||
numPings = numPings,
|
||||
numWarmupPings = numPings,
|
||||
reporter = reporter,
|
||||
)
|
||||
case e: Envelope.TransactionSize => new TransactionSizeScaleTest(e)
|
||||
}
|
||||
|
||||
/** Throughput test
|
||||
*
|
||||
* @param numPings how many pings to run during the throughput test
|
||||
* @param maxInflight how many inflight commands we can have. set it high enough such that the system saturates, keep it low enough to not hit timeouts.
|
||||
* @param numWarmupPings how many pings to run before the perf test to warm up the system
|
||||
*/
|
||||
private final class ThroughputTest(
|
||||
override protected val envelope: Envelope.Throughput,
|
||||
override protected val maxInflight: Int,
|
||||
numPings: Int,
|
||||
numWarmupPings: Int,
|
||||
reporter: (String, Double) => Unit,
|
||||
) extends LedgerTestSuite
|
||||
with PerformanceEnvelope[Envelope.Throughput] {
|
||||
|
||||
override protected val logger: Logger = LoggerFactory.getLogger(getClass)
|
||||
|
||||
test(
|
||||
envelope.name.replace(".", ""),
|
||||
s"Verify that ledger passes the ${envelope.name} throughput envelope",
|
||||
allocate(SingleParty, SingleParty),
|
||||
)(implicit ec => { case participants =>
|
||||
waitForParties(participants.allocatedParticipants)
|
||||
|
||||
def runTest(num: Int, description: String): Future[(Duration, List[Duration])] =
|
||||
sendPings(
|
||||
from = participants.allocatedParticipants.head,
|
||||
to = participants.allocatedParticipants(1),
|
||||
workflowIds = (1 to num).map(x => s"$description-$x").toList,
|
||||
payload = description,
|
||||
)
|
||||
|
||||
for {
|
||||
_ <- runTest(numWarmupPings, "throughput-warmup")
|
||||
timings <- runTest(numPings, "throughput-test")
|
||||
} yield {
|
||||
val (elapsed, latencies) = timings
|
||||
val throughput = numPings / elapsed.toMillis.toDouble * 1000.0
|
||||
logger.info(
|
||||
s"Sending of $numPings succeeded after $elapsed, yielding a throughput of ${"%.2f" format throughput}."
|
||||
)
|
||||
reporter("rate", throughput)
|
||||
logger.info(
|
||||
s"Throughput latency stats: ${genStats(latencies.map(_.toMillis), (_, _) => ())}"
|
||||
)
|
||||
assert(
|
||||
throughput >= envelope.operationsPerSecond,
|
||||
s"Observed throughput of ${"%.2f" format throughput} is below the necessary envelope level ${envelope.operationsPerSecond}",
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private final class LatencyTest(
|
||||
override protected val envelope: Envelope.Latency,
|
||||
numPings: Int,
|
||||
numWarmupPings: Int,
|
||||
reporter: (String, Double) => Unit,
|
||||
) extends LedgerTestSuite
|
||||
with PerformanceEnvelope[Envelope.Latency] {
|
||||
|
||||
override protected val logger: Logger = LoggerFactory.getLogger(getClass)
|
||||
|
||||
override protected val maxInflight = 1 // will only be one
|
||||
require(numPings > 0 && numWarmupPings >= 0)
|
||||
|
||||
test(
|
||||
envelope.name.replace(".", ""),
|
||||
s"Verify that ledger passes the ${envelope.name} latency envelope",
|
||||
allocate(SingleParty, SingleParty),
|
||||
)(implicit ec => { case participants =>
|
||||
waitForParties(participants.allocatedParticipants)
|
||||
|
||||
sendPings(
|
||||
from = participants.allocatedParticipants.head,
|
||||
to = participants.allocatedParticipants(1),
|
||||
workflowIds = (1 to (numPings + numWarmupPings)).map(x => s"latency-$x").toList,
|
||||
payload = "latency",
|
||||
).map { case (_, latencies) =>
|
||||
val sample = latencies.drop(numWarmupPings).map(_.toMillis).sorted
|
||||
require(sample.length == numPings)
|
||||
val tailCount = sample.count(_ > envelope.latency.toMillis)
|
||||
val stats = genStats(sample, reporter)
|
||||
logger.info(s"Latency test finished: $stats")
|
||||
assert(
|
||||
tailCount <= numPings * 0.1,
|
||||
s"$tailCount out of $numPings are above the latency threshold. Stats are $stats",
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private def genStats(sample: List[Long], reporter: (String, Double) => Unit): String = {
|
||||
val num = sample.length.toDouble
|
||||
val avg = sample.sum / num
|
||||
val med = sample(sample.length / 2)
|
||||
val stddev = Math.sqrt(sample.map(x => (x - avg) * (x - avg)).sum / num)
|
||||
reporter("average", avg)
|
||||
reporter("median", med.toDouble)
|
||||
reporter("stddev", stddev)
|
||||
s"Sample size of ${sample.length}: avg=${"%.0f" format avg} ms, median=$med ms, stdev=${"%.0f" format stddev} ms"
|
||||
}
|
||||
|
||||
private final class TransactionSizeScaleTest(
|
||||
override protected val envelope: Envelope.TransactionSize
|
||||
) extends LedgerTestSuite
|
||||
with PerformanceEnvelope[Envelope.TransactionSize] {
|
||||
|
||||
override protected val logger: Logger = LoggerFactory.getLogger(getClass)
|
||||
override protected val maxInflight = 10
|
||||
|
||||
test(
|
||||
envelope.name.replace(".", ""),
|
||||
s"Verify that ledger passes the ${envelope.name} transaction size envelope",
|
||||
allocate(SingleParty, SingleParty),
|
||||
)(implicit ec => { case participants =>
|
||||
waitForParties(participants.allocatedParticipants)
|
||||
|
||||
sendPings(
|
||||
from = participants.allocatedParticipants.head,
|
||||
to = participants.allocatedParticipants(1),
|
||||
workflowIds = List("transaction-size"),
|
||||
payload = Random.alphanumeric.take(envelope.kilobytes * 1024).mkString(""),
|
||||
).map(_ => ())
|
||||
})
|
||||
}
|
||||
|
||||
}
|
@ -1,41 +0,0 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.testtool.performance
|
||||
|
||||
import java.nio.file.Path
|
||||
|
||||
import com.daml.ledger.api.testtool.infrastructure.{
|
||||
BenchmarkReporter,
|
||||
LedgerTestCase,
|
||||
LedgerTestSuite,
|
||||
}
|
||||
|
||||
import scala.collection.immutable.{SortedMap, SortedSet}
|
||||
|
||||
final class PerformanceTests(tests: SortedMap[String, LedgerTestSuite]) {
|
||||
def names: SortedSet[String] = tests.keySet
|
||||
|
||||
def isEmpty: Boolean = tests.isEmpty
|
||||
|
||||
def nonEmpty: Boolean = tests.nonEmpty
|
||||
|
||||
def filterByName(predicate: String => Boolean): PerformanceTests =
|
||||
new PerformanceTests(tests.view.filterKeys(predicate).to(SortedMap))
|
||||
|
||||
def cases: Vector[LedgerTestCase] = tests.values.view.flatMap(_.tests).toVector
|
||||
}
|
||||
|
||||
object PerformanceTests {
|
||||
def from(envelopes: Seq[Envelope], outputPath: Option[Path]): PerformanceTests = {
|
||||
val target =
|
||||
outputPath
|
||||
.map(BenchmarkReporter.toFile)
|
||||
.getOrElse(BenchmarkReporter.toStream(System.out))
|
||||
new PerformanceTests(
|
||||
envelopes.iterator
|
||||
.map(e => e.name -> PerformanceEnvelope(e, target.addReport))
|
||||
.to(SortedMap)
|
||||
)
|
||||
}
|
||||
}
|
@ -28,7 +28,6 @@ load("@os_info//:os_info.bzl", "is_windows")
|
||||
deps = [
|
||||
"//ledger/ledger-api-common",
|
||||
"//ledger/ledger-api-tests/infrastructure:infrastructure-%s" % lf_version,
|
||||
"//ledger/ledger-api-tests/performance:performance-%s" % lf_version,
|
||||
"//libs-scala/resources",
|
||||
"//libs-scala/resources-akka",
|
||||
"//libs-scala/resources-grpc",
|
||||
|
@ -4,12 +4,9 @@
|
||||
package com.daml.ledger.api.testtool.runner
|
||||
|
||||
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
|
||||
import com.daml.ledger.api.testtool.performance.PerformanceTests
|
||||
|
||||
trait AvailableTests {
|
||||
def defaultTests: Vector[LedgerTestSuite]
|
||||
|
||||
def optionalTests: Vector[LedgerTestSuite]
|
||||
|
||||
def performanceTests: PerformanceTests
|
||||
}
|
||||
|
@ -3,9 +3,6 @@
|
||||
|
||||
package com.daml.ledger.api.testtool.runner
|
||||
|
||||
import java.io.File
|
||||
import java.nio.file.Path
|
||||
|
||||
import com.daml.ledger.api.testtool.infrastructure.PartyAllocationConfiguration
|
||||
import com.daml.ledger.api.testtool.runner
|
||||
import com.daml.ledger.api.tls.TlsConfiguration
|
||||
@ -15,7 +12,6 @@ import scala.concurrent.duration.FiniteDuration
|
||||
final case class Config(
|
||||
participantsEndpoints: Vector[(String, Int)],
|
||||
maxConnectionAttempts: Int,
|
||||
darPackages: List[File],
|
||||
mustFail: Boolean,
|
||||
verbose: Boolean,
|
||||
timeoutScaleFactor: Double,
|
||||
@ -25,8 +21,6 @@ final case class Config(
|
||||
excluded: Set[String],
|
||||
included: Set[String],
|
||||
additional: Set[String],
|
||||
performanceTests: Set[String],
|
||||
performanceTestsReport: Option[Path],
|
||||
listTests: Boolean,
|
||||
listTestSuites: Boolean,
|
||||
shuffleParticipants: Boolean,
|
||||
@ -44,7 +38,6 @@ object Config {
|
||||
val default: Config = Config(
|
||||
participantsEndpoints = Vector.empty,
|
||||
maxConnectionAttempts = 10,
|
||||
darPackages = Nil,
|
||||
mustFail = false,
|
||||
verbose = false,
|
||||
timeoutScaleFactor = Defaults.TimeoutScaleFactor,
|
||||
@ -54,8 +47,6 @@ object Config {
|
||||
excluded = Set.empty,
|
||||
included = Set.empty,
|
||||
additional = Set.empty,
|
||||
performanceTests = Set.empty,
|
||||
performanceTestsReport = None,
|
||||
listTests = false,
|
||||
listTestSuites = false,
|
||||
shuffleParticipants = false,
|
||||
|
@ -4,12 +4,10 @@
|
||||
package com.daml.ledger.api.testtool.runner
|
||||
|
||||
import com.daml.ledger.api.testtool.infrastructure.{LedgerTestCase, LedgerTestSuite}
|
||||
import com.daml.ledger.api.testtool.performance.PerformanceTests
|
||||
|
||||
final class ConfiguredTests(availableTests: AvailableTests, config: Config) {
|
||||
val defaultTests: Vector[LedgerTestSuite] = availableTests.defaultTests
|
||||
val optionalTests: Vector[LedgerTestSuite] = availableTests.optionalTests
|
||||
val performanceTests: PerformanceTests = availableTests.performanceTests
|
||||
|
||||
val allTests: Vector[LedgerTestSuite] = defaultTests ++ optionalTests
|
||||
val missingTests: Set[String] = {
|
||||
@ -19,9 +17,6 @@ final class ConfiguredTests(availableTests: AvailableTests, config: Config) {
|
||||
)
|
||||
}
|
||||
|
||||
val missingPerformanceTests: Set[String] =
|
||||
config.performanceTests.filterNot(performanceTests.names(_))
|
||||
|
||||
val defaultCases: Vector[LedgerTestCase] = defaultTests.flatMap(_.tests)
|
||||
val optionalCases: Vector[LedgerTestCase] = optionalTests.flatMap(_.tests)
|
||||
val allCases: Vector[LedgerTestCase] = defaultCases ++ optionalCases
|
||||
|
@ -8,7 +8,6 @@ import java.nio.file.{Files, Paths, StandardCopyOption}
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
import com.daml.ledger.api.testtool.infrastructure._
|
||||
import com.daml.ledger.api.testtool.performance.PerformanceTests
|
||||
import com.daml.ledger.api.testtool.runner.TestRunner._
|
||||
import com.daml.ledger.api.tls.TlsConfiguration
|
||||
import io.grpc.Channel
|
||||
@ -50,13 +49,6 @@ object TestRunner {
|
||||
tests.map(getName).sorted.foreach(println(_))
|
||||
}
|
||||
|
||||
private def printListOfPerformanceTests(tests: PerformanceTests): Unit = {
|
||||
println("Alternatively, you can run performance tests.")
|
||||
println("They are not run by default, but can be run with `--perf-tests=TEST-NAME`.")
|
||||
println()
|
||||
tests.names.foreach(println(_))
|
||||
}
|
||||
|
||||
private def printAvailableTestSuites(testSuites: Vector[LedgerTestSuite]): Unit = {
|
||||
println("Listing test suites. Run with --list-all to see individual tests.")
|
||||
printListOfTests(testSuites)(_.name)
|
||||
@ -95,30 +87,13 @@ final class TestRunner(availableTests: AvailableTests, config: Config) {
|
||||
sys.exit(64)
|
||||
}
|
||||
|
||||
if (tests.missingPerformanceTests.nonEmpty) {
|
||||
println(
|
||||
s"${tests.missingPerformanceTests.head} is not a valid performance test name. Use `--list` to see valid names."
|
||||
)
|
||||
sys.exit(64)
|
||||
}
|
||||
|
||||
val performanceTestsToRun = tests.performanceTests.filterByName(config.performanceTests)
|
||||
if (config.included.nonEmpty && performanceTestsToRun.nonEmpty) {
|
||||
println("Either regular or performance tests can be run, but not both.")
|
||||
sys.exit(64)
|
||||
}
|
||||
|
||||
if (config.listTestSuites) {
|
||||
printAvailableTestSuites(tests.allTests)
|
||||
println()
|
||||
printListOfPerformanceTests(tests.performanceTests)
|
||||
sys.exit(0)
|
||||
}
|
||||
|
||||
if (config.listTests) {
|
||||
printAvailableTests(tests.allTests)
|
||||
println()
|
||||
printListOfPerformanceTests(tests.performanceTests)
|
||||
sys.exit(0)
|
||||
}
|
||||
|
||||
@ -151,12 +126,7 @@ final class TestRunner(availableTests: AvailableTests, config: Config) {
|
||||
implicit val resourceManagementExecutionContext: ExecutionContext =
|
||||
ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
|
||||
|
||||
val runner =
|
||||
if (performanceTestsToRun.nonEmpty)
|
||||
newSequentialLedgerCasesRunner(config, performanceTestsToRun.cases)
|
||||
else
|
||||
newLedgerCasesRunner(config, testsToRun)
|
||||
|
||||
val runner = newLedgerCasesRunner(config, testsToRun)
|
||||
runner.flatMap(_.runTests(ExecutionContext.global)).onComplete {
|
||||
case Success(summaries) =>
|
||||
val excludedTestSummaries =
|
||||
@ -191,12 +161,6 @@ final class TestRunner(availableTests: AvailableTests, config: Config) {
|
||||
}
|
||||
}
|
||||
|
||||
private def newSequentialLedgerCasesRunner(
|
||||
config: Config,
|
||||
cases: Vector[LedgerTestCase],
|
||||
)(implicit executionContext: ExecutionContext): Future[LedgerTestCasesRunner] =
|
||||
createLedgerCasesRunner(config, cases, concurrentTestRuns = 1)
|
||||
|
||||
private def newLedgerCasesRunner(
|
||||
config: Config,
|
||||
cases: Vector[LedgerTestCase],
|
||||
|
@ -71,7 +71,6 @@ da_scala_binary(
|
||||
deps = [
|
||||
"//ledger/ledger-api-common",
|
||||
"//ledger/ledger-api-tests/infrastructure:infrastructure-%s" % lf_version,
|
||||
"//ledger/ledger-api-tests/performance:performance-%s" % lf_version,
|
||||
"//ledger/ledger-api-tests/runner:runner-%s" % lf_version,
|
||||
"//ledger/ledger-api-tests/suites:suites-%s" % lf_version,
|
||||
"//libs-scala/build-info",
|
||||
|
@ -4,7 +4,7 @@
|
||||
package com.daml.ledger.api.testtool
|
||||
|
||||
import java.io.File
|
||||
import java.nio.file.{Path, Paths}
|
||||
import java.nio.file.Paths
|
||||
|
||||
import com.daml.buildinfo.BuildInfo
|
||||
import com.daml.ledger.api.testtool.infrastructure.PartyAllocationConfiguration
|
||||
@ -23,8 +23,8 @@ object CliParser {
|
||||
argParser.parse(args, Config.default)
|
||||
|
||||
private def endpointRead: Read[(String, Int)] = new Read[(String, Int)] {
|
||||
val arity = 2
|
||||
val reads: String => (String, Int) = { s: String =>
|
||||
override val arity = 2
|
||||
override val reads: String => (String, Int) = { s: String =>
|
||||
splitAddress(s) match {
|
||||
case (k, v) => Read.stringRead.reads(k) -> Read.intRead.reads(v)
|
||||
}
|
||||
@ -38,8 +38,6 @@ object CliParser {
|
||||
case n: Int => (s.slice(0, n), s.slice(n + 1, s.length))
|
||||
}
|
||||
|
||||
private implicit val pathRead: Read[Path] = Read.reads(Paths.get(_))
|
||||
|
||||
private val argParser: OptionParser[Config] = new scopt.OptionParser[Config](Name) {
|
||||
head(
|
||||
"""The Ledger API Test Tool is a command line tool for testing the correctness of
|
||||
@ -154,18 +152,6 @@ object CliParser {
|
||||
|Mutually exclusive with `--include`.""".stripMargin
|
||||
)
|
||||
|
||||
opt[Seq[String]]("perf-tests")
|
||||
.action((inc, c) => c.copy(performanceTests = c.performanceTests ++ inc))
|
||||
.unbounded()
|
||||
.text("A comma-separated list of performance tests that should be run.")
|
||||
|
||||
opt[Path]("perf-tests-report")
|
||||
.action((inc, c) => c.copy(performanceTestsReport = Some(inc)))
|
||||
.optional()
|
||||
.text(
|
||||
"The path of the benchmark report file produced by performance tests (default: stdout)."
|
||||
)
|
||||
|
||||
opt[Unit]("shuffle-participants")
|
||||
.action((_, c) => c.copy(shuffleParticipants = true))
|
||||
.text(
|
||||
|
@ -4,7 +4,6 @@
|
||||
package com.daml.ledger.api.testtool
|
||||
|
||||
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
|
||||
import com.daml.ledger.api.testtool.performance.{Envelope, PerformanceTests}
|
||||
import com.daml.ledger.api.testtool.runner.{AvailableTests, TestRunner}
|
||||
|
||||
object Main {
|
||||
@ -16,9 +15,6 @@ object Main {
|
||||
|
||||
override def optionalTests: Vector[LedgerTestSuite] =
|
||||
Tests.optional()
|
||||
|
||||
override def performanceTests: PerformanceTests =
|
||||
PerformanceTests.from(Envelope.All, outputPath = config.performanceTestsReport)
|
||||
}
|
||||
new TestRunner(availableTests, config).runAndExit()
|
||||
}
|
||||
|
@ -259,22 +259,6 @@ da_scala_test_suite(
|
||||
"--verbose",
|
||||
],
|
||||
),
|
||||
conformance_test(
|
||||
name = "benchmark-performance-envelope-{}".format(db["name"]),
|
||||
ports = [6865],
|
||||
server = ":test-server-{}-bin".format(db["name"]),
|
||||
server_args = [
|
||||
"--contract-id-seeding=testing-weak",
|
||||
"--participant participant-id=example,port=6865",
|
||||
] + db.get("conformance_test_server_args", []),
|
||||
tags = db.get("benchmark_performance_envelope_tags", []),
|
||||
test_tool_args = [
|
||||
"--verbose",
|
||||
"--perf-tests=PerformanceEnvelope.Throughput.TwentyOPS",
|
||||
"--perf-tests=PerformanceEnvelope.Latency.1000ms",
|
||||
"--perf-tests=PerformanceEnvelope.TransactionSize.1000KB",
|
||||
],
|
||||
),
|
||||
)
|
||||
for db in supported_databases
|
||||
]
|
||||
|
@ -413,18 +413,3 @@ conformance_test(
|
||||
"--verbose",
|
||||
],
|
||||
)
|
||||
|
||||
server_conformance_test(
|
||||
name = "benchmark-performance-envelope",
|
||||
server_args = [
|
||||
"--contract-id-seeding=testing-weak",
|
||||
"--participant=participant-id=example,port=6865",
|
||||
],
|
||||
servers = SERVERS,
|
||||
test_tool_args = [
|
||||
"--verbose",
|
||||
"--perf-tests=PerformanceEnvelope.Throughput.TwentyOPS",
|
||||
"--perf-tests=PerformanceEnvelope.Latency.1000ms",
|
||||
"--perf-tests=PerformanceEnvelope.TransactionSize.1000KB",
|
||||
],
|
||||
)
|
||||
|
@ -753,45 +753,3 @@ These tests check that all the services which are part of the Ledger API behave
|
||||
as expected, with a particular attention to ensure that issuing commands and
|
||||
reading transactions respect the confidentiality and privacy guarantees defined
|
||||
by the Daml Ledger Model.
|
||||
|
||||
##Performance envelope
|
||||
|
||||
Furthermore, this implementation is regularly tested to comply with the Daml
|
||||
Ledger Implementation Performance Envelope tests.
|
||||
|
||||
In particular, the tests are run to ensure that *Sandbox on X* can:
|
||||
|
||||
- process transactions as large as 1 MB
|
||||
- have a tail latency no greater than 1 second when issuing 20 pings
|
||||
- have a throughput of 20 pings per second
|
||||
|
||||
You can read more on performance tests in the documentation of the
|
||||
[Ledger API Test Tool](https://docs.daml.com/tools/ledger-api-test-tool/index.html#performance-tests).
|
||||
|
||||
##Replicate performance envelope tests
|
||||
|
||||
The following setup has been used to run the performance envelope tests:
|
||||
|
||||
- PostgreSQL server: a GCP Cloud SQL managed instance using PostgreSQL 12, with
|
||||
1 vCPU, 3.75 GB of RAM, 250 MB/s of network throughput, a 10 GB SDD HD, 1.2
|
||||
MB/s of R/W disk throughput, 8 RIOPS and 15 WIOPS, no automatic failover or
|
||||
disk increase, default PostgreSQL 12 configuration.
|
||||
|
||||
- *Sandbox on X* server: a GCP N1-Standard-1 instance, with 1 vCPU, 3.75
|
||||
GB of RAM, Ubuntu 20.04 LTS (64 bit), 10 GB boot disk, OpenJDK 1.8.0_242
|
||||
|
||||
- Ledger API test tool client: a GCP F1-Micro instance, with 1 shared vCPU, 614
|
||||
MB of RAM, Ubuntu 20.04 LTS (64 bit), 10 GB boot disk, OpenJDK 1.8.0_242
|
||||
|
||||
The three instances were in the same region and availability zone to minimize
|
||||
the latency between the three.
|
||||
|
||||
The tests run to evaluate the performance envelope are:
|
||||
|
||||
- PerformanceEnvelope.Latency.1000ms
|
||||
- PerformanceEnvelope.Throughput.TwentyOPS
|
||||
- PerformanceEnvelope.TransactionSize.1000KB
|
||||
|
||||
Please refer to the documentation for the Ledger API Test Tool to learn how to
|
||||
run these tests.
|
||||
|
||||
|
@ -105,10 +105,6 @@
|
||||
type: jar-scala
|
||||
- target: //ledger/ledger-api-tests/infrastructure:infrastructure-1.dev
|
||||
type: jar-scala
|
||||
- target: //ledger/ledger-api-tests/performance:performance-1.14
|
||||
type: jar-scala
|
||||
- target: //ledger/ledger-api-tests/performance:performance-1.dev
|
||||
type: jar-scala
|
||||
- target: //ledger/ledger-api-tests/runner:runner-1.14
|
||||
type: jar-scala
|
||||
- target: //ledger/ledger-api-tests/runner:runner-1.dev
|
||||
|
Loading…
Reference in New Issue
Block a user