[JSON-API] Port of http-perf-test to sandbox(aka next) (#11543)

* Changes to migrate http-perf-test back to sandbox with more parallelization for single user scenarios.
Increased parallelization is due to the architectural changes in sandbox where it uses
a tick every 100 millis to trigger stuff/data to be available on the read side

CHANGELOG_BEGIN
CHANGELOG_END

* Parallelization fixes for scenarios ExerciseCommand and SyncQueryNewAcs scenarios

* refactor sequential scenario run, make query part of SyncQueryVariableAcs run with single user
This commit is contained in:
akshayshirahatti-da 2021-11-09 08:44:37 +00:00 committed by GitHub
parent 56a6db8f73
commit 8d2b1b9ffe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 85 additions and 45 deletions

View File

@ -49,12 +49,13 @@ class AsyncQueryConstantAcs
pause(1.second)
}
.exec(
1.to(numberOfRuns).map(runId => exec(query(runId)))
1.to(numberOfRuns / defaultNumUsers).map(runId => exec(query(runId)))
)
setUp(
fillAcsScenario(wantedAcsSize, silent = true).inject(atOnceUsers(1)),
asyncQueryScenario.inject(atOnceUsers(1)),
fillAcsScenario(wantedAcsSize, silent = true).inject(atOnceUsers(defaultNumUsers)).andThen {
asyncQueryScenario.inject(atOnceUsers(defaultNumUsers))
}
).protocols(httpProtocol)
}

View File

@ -23,15 +23,16 @@ class CreateAndExerciseCommand extends Simulation with SimulationConfig {
}
}"""
private val numberOfRuns = 2000
private val request = http("CreateAndExerciseCommand")
.post("/v1/create-and-exercise")
.body(StringBody(jsonCommand))
private val scn = scenario("CreateAndExerciseCommandScenario")
.repeat(2000)(exec(request.silent)) // server warmup
.repeat(2000)(exec(request))
.repeat(numberOfRuns / defaultNumUsers)(exec(request.silent)) // server warmup
.repeat(numberOfRuns / defaultNumUsers)(exec(request))
setUp(
scn.inject(atOnceUsers(1))
scn.inject(atOnceUsers(defaultNumUsers))
).protocols(httpProtocol)
}

View File

@ -19,14 +19,15 @@ class CreateCommand extends Simulation with SimulationConfig {
}
}"""
private val numberOfRuns = 1000
private val request = http("CreateCommand")
.post("/v1/create")
.body(StringBody(jsonCommand))
private val scn = scenario("CreateCommandScenario")
.repeat(1000)(exec(request))
.repeat(numberOfRuns / defaultNumUsers)(exec(request))
setUp(
scn.inject(atOnceUsers(1))
scn.inject(atOnceUsers(defaultNumUsers))
).protocols(httpProtocol)
}

View File

