mirror of
https://github.com/digital-asset/daml.git
synced 2024-11-09 15:37:05 +03:00
[JSON-API] Remove dependency on user provided tokens for the perf runner & refactor main (#13114)
* Don't error when user tokens are provided for the perf runner changelog_begin changelog_end * Better token parsing error handling & extract user id to allocate the user if so * make the code prettier * Fix & simplify token parsing * fix formatting of bazel file * Update ledger-service/http-json-perf/src/main/scala/com/daml/http/perf/Main.scala Co-authored-by: Raymond Roestenburg <98821776+ray-roestenburg-da@users.noreply.github.com> * correctly handle the response of createUser & don't throw if no LedgerId was found in the token * Wrap exceptions in Futures * Refactor Main.scala of the perf runner completely & remove dependency on user provided JWT's * Minimize diff * simplify code further * Update ledger-service/http-json-perf/src/main/scala/com/daml/http/perf/Main.scala Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com> * Update ledger-service/http-json-perf/src/main/scala/com/daml/http/perf/Main.scala Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com> * Update ledger-service/http-json-perf/src/main/scala/com/daml/http/perf/Main.scala Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com> * Fix build * Don't pass a jwt to the perf runner anymore everywhere it was used & fix ledger id to be right * Minimize diff Co-authored-by: Raymond Roestenburg <98821776+ray-roestenburg-da@users.noreply.github.com> Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>
This commit is contained in:
parent
825bf08846
commit
0dc167fa48
@ -151,8 +151,6 @@ jobs:
|
||||
bazel build //docs:quickstart-model
|
||||
DAR="${PWD}/bazel-bin/docs/quickstart-model.dar"
|
||||
|
||||
JWT="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU"
|
||||
|
||||
START=$(git log -n1 --format=%cd --date=format:%Y%m%d).$(git rev-list --count HEAD).$(Build.BuildId).$(git log -n1 --format=%h --abbrev=8)
|
||||
REPORT_ID="http_json_perf_results_${START}"
|
||||
OUT="$(Build.StagingDirectory)/${REPORT_ID}"
|
||||
@ -161,8 +159,7 @@ jobs:
|
||||
bazel run //ledger-service/http-json-perf:http-json-perf-binary-ce -- \
|
||||
--scenario=${scenario} \
|
||||
--dars=${DAR} \
|
||||
--reports-dir=${OUT} \
|
||||
--jwt=${JWT}
|
||||
--reports-dir=${OUT}
|
||||
done
|
||||
|
||||
GZIP=-9 tar -zcvf ${OUT}.tgz ${OUT}
|
||||
@ -242,7 +239,6 @@ jobs:
|
||||
# ]
|
||||
# }
|
||||
# }
|
||||
JWT="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU"
|
||||
|
||||
METADATA=$(git log -n1 --format=%cd --date=format:%Y%m%d).$(git rev-list --count HEAD).$(Build.BuildId).$(git log -n1 --format=%h --abbrev=8)
|
||||
REPORT_ID="http_json_perf_${QUERY_STORE}_results_${METADATA}"
|
||||
@ -279,7 +275,6 @@ jobs:
|
||||
--scenario=com.daml.http.perf.scenario.MultiUserQueryScenario \
|
||||
--dars=${DAR} \
|
||||
--reports-dir=${OUT}/${CASE} \
|
||||
--jwt=${JWT} \
|
||||
--query-store-index=${QUERY_STORE} > "${LOG_DIR}/${CASE}_log.out"
|
||||
done
|
||||
for KEY in $READ_PERF_KEYS; do
|
||||
@ -297,7 +292,6 @@ jobs:
|
||||
--scenario=com.daml.http.perf.scenario.MultiUserQueryScenario \
|
||||
--dars=${DAR} \
|
||||
--reports-dir=${OUT}/${TEST_CASE} \
|
||||
--jwt=${JWT} \
|
||||
--query-store-index=${QUERY_STORE} > "${LOG_DIR}/${TEST_CASE}_log.out"
|
||||
|
||||
for KEY in $READ_PERF_KEYS; do
|
||||
|
@ -57,7 +57,6 @@ perf_runtime_deps = {
|
||||
deps = [
|
||||
"//language-support/scala/bindings-akka",
|
||||
"//ledger-api/rs-grpc-bridge",
|
||||
"//ledger-service/fetch-contracts",
|
||||
"@maven//:io_gatling_gatling_netty_util",
|
||||
"@maven//:io_netty_netty_common",
|
||||
"@maven//:io_netty_netty_transport",
|
||||
@ -66,6 +65,7 @@ perf_runtime_deps = {
|
||||
"//ledger-service/http-json-cli:{}".format(edition),
|
||||
"//ledger-service/http-json-testing:{}".format(edition),
|
||||
"//ledger-service/jwt",
|
||||
"//ledger/ledger-api-auth",
|
||||
"//libs-scala/db-utils",
|
||||
"//libs-scala/gatling-utils",
|
||||
"//libs-scala/oracle-testing",
|
||||
|
@ -30,8 +30,7 @@ $ bazel run //ledger-service/http-json-perf:http-json-perf-binary -- --help
|
||||
$ bazel run //ledger-service/http-json-perf:http-json-perf-binary -- \
|
||||
--scenario=com.daml.http.perf.scenario.CreateCommand \
|
||||
--dars="${PWD}/bazel-bin/docs/quickstart-model.dar" \
|
||||
--reports-dir=/home/leos/tmp/results/ \
|
||||
--jwt="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU"
|
||||
--reports-dir=/home/leos/tmp/results/
|
||||
```
|
||||
|
||||
## 2.3 Running MultiUserQueryScenario
|
||||
@ -51,7 +50,7 @@ We can control a few scenario parameters i.e `NUM_RECORDS` `NUM_QUERIES` `NUM_RE
|
||||
|
||||
```
|
||||
|
||||
USE_DEFAULT_USER=true RETAIN_DATA=true RUN_MODE="populateCache" bazel run //ledger-service/http-json-perf:http-json-perf-binary-ee -- --scenario=com.daml.http.perf.scenario.OracleMultiUserQueryScenario --jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU --dars=$PWD/bazel-bin/ledger-service/http-json-perf/LargeAcs.dar --query-store-index oracle
|
||||
USE_DEFAULT_USER=true RETAIN_DATA=true RUN_MODE="populateCache" bazel run //ledger-service/http-json-perf:http-json-perf-binary-ee -- --scenario=com.daml.http.perf.scenario.OracleMultiUserQueryScenario --dars=$PWD/bazel-bin/ledger-service/http-json-perf/LargeAcs.dar --query-store-index oracle
|
||||
|
||||
```
|
||||
|
||||
@ -61,7 +60,7 @@ Query contracts by the defined key field.
|
||||
|
||||
```
|
||||
|
||||
USE_DEFAULT_USER=true RETAIN_DATA=true RUN_MODE="fetchByKey" NUM_QUERIES=100 bazel run //ledger-service/http-json-perf:http-json-perf-binary-ee -- --scenario=com.daml.http.perf.scenario.OracleMultiUserQueryScenario --jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU --dars=$PWD/bazel-bin/ledger-service/http-json-perf/LargeAcs.dar --query-store-index oracle
|
||||
USE_DEFAULT_USER=true RETAIN_DATA=true RUN_MODE="fetchByKey" NUM_QUERIES=100 bazel run //ledger-service/http-json-perf:http-json-perf-binary-ee -- --scenario=com.daml.http.perf.scenario.OracleMultiUserQueryScenario --dars=$PWD/bazel-bin/ledger-service/http-json-perf/LargeAcs.dar --query-store-index oracle
|
||||
|
||||
```
|
||||
|
||||
@ -71,7 +70,7 @@ Query contracts by a field on the payload which is the `currency` in this case.
|
||||
|
||||
```
|
||||
|
||||
USE_DEFAULT_USER=true RETAIN_DATA=true RUN_MODE="fetchByQuery" bazel run //ledger-service/http-json-perf:http-json-perf-binary-ee -- --scenario=com.daml.http.perf.scenario.OracleMultiUserQueryScenario --jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU --dars=$PWD/bazel-bin/ledger-service/http-json-perf/LargeAcs.dar --query-store-index oracle
|
||||
USE_DEFAULT_USER=true RETAIN_DATA=true RUN_MODE="fetchByQuery" bazel run //ledger-service/http-json-perf:http-json-perf-binary-ee -- --scenario=com.daml.http.perf.scenario.OracleMultiUserQueryScenario --dars=$PWD/bazel-bin/ledger-service/http-json-perf/LargeAcs.dar --query-store-index oracle
|
||||
|
||||
```
|
||||
|
||||
|
@ -5,9 +5,6 @@ package com.daml.http.perf
|
||||
|
||||
import java.io.File
|
||||
|
||||
import com.daml.jwt.JwtDecoder
|
||||
import com.daml.jwt.domain.Jwt
|
||||
import scalaz.{Applicative, Traverse}
|
||||
import scopt.RenderingMode
|
||||
|
||||
import Config.QueryStoreIndex
|
||||
@ -19,7 +16,6 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
|
||||
private[perf] final case class Config[+S](
|
||||
scenario: S,
|
||||
dars: List[File],
|
||||
jwt: Jwt,
|
||||
reportsDir: File,
|
||||
maxDuration: Option[FiniteDuration],
|
||||
queryStoreIndex: QueryStoreIndex,
|
||||
@ -40,21 +36,11 @@ private[perf] object Config {
|
||||
Config[String](
|
||||
scenario = "",
|
||||
dars = List.empty,
|
||||
jwt = Jwt(""),
|
||||
reportsDir = new File(""),
|
||||
maxDuration = None,
|
||||
queryStoreIndex = QueryStoreIndex.No,
|
||||
)
|
||||
|
||||
implicit val configInstance: Traverse[Config] = new Traverse[Config] {
|
||||
override def traverseImpl[G[_]: Applicative, A, B](
|
||||
fa: Config[A]
|
||||
)(f: A => G[B]): G[Config[B]] = {
|
||||
import scalaz.syntax.functor._
|
||||
f(fa.scenario).map(b => fa.copy(scenario = b))
|
||||
}
|
||||
}
|
||||
|
||||
def parseConfig(args: collection.Seq[String]): Option[Config[String]] =
|
||||
configParser.parse(args, Config.Empty)
|
||||
|
||||
@ -77,12 +63,6 @@ private[perf] object Config {
|
||||
.required()
|
||||
.text("DAR files to pass to Sandbox.")
|
||||
|
||||
opt[String]("jwt")
|
||||
.action((x, c) => c.copy(jwt = Jwt(x)))
|
||||
.required()
|
||||
.validate(validateJwt)
|
||||
.text("JWT token to use when connecting to JSON API.")
|
||||
|
||||
opt[QueryStoreIndex]("query-store-index")
|
||||
.action((x, c) => c.copy(queryStoreIndex = x))
|
||||
.optional()
|
||||
@ -103,18 +83,6 @@ private[perf] object Config {
|
||||
)
|
||||
}
|
||||
|
||||
private def validateJwt(s: String): Either[String, Unit] = {
|
||||
import scalaz.syntax.show._
|
||||
|
||||
JwtDecoder
|
||||
.decode(Jwt(s))
|
||||
.bimap(
|
||||
error => error.shows,
|
||||
_ => (),
|
||||
)
|
||||
.toEither
|
||||
}
|
||||
|
||||
sealed abstract class QueryStoreIndex extends Product with Serializable
|
||||
object QueryStoreIndex {
|
||||
case object No extends QueryStoreIndex
|
||||
|
@ -4,32 +4,20 @@
|
||||
package com.daml.http.perf
|
||||
|
||||
import java.nio.file.{Files, Path}
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.Materializer
|
||||
import com.daml.dbutils
|
||||
import com.daml.gatling.stats.{SimulationLog, SimulationLogSyntax}
|
||||
import com.daml.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
|
||||
import com.daml.http.HttpServiceTestFixture.{withLedger, withHttpService}
|
||||
import com.daml.http.domain.{JwtPayload, LedgerId}
|
||||
import com.daml.http.HttpServiceTestFixture.{withHttpService, withLedger}
|
||||
import com.daml.http.perf.scenario.SimulationConfig
|
||||
import com.daml.http.util.FutureUtil._
|
||||
import com.daml.http.dbbackend.{DbStartupMode, JdbcConfig}
|
||||
import com.daml.http.{EndpointsCompanion, HttpService}
|
||||
import com.daml.jwt.domain.Jwt
|
||||
import com.daml.scalautil.Statement.discard
|
||||
import com.daml.testing.postgresql.PostgresDatabase
|
||||
import com.typesafe.scalalogging.StrictLogging
|
||||
import io.gatling.core.scenario.Simulation
|
||||
import io.gatling.netty.util.Transports
|
||||
import io.netty.channel.EventLoopGroup
|
||||
import scalaz.std.scalaFuture._
|
||||
import scalaz.std.string._
|
||||
import scalaz.syntax.tag._
|
||||
import scalaz.{-\/, EitherT, \/, \/-}
|
||||
|
||||
import Config.QueryStoreIndex
|
||||
import com.daml.dbutils.ConnectionPool
|
||||
import scalaz.\/
|
||||
|
||||
import scala.concurrent.duration.{Duration, _}
|
||||
import scala.concurrent.{Await, ExecutionContext, Future, Promise, TimeoutException}
|
||||
@ -37,9 +25,7 @@ import scala.util.{Failure, Success, Try}
|
||||
|
||||
object Main extends StrictLogging {
|
||||
|
||||
private type ET[A] = EitherT[Future, Throwable, A]
|
||||
|
||||
sealed abstract class ExitCode(val code: Int) extends Product with Serializable
|
||||
sealed abstract class ExitCode(val unwrap: Int) extends Product with Serializable
|
||||
object ExitCode {
|
||||
case object Ok extends ExitCode(0)
|
||||
case object InvalidUsage extends ExitCode(100)
|
||||
@ -49,232 +35,14 @@ object Main extends StrictLogging {
|
||||
case object GatlingError extends ExitCode(104)
|
||||
}
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val name = "http-json-perf"
|
||||
val terminationTimeout: FiniteDuration = 30.seconds
|
||||
|
||||
implicit val asys: ActorSystem = ActorSystem(name)
|
||||
implicit val mat: Materializer = Materializer(asys)
|
||||
implicit val aesf: ExecutionSequencerFactory =
|
||||
new AkkaExecutionSequencerPool(poolName = name, terminationTimeout = terminationTimeout)
|
||||
implicit val elg: EventLoopGroup = Transports.newEventLoopGroup(true, 0, "gatling")
|
||||
implicit val ec: ExecutionContext = asys.dispatcher
|
||||
|
||||
def terminate(): Unit = {
|
||||
discard { Await.result(asys.terminate(), terminationTimeout) }
|
||||
val promise = Promise[Unit]()
|
||||
val future = elg.shutdownGracefully(0, terminationTimeout.length, terminationTimeout.unit)
|
||||
discard {
|
||||
future.addListener((f: io.netty.util.concurrent.Future[_]) =>
|
||||
discard { promise.complete(Try(f.get).map(_ => ())) }
|
||||
)
|
||||
}
|
||||
discard { Await.result(promise.future, terminationTimeout) }
|
||||
}
|
||||
|
||||
val exitCode: ExitCode = Config.parseConfig(args) match {
|
||||
case None =>
|
||||
// error is printed out by scopt
|
||||
ExitCode.InvalidUsage
|
||||
case Some(config) =>
|
||||
waitForResult(logCompletion(main1(config)), config.maxDuration.getOrElse(Duration.Inf))
|
||||
}
|
||||
|
||||
terminate()
|
||||
sys.exit(exitCode.code)
|
||||
}
|
||||
|
||||
private def logCompletion(fa: Future[Throwable \/ _])(implicit ec: ExecutionContext): fa.type = {
|
||||
fa.onComplete {
|
||||
case Success(\/-(_)) => logger.info(s"Scenario completed")
|
||||
case Success(-\/(e)) => logger.error(s"Scenario failed", e)
|
||||
case Failure(e) => logger.error(s"Scenario failed", e)
|
||||
}
|
||||
fa
|
||||
}
|
||||
|
||||
private def waitForResult[A](fa: Future[Throwable \/ ExitCode], timeout: Duration): ExitCode =
|
||||
try {
|
||||
Await
|
||||
.result(fa, timeout)
|
||||
.valueOr(_ => ExitCode.GatlingError)
|
||||
} catch {
|
||||
case e: TimeoutException =>
|
||||
logger.error(s"Scenario failed", e)
|
||||
ExitCode.TimedOutScenario
|
||||
}
|
||||
|
||||
private def main1(config: Config[String])(implicit
|
||||
asys: ActorSystem,
|
||||
mat: Materializer,
|
||||
aesf: ExecutionSequencerFactory,
|
||||
ec: ExecutionContext,
|
||||
elg: EventLoopGroup,
|
||||
): Future[Throwable \/ ExitCode] = {
|
||||
import scalaz.syntax.traverse._
|
||||
|
||||
logger.info(s"$config")
|
||||
|
||||
val et: ET[ExitCode] = for {
|
||||
ledgerId <- either(
|
||||
getLedgerId(config.jwt).leftMap(_ =>
|
||||
new IllegalArgumentException("Cannot infer Ledger ID from JWT")
|
||||
)
|
||||
): ET[LedgerId]
|
||||
|
||||
_ <- either(
|
||||
config.traverse(s => resolveSimulationClass(s))
|
||||
): ET[Config[Class[_ <: Simulation]]]
|
||||
|
||||
exitCode <- rightT(
|
||||
main2(ledgerId, config)
|
||||
): ET[ExitCode]
|
||||
|
||||
} yield exitCode
|
||||
|
||||
et.run
|
||||
}
|
||||
|
||||
private def main2(ledgerId: LedgerId, config: Config[String])(implicit
|
||||
asys: ActorSystem,
|
||||
mat: Materializer,
|
||||
aesf: ExecutionSequencerFactory,
|
||||
ec: ExecutionContext,
|
||||
elg: EventLoopGroup,
|
||||
): Future[ExitCode] =
|
||||
withLedger(config.dars, ledgerId.unwrap) { (ledgerPort, _, _) =>
|
||||
withJsonApiJdbcConfig(config.queryStoreIndex) { jsonApiJdbcConfig =>
|
||||
withHttpService(
|
||||
ledgerId.unwrap,
|
||||
ledgerPort,
|
||||
jsonApiJdbcConfig,
|
||||
None,
|
||||
) { (uri, _, _, _) =>
|
||||
runGatlingScenario(config, uri.authority.host.address, uri.authority.port)
|
||||
.flatMap { case (exitCode, dir) =>
|
||||
toFuture(generateReport(dir))
|
||||
.map { _ =>
|
||||
logger.info(s"Report directory: ${dir.toAbsolutePath}")
|
||||
exitCode
|
||||
}
|
||||
}: Future[ExitCode]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def withJsonApiJdbcConfig[A](jsonApiQueryStoreEnabled: QueryStoreIndex)(
|
||||
fn: Option[JdbcConfig] => Future[A]
|
||||
)(implicit
|
||||
ec: ExecutionContext
|
||||
): Future[A] = QueryStoreBracket lookup jsonApiQueryStoreEnabled match {
|
||||
case Some(b: QueryStoreBracket[s, d]) =>
|
||||
import b._
|
||||
for {
|
||||
dbInstance <- Future.successful(state())
|
||||
dbConfig <- toFuture(start(dbInstance))
|
||||
jsonApiDbConfig <- Future.successful(config(dbInstance, dbConfig))
|
||||
a <- fn(Some(jsonApiDbConfig))
|
||||
_ <- Future.successful(
|
||||
stop(dbInstance, dbConfig) // XXX ignores resulting Try
|
||||
) // TODO: use something like `lf.data.TryOps.Bracket.bracket`
|
||||
} yield a
|
||||
|
||||
case None => fn(None)
|
||||
}
|
||||
|
||||
private[this] final case class QueryStoreBracket[S, D](
|
||||
state: () => S,
|
||||
start: S => Try[D],
|
||||
config: (S, D) => JdbcConfig,
|
||||
stop: (S, D) => Try[Unit],
|
||||
)
|
||||
private[this] object QueryStoreBracket {
|
||||
type T = QueryStoreBracket[_, _]
|
||||
val Postgres: T = QueryStoreBracket[PostgresRunner, PostgresDatabase](
|
||||
() => new PostgresRunner(),
|
||||
_.start(),
|
||||
(_, d) => jsonApiJdbcConfig(d),
|
||||
(r, _) => r.stop(),
|
||||
)
|
||||
|
||||
import com.daml.testing.oracle, oracle.OracleAround
|
||||
val Oracle: T = QueryStoreBracket[OracleRunner, OracleAround.RichOracleUser](
|
||||
() => new OracleRunner,
|
||||
_.start(),
|
||||
_ jdbcConfig _,
|
||||
_.stop(_),
|
||||
)
|
||||
|
||||
private[this] final class OracleRunner {
|
||||
|
||||
private val defaultUser = "ORACLE_USER"
|
||||
private val retainData = sys.env.get("RETAIN_DATA").exists(_ equalsIgnoreCase "true")
|
||||
private val useDefaultUser = sys.env.get("USE_DEFAULT_USER").exists(_ equalsIgnoreCase "true")
|
||||
type St = OracleAround.RichOracleUser
|
||||
|
||||
def start(): Try[St] = Try {
|
||||
if (useDefaultUser) OracleAround.createOrReuseUser(defaultUser)
|
||||
else OracleAround.createNewUniqueRandomUser()
|
||||
}
|
||||
|
||||
def jdbcConfig(user: St): JdbcConfig = {
|
||||
import DbStartupMode._
|
||||
val startupMode: DbStartupMode = if (retainData) CreateIfNeededAndStart else CreateAndStart
|
||||
JdbcConfig(
|
||||
dbutils.JdbcConfig(
|
||||
"oracle.jdbc.OracleDriver",
|
||||
user.jdbcUrlWithoutCredentials,
|
||||
user.oracleUser.name,
|
||||
user.oracleUser.pwd,
|
||||
ConnectionPool.PoolSize.Production,
|
||||
),
|
||||
startMode = startupMode,
|
||||
)
|
||||
}
|
||||
|
||||
def stop(user: St): Try[Unit] = {
|
||||
if (retainData) Success(()) else Try(user.drop())
|
||||
}
|
||||
}
|
||||
|
||||
def lookup(q: QueryStoreIndex): Option[T] = q match {
|
||||
case QueryStoreIndex.No => None
|
||||
case QueryStoreIndex.Postgres => Some(Postgres)
|
||||
case QueryStoreIndex.Oracle => Some(Oracle)
|
||||
}
|
||||
}
|
||||
|
||||
private def jsonApiJdbcConfig(c: PostgresDatabase): JdbcConfig =
|
||||
JdbcConfig(
|
||||
dbutils
|
||||
.JdbcConfig(
|
||||
driver = "org.postgresql.Driver",
|
||||
url = c.url,
|
||||
user = "test",
|
||||
password = "",
|
||||
ConnectionPool.PoolSize.Production,
|
||||
),
|
||||
startMode = DbStartupMode.CreateOnly,
|
||||
)
|
||||
|
||||
private def resolveSimulationClass(str: String): Throwable \/ Class[_ <: Simulation] = {
|
||||
try {
|
||||
private def resolveSimulationClass(
|
||||
str: String
|
||||
)(implicit ec: ExecutionContext): Future[Class[_ <: Simulation]] = {
|
||||
Future {
|
||||
val klass: Class[_] = Class.forName(str)
|
||||
val simClass = klass.asSubclass(classOf[Simulation])
|
||||
\/-(simClass)
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
logger.error(s"Cannot resolve scenario: '$str'", e)
|
||||
-\/(e)
|
||||
}
|
||||
}
|
||||
|
||||
private def getLedgerId(
|
||||
jwt: Jwt
|
||||
): EndpointsCompanion.Error \/ LedgerId = {
|
||||
EndpointsCompanion
|
||||
.customDecodeAndParsePayload[JwtPayload](jwt, HttpService.decodeJwt)
|
||||
.map { case (_, payload) => payload.ledgerId }
|
||||
simClass
|
||||
}.transform(identity, new Exception(s"Cannot resolve scenario: '$str'", _))
|
||||
}
|
||||
|
||||
private def runGatlingScenario(
|
||||
@ -292,7 +60,6 @@ object Main extends StrictLogging {
|
||||
|
||||
val hostAndPort = s"${jsonApiHost: String}:${jsonApiPort: Int}"
|
||||
discard { System.setProperty(SimulationConfig.HostAndPortKey, hostAndPort) }
|
||||
discard { System.setProperty(SimulationConfig.JwtKey, config.jwt.value) }
|
||||
|
||||
val configBuilder = new GatlingPropertiesBuilder()
|
||||
.simulationClass(config.scenario)
|
||||
@ -323,4 +90,87 @@ object Main extends StrictLogging {
|
||||
}
|
||||
simulationLog.map(_ => ())
|
||||
}
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val name = "http-json-perf"
|
||||
val terminationTimeout: FiniteDuration = 30.seconds
|
||||
|
||||
implicit val asys: ActorSystem = ActorSystem(name)
|
||||
implicit val mat: Materializer = Materializer(asys)
|
||||
implicit val aesf: ExecutionSequencerFactory =
|
||||
new AkkaExecutionSequencerPool(poolName = name, terminationTimeout = terminationTimeout)
|
||||
implicit val elg: EventLoopGroup = Transports.newEventLoopGroup(true, 0, "gatling")
|
||||
implicit val ec: ExecutionContext = asys.dispatcher
|
||||
|
||||
def terminate(): Unit = {
|
||||
discard { Await.result(asys.terminate(), terminationTimeout) }
|
||||
val promise = Promise[Unit]()
|
||||
val future = elg.shutdownGracefully(0, terminationTimeout.length, terminationTimeout.unit)
|
||||
discard {
|
||||
future.addListener((f: io.netty.util.concurrent.Future[_]) =>
|
||||
discard { promise.complete(Try(f.get).map(_ => ())) }
|
||||
)
|
||||
}
|
||||
discard { Await.result(promise.future, terminationTimeout) }
|
||||
}
|
||||
|
||||
def runScenario(config: Config[String]) =
|
||||
resolveSimulationClass(config.scenario).flatMap { _ =>
|
||||
withLedger(config.dars, SimulationConfig.LedgerId) { (ledgerPort, _, _) =>
|
||||
QueryStoreBracket.withJsonApiJdbcConfig(config.queryStoreIndex) { jsonApiJdbcConfig =>
|
||||
withHttpService(
|
||||
SimulationConfig.LedgerId,
|
||||
ledgerPort,
|
||||
jsonApiJdbcConfig,
|
||||
None,
|
||||
) { (uri, _, _, _) =>
|
||||
runGatlingScenario(config, uri.authority.host.address, uri.authority.port)
|
||||
.flatMap { case (exitCode, dir) =>
|
||||
toFuture(generateReport(dir))
|
||||
.map { _ =>
|
||||
logger.info(s"Report directory: ${dir.toAbsolutePath}")
|
||||
exitCode
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val exitCode: ExitCode = Config.parseConfig(args) match {
|
||||
case None =>
|
||||
// error is printed out by scopt
|
||||
ExitCode.InvalidUsage
|
||||
case Some(config) =>
|
||||
logger.info(config.toString)
|
||||
waitForResult(
|
||||
logCompletion(runScenario(config)),
|
||||
config.maxDuration.getOrElse(Duration.Inf),
|
||||
)
|
||||
}
|
||||
terminate()
|
||||
sys.exit(exitCode.unwrap)
|
||||
}
|
||||
|
||||
private def waitForResult[A](fa: Future[ExitCode], timeout: Duration): ExitCode =
|
||||
try {
|
||||
Await
|
||||
.result(fa, timeout)
|
||||
} catch {
|
||||
case e: TimeoutException =>
|
||||
logger.error("Scenario failed", e)
|
||||
ExitCode.TimedOutScenario
|
||||
}
|
||||
|
||||
private def logCompletion(
|
||||
fa: Future[ExitCode]
|
||||
)(implicit ec: ExecutionContext): Future[ExitCode] = {
|
||||
fa.transform { res =>
|
||||
res match {
|
||||
case Success(_) => logger.info("Scenario completed")
|
||||
case Failure(e) => logger.error("Scenario failed", e)
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,43 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml
|
||||
package http.perf
|
||||
|
||||
import dbutils.ConnectionPool
|
||||
import http.dbbackend.{DbStartupMode, JdbcConfig}
|
||||
import testing.oracle.OracleAround
|
||||
|
||||
import scala.util.{Success, Try}
|
||||
|
||||
private[this] final class OracleRunner {
|
||||
|
||||
private val defaultUser = "ORACLE_USER"
|
||||
private val retainData = sys.env.get("RETAIN_DATA").exists(_ equalsIgnoreCase "true")
|
||||
private val useDefaultUser = sys.env.get("USE_DEFAULT_USER").exists(_ equalsIgnoreCase "true")
|
||||
type St = OracleAround.RichOracleUser
|
||||
|
||||
def start(): Try[St] = Try {
|
||||
if (useDefaultUser) OracleAround.createOrReuseUser(defaultUser)
|
||||
else OracleAround.createNewUniqueRandomUser()
|
||||
}
|
||||
|
||||
def jdbcConfig(user: St): JdbcConfig = {
|
||||
import DbStartupMode._
|
||||
val startupMode: DbStartupMode = if (retainData) CreateIfNeededAndStart else CreateAndStart
|
||||
JdbcConfig(
|
||||
dbutils.JdbcConfig(
|
||||
"oracle.jdbc.OracleDriver",
|
||||
user.jdbcUrlWithoutCredentials,
|
||||
user.oracleUser.name,
|
||||
user.oracleUser.pwd,
|
||||
ConnectionPool.PoolSize.Production,
|
||||
),
|
||||
startMode = startupMode,
|
||||
)
|
||||
}
|
||||
|
||||
def stop(user: St): Try[Unit] = {
|
||||
if (retainData) Success(()) else Try(user.drop())
|
||||
}
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.http.perf
|
||||
|
||||
import com.daml.dbutils
|
||||
import com.daml.dbutils.ConnectionPool
|
||||
import com.daml.http.dbbackend.{DbStartupMode, JdbcConfig}
|
||||
import com.daml.http.perf.Config.QueryStoreIndex
|
||||
import com.daml.http.util.FutureUtil.toFuture
|
||||
import com.daml.testing.postgresql.PostgresDatabase
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.Try
|
||||
|
||||
private[this] final case class QueryStoreBracket[S, D](
|
||||
state: () => S,
|
||||
start: S => Try[D],
|
||||
config: (S, D) => JdbcConfig,
|
||||
stop: (S, D) => Try[Unit],
|
||||
)
|
||||
|
||||
private[this] object QueryStoreBracket {
|
||||
private def jsonApiJdbcConfig(c: PostgresDatabase): JdbcConfig =
|
||||
JdbcConfig(
|
||||
dbutils
|
||||
.JdbcConfig(
|
||||
driver = "org.postgresql.Driver",
|
||||
url = c.url,
|
||||
user = "test",
|
||||
password = "",
|
||||
ConnectionPool.PoolSize.Production,
|
||||
),
|
||||
startMode = DbStartupMode.CreateOnly,
|
||||
)
|
||||
|
||||
type T = QueryStoreBracket[_, _]
|
||||
val Postgres: T = QueryStoreBracket[PostgresRunner, PostgresDatabase](
|
||||
() => new PostgresRunner(),
|
||||
_.start(),
|
||||
(_, d) => jsonApiJdbcConfig(d),
|
||||
(r, _) => r.stop(),
|
||||
)
|
||||
|
||||
import com.daml.testing.oracle, oracle.OracleAround
|
||||
val Oracle: T = QueryStoreBracket[OracleRunner, OracleAround.RichOracleUser](
|
||||
() => new OracleRunner,
|
||||
_.start(),
|
||||
_ jdbcConfig _,
|
||||
_.stop(_),
|
||||
)
|
||||
|
||||
def lookup(q: QueryStoreIndex): Option[T] = q match {
|
||||
case QueryStoreIndex.No => None
|
||||
case QueryStoreIndex.Postgres => Some(Postgres)
|
||||
case QueryStoreIndex.Oracle => Some(Oracle)
|
||||
}
|
||||
|
||||
def withJsonApiJdbcConfig[A](
|
||||
jsonApiQueryStoreEnabled: QueryStoreIndex
|
||||
)(
|
||||
fn: Option[JdbcConfig] => Future[A]
|
||||
)(implicit
|
||||
ec: ExecutionContext
|
||||
): Future[A] = QueryStoreBracket lookup jsonApiQueryStoreEnabled match {
|
||||
case Some(b: QueryStoreBracket[s, d]) =>
|
||||
import b._
|
||||
for {
|
||||
dbInstance <- Future.successful(state())
|
||||
dbConfig <- toFuture(start(dbInstance))
|
||||
jsonApiDbConfig <- Future.successful(config(dbInstance, dbConfig))
|
||||
a <- fn(Some(jsonApiDbConfig))
|
||||
_ <- Future.successful(
|
||||
stop(dbInstance, dbConfig) // XXX ignores resulting Try
|
||||
) // TODO: use something like `lf.data.TryOps.Bracket.bracket`
|
||||
} yield a
|
||||
|
||||
case None => fn(None)
|
||||
}
|
||||
}
|
@ -32,4 +32,5 @@ private[scenario] trait SimulationConfig {
|
||||
object SimulationConfig {
|
||||
val HostAndPortKey = "com.daml.http.perf.hostAndPort"
|
||||
val JwtKey = "com.daml.http.perf.jwt"
|
||||
val LedgerId = "MyLedger"
|
||||
}
|
||||
|
@ -334,14 +334,6 @@ object EndpointsCompanion {
|
||||
|
||||
private[http] def format(a: JsValue): ByteString = ByteString(a.compactPrint)
|
||||
|
||||
private[http] def customDecodeAndParsePayload[A](jwt: Jwt, decodeJwt: ValidateJwt)(implicit
|
||||
parse: ParsePayload[A]
|
||||
) =
|
||||
for {
|
||||
a <- decodeJwt(jwt): Unauthorized \/ DecodedJwt[String]
|
||||
p <- parse.parsePayload(a)
|
||||
} yield (jwt, p)
|
||||
|
||||
private[http] def decodeAndParsePayload[A](
|
||||
jwt: Jwt,
|
||||
decodeJwt: ValidateJwt,
|
||||
|
Loading…
Reference in New Issue
Block a user