@ -28,6 +28,7 @@ class ExerciseCommand extends Simulation with SimulationConfig {
}
}"""
private val numberOfRuns = 2000
private val createRequest = http("CreateCommand")
.post("/v1/create")
.body(StringBody(createCommand))
@ -37,18 +38,24 @@ class ExerciseCommand extends Simulation with SimulationConfig {
.post("/v1/exercise")
.body(StringBody(exerciseCommand))
private val scn = scenario("ExerciseCommandScenario")
.repeat(2000)(exec(createRequest.silent)) // populate the ACS
.exec(
// retrieve all contractIds
http("GetACS")
.get("/v1/query")
.check(status.is(200), jsonPath("$.result[*].contractId").findAll.saveAs("contractIds"))
.silent
private val scn = scenario("ExerciseCommandScenario-Create")
.repeat(numberOfRuns / defaultNumUsers)(exec(createRequest.silent)) // populate the ACS
private val queryScn = scenario("ExerciseCommandScenario-QueryAndExercise")
.repeat(1)(
exec(
// retrieve all contractIds
http("GetACS")
.get("/v1/query")
.check(status.is(200), jsonPath("$.result[*].contractId").findAll.saveAs("contractIds"))
.silent
)
)
.foreach("${contractIds}", "contractId")(exec(exerciseRequest))
setUp(
scn.inject(atOnceUsers(1))
scn
.inject(atOnceUsers(defaultNumUsers))
.andThen(queryScn.inject(atOnceUsers(1)))
).protocols(httpProtocol)
}

View File

@ -12,7 +12,7 @@ import io.gatling.http.check.HttpCheck
import io.gatling.http.request.builder.HttpRequestBuilder
private[scenario] trait HasCreateRequest {
this: HasRandomAmount =>
this: HasRandomAmount with SimulationConfig =>
private lazy val acsQueue: BlockingQueue[String] = new LinkedBlockingQueue[String]()
@ -40,7 +40,7 @@ private[scenario] trait HasCreateRequest {
def fillAcsScenario(size: Int, silent: Boolean): ScenarioBuilder =
scenario(s"FillAcsScenario, size: $size")
.doWhile(_ => this.acsSize() < size) {
.repeat(size / defaultNumUsers) {
feed(Iterator.continually(Map("amount" -> randomAmount())))
.group("FillAcsGroup") {
val create =

View File

@ -19,6 +19,7 @@ private[scenario] trait SimulationConfig {
.authorizationHeader(s"Bearer $jwt")
.contentTypeHeader("application/json")
protected[this] val defaultNumUsers = 10
private lazy val hostAndPort: String = System.getProperty(HostAndPortKey, "localhost:7575")
protected[this] lazy val jwt: String = System.getProperty(JwtKey, aliceJwt)

View File

@ -31,18 +31,18 @@ class SyncQueryConstantAcs extends Simulation with SimulationConfig with HasRand
}"""))
private val scn = scenario("SyncQueryScenario")
.repeat(5000) {
.repeat(5000 / defaultNumUsers) {
// populate the ACS
feed(Iterator.continually(Map("amount" -> randomAmount())))
.exec(createRequest.silent)
}
.repeat(500) {
.repeat(500 / defaultNumUsers) {
// run queries
feed(Iterator.continually(Map("amount" -> randomAmount())))
.exec(queryRequest)
}
setUp(
scn.inject(atOnceUsers(1))
scn.inject(atOnceUsers(defaultNumUsers))
).protocols(httpProtocol)
}

View File

@ -94,12 +94,12 @@ class SyncQueryMegaAcs extends Simulation with SimulationConfig with HasRandomAm
scenario(s"SyncQueryMegaScenario $scnName")
.exec(createRequest.silent)
// populate the ACS
.repeat(10, "amount") {
.repeat(10 / defaultNumUsers, "amount") {
feed(Iterator continually env)
.exec(createManyRequest.silent)
}
// run queries
.repeat(500) {
.repeat(500 / defaultNumUsers) {
// unless we request under Alice, we don't get negatives in the DB
def m(amount: Int, reqJwt: String, templateId: String): Record[Any] =
Map("amount" -> amount, "reqJwt" -> reqJwt, "templateId" -> templateId)
@ -116,7 +116,7 @@ class SyncQueryMegaAcs extends Simulation with SimulationConfig with HasRandomAm
private val scn = scns.head
setUp(
scn.inject(atOnceUsers(1))
scn.inject(atOnceUsers(defaultNumUsers))
).protocols(httpProtocol)
}

View File

@ -5,6 +5,9 @@ package com.daml.http.perf.scenario
import io.gatling.core.Predef._
import io.gatling.http.Predef._
import io.gatling.http.check.HttpCheck
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
class SyncQueryNewAcs
@ -21,24 +24,29 @@ class SyncQueryNewAcs
private val syncQueryNewAcs =
scenario(s"SyncQueryNewAcs, numberOfRuns: $numberOfRuns, ACS size: $wantedAcsSize")
.repeat(numberOfRuns) {
.repeat(numberOfRuns / defaultNumUsers) {
val acsQueue: BlockingQueue[String] = new LinkedBlockingQueue[String]()
val captureContractId: HttpCheck =
jsonPath("$.result.contractId").transform(x => acsQueue.put(x))
group("Populate ACS") {
doWhile(_ => acsSize() < wantedAcsSize) {
doWhile(_ => acsQueue.size() < wantedAcsSize) {
feed(Iterator.continually(Map("amount" -> String.valueOf(randomAmount()))))
.exec(randomAmountCreateRequest.check(status.is(200), captureContractId).silent)
.exec {
randomAmountCreateRequest.check(status.is(200), captureContractId).silent
}
}
}.group("Run Query") {
feed(Iterator.continually(Map("amount" -> String.valueOf(randomAmount()))))
.exec(randomAmountQueryRequest.notSilent)
}.group("Archive ACS") {
doWhile(_ => acsSize() > 0) {
feed(Iterator.continually(Map("archiveContractId" -> removeNextContractIdFromAcs())))
doWhile(_ => acsQueue.size() > 0) {
feed(Iterator.continually(Map("archiveContractId" -> acsQueue.poll())))
.exec(archiveRequest.silent)
}
}
}
setUp(
syncQueryNewAcs.inject(atOnceUsers(1))
syncQueryNewAcs.inject(atOnceUsers(defaultNumUsers))
).protocols(httpProtocol)
}

View File

@ -43,7 +43,8 @@ class SyncQueryVariableAcs
}
setUp(
fillAcsScenario(wantedAcsSize, silent = true).inject(atOnceUsers(1)),
syncQueryScenario.inject(atOnceUsers(1)),
fillAcsScenario(wantedAcsSize, silent = true).inject(atOnceUsers(defaultNumUsers)).andThen {
syncQueryScenario.inject(atOnceUsers(1))
}
).protocols(httpProtocol)
}

View File

@ -60,12 +60,12 @@ hj_scalacopts = lf_scalacopts + [
"//ledger/participant-integration-api",
"//ledger/sandbox",
"//ledger/sandbox:sandbox-scala-tests-lib",
"//ledger/sandbox-classic",
"//ledger/sandbox-common",
"//ledger/sandbox-common:sandbox-common-scala-tests-lib",
"//libs-scala/contextualized-logging",
"//libs-scala/db-utils",
"//libs-scala/ports",
"//libs-scala/resources",
"@maven//:io_dropwizard_metrics_metrics_core",
],
)

View File

@ -37,13 +37,14 @@ import com.daml.ledger.client.configuration.{
LedgerIdRequirement,
}
import com.daml.ledger.client.withoutledgerid.{LedgerClient => DamlLedgerClient}
import com.daml.ledger.resources.ResourceContext
import com.daml.logging.LoggingContextOf
import com.daml.metrics.Metrics
import com.daml.platform.apiserver.SeedService.Seeding
import com.daml.platform.common.LedgerIdMode
import com.daml.platform.sandbox
import com.daml.platform.sandbox.SandboxServer
import com.daml.platform.sandbox.SandboxBackend
import com.daml.platform.sandbox.config.SandboxConfig
import com.daml.platform.sandboxnext.Runner
import com.daml.platform.services.time.TimeProviderType
import com.daml.ports.Port
import com.typesafe.scalalogging.LazyLogging
@ -150,19 +151,35 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {
useTls: UseTls = UseTls.NoTls,
authService: Option[AuthService] = None,
)(testFn: (Port, DamlLedgerClient, LedgerId) => Future[A])(implicit
mat: Materializer,
aesf: ExecutionSequencerFactory,
ec: ExecutionContext,
): Future[A] = {
val ledgerId = LedgerId(testName)
val applicationId = ApplicationId(testName)
implicit val resourceContext: ResourceContext = ResourceContext(ec)
val ledgerF = for {
ledger <- Future(
new SandboxServer(ledgerConfig(Port.Dynamic, dars, ledgerId, authService, useTls), mat)
urlResource <- Future(
SandboxBackend.H2Database.owner
.map(info => Some(info.jdbcUrl))
.acquire()
)
port <- ledger.portF
jdbcUrl <- urlResource.asFuture
ledger <- Future(
new Runner(
ledgerConfig(
Port.Dynamic,
dars,
ledgerId,
useTls = useTls,
authService = authService,
jdbcUrl = jdbcUrl,
)
)
.acquire()
)
port <- ledger.asFuture
} yield (ledger, port)
val clientF: Future[DamlLedgerClient] = for {
@ -179,11 +196,12 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {
a <- testFn(ledgerPort, client, ledgerId)
} yield a
fa.onComplete { _ =>
ledgerF.foreach(_._1.close())
fa.transformWith { ta =>
ledgerF
.flatMap(_._1.release())
.fallbackTo(Future.unit)
.transform(_ => ta)
}
fa
}
private def ledgerConfig(
@ -192,10 +210,12 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {
ledgerId: LedgerId,
authService: Option[AuthService],
useTls: UseTls,
jdbcUrl: Option[String],
): SandboxConfig =
sandbox.DefaultConfig.copy(
SandboxConfig.defaultConfig.copy(
port = ledgerPort,
damlPackages = dars,
jdbcUrl = jdbcUrl,
timeProviderType = Some(TimeProviderType.WallClock),
tlsConfig = if (useTls) Some(serverTlsConfig) else None,
ledgerIdMode = LedgerIdMode.Static(ledgerId),