Add tests for connection failures in the JSON API (#7751)

* Add tests for connection failures in the JSON API

This PR adds some toxiproxy based tests to see how the JSON API reacts
if the connection to the ledger is killed. There are a bunch of
inconsistencies here in the tests some of which we might want to
address and the others we should at least document but I’ll leave that
for future PRs.

changelog_begin
changelog_end

* Import HttpServiceTestFixture instead of prefixing

changelog_begin
changelog_end
This commit is contained in:
Moritz Kiefer 2020-10-20 18:59:11 +02:00 committed by GitHub
parent 48527e21a5
commit ea453c35cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 643 additions and 251 deletions

View File

@ -32,6 +32,7 @@ da_scala_library(
"//ledger-service/http-json-testing",
"//ledger-service/jwt",
"//libs-scala/gatling-utils",
"//libs-scala/ports",
"//libs-scala/postgresql-testing",
"//libs-scala/scala-utils",
"@maven//:com_fasterxml_jackson_core_jackson_core",

View File

@ -9,7 +9,7 @@ import akka.actor.ActorSystem
import akka.stream.Materializer
import com.daml.gatling.stats.{SimulationLog, SimulationLogSyntax}
import com.daml.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
import com.daml.http.HttpServiceTestFixture.withHttpService
import com.daml.http.HttpServiceTestFixture.{withLedger, withHttpService}
import com.daml.http.domain.{JwtPayload, LedgerId}
import com.daml.http.perf.scenario.SimulationConfig
import com.daml.http.util.FutureUtil._
@ -121,17 +121,19 @@ object Main extends StrictLogging {
aesf: ExecutionSequencerFactory,
ec: ExecutionContext
): Future[ExitCode] =
withJsonApiJdbcConfig(config.queryStoreIndex) { jsonApiJdbcConfig =>
withHttpService(ledgerId.unwrap, config.dars, jsonApiJdbcConfig, None) { (uri, _, _, _) =>
runGatlingScenario(config, uri.authority.host.address, uri.authority.port)
.flatMap {
case (exitCode, dir) =>
toFuture(generateReport(dir))
.map { _ =>
logger.info(s"Report directory: ${dir.getAbsolutePath}")
exitCode
}
}: Future[ExitCode]
withLedger(config.dars, ledgerId.unwrap) { (ledgerPort, _) =>
withJsonApiJdbcConfig(config.queryStoreIndex) { jsonApiJdbcConfig =>
withHttpService(ledgerId.unwrap, ledgerPort, jsonApiJdbcConfig, None) { (uri, _, _, _) =>
runGatlingScenario(config, uri.authority.host.address, uri.authority.port)
.flatMap {
case (exitCode, dir) =>
toFuture(generateReport(dir))
.map { _ =>
logger.info(s"Report directory: ${dir.getAbsolutePath}")
exitCode
}
}: Future[ExitCode]
}
}
}

View File

@ -43,6 +43,7 @@ da_scala_library(
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_http_core_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:io_spray_spray_json_2_12",
"@maven//:org_tpolecat_doobie_core_2_12",
"@maven//:org_tpolecat_doobie_free_2_12",
"@maven//:org_typelevel_cats_core_2_12",

View File

@ -4,20 +4,30 @@
package com.daml.http
import java.io.File
import java.time.Instant
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import akka.http.scaladsl.model._
import akka.stream.Materializer
import com.daml.api.util.TimestampConversion
import com.daml.bazeltools.BazelRunfiles.rlocation
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.http.dbbackend.ContractDao
import com.daml.http.json.{DomainJsonDecoder, DomainJsonEncoder}
import com.daml.http.json.{DomainJsonDecoder, DomainJsonEncoder, SprayJson}
import com.daml.http.util.ClientUtil.boxedRecord
import com.daml.http.util.TestUtil.getResponseDataBytes
import com.daml.http.util.{FutureUtil, NewBoolean}
import com.daml.ledger.api.auth.AuthService
import com.daml.jwt.JwtSigner
import com.daml.jwt.domain.{DecodedJwt, Jwt}
import com.daml.ledger.api.auth.{AuthService, AuthServiceJWTCodec, AuthServiceJWTPayload}
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.refinements.ApiTypes.ApplicationId
import com.daml.ledger.api.refinements.{ApiTypes => lar}
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.api.v1.{value => v}
import com.daml.ledger.client.LedgerClient
import com.daml.ledger.client.configuration.{
CommandClientConfiguration,
@ -30,21 +40,27 @@ import com.daml.platform.sandbox.SandboxServer
import com.daml.platform.sandbox.config.SandboxConfig
import com.daml.platform.services.time.TimeProviderType
import com.daml.ports.Port
import com.typesafe.scalalogging.LazyLogging
import scalaz._
import scalaz.std.option._
import scalaz.std.scalaFuture._
import scalaz.syntax.show._
import scalaz.syntax.tag._
import scalaz.syntax.traverse._
import spray.json._
import scala.concurrent.duration.{DAYS, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
object HttpServiceTestFixture {
object HttpServiceTestFixture extends LazyLogging {
import json.JsonProtocol._
private val doNotReloadPackages = FiniteDuration(100, DAYS)
def withHttpService[A](
testName: String,
dars: List[File],
ledgerPort: Port,
jdbcConfig: Option[JdbcConfig],
staticContentConfig: Option[StaticContentConfig],
leakPasswords: LeakPasswords = LeakPasswords.FiresheepStyle,
@ -56,19 +72,11 @@ object HttpServiceTestFixture {
aesf: ExecutionSequencerFactory,
ec: ExecutionContext): Future[A] = {
val ledgerId = LedgerId(testName)
val applicationId = ApplicationId(testName)
val contractDaoF: Future[Option[ContractDao]] = jdbcConfig.map(c => initializeDb(c)).sequence
val ledgerF = for {
ledger <- Future(
new SandboxServer(ledgerConfig(Port.Dynamic, dars, ledgerId, useTls = useTls), mat))
port <- ledger.portF
} yield (ledger, port)
val httpServiceF: Future[ServerBinding] = for {
(_, ledgerPort) <- ledgerF
contractDao <- contractDaoF
config = Config(
ledgerHost = "localhost",
@ -91,7 +99,6 @@ object HttpServiceTestFixture {
} yield httpService
val clientF: Future[LedgerClient] = for {
(_, ledgerPort) <- ledgerF
client <- LedgerClient.singleHost(
"localhost",
ledgerPort.value,
@ -116,7 +123,6 @@ object HttpServiceTestFixture {
Future
.sequence(
Seq(
ledgerF.map(_._1.close()),
httpServiceF.flatMap(_.unbind()),
) map (_ fallbackTo Future.unit))
.transform(_ => ta)
@ -127,7 +133,8 @@ object HttpServiceTestFixture {
dars: List[File],
testName: String,
token: Option[String] = None,
authService: Option[AuthService] = None)(testFn: LedgerClient => Future[A])(
useTls: UseTls = UseTls.NoTls,
authService: Option[AuthService] = None)(testFn: (Port, LedgerClient) => Future[A])(
implicit mat: Materializer,
aesf: ExecutionSequencerFactory,
ec: ExecutionContext): Future[A] = {
@ -137,7 +144,7 @@ object HttpServiceTestFixture {
val ledgerF = for {
ledger <- Future(
new SandboxServer(ledgerConfig(Port.Dynamic, dars, ledgerId, authService), mat))
new SandboxServer(ledgerConfig(Port.Dynamic, dars, ledgerId, authService, useTls), mat))
port <- ledger.portF
} yield (ledger, port)
@ -146,12 +153,13 @@ object HttpServiceTestFixture {
client <- LedgerClient.singleHost(
"localhost",
ledgerPort.value,
clientConfig(applicationId, token))
clientConfig(applicationId, token, useTls))
} yield client
val fa: Future[A] = for {
(_, ledgerPort) <- ledgerF
client <- clientF
a <- testFn(client)
a <- testFn(ledgerPort, client)
} yield a
fa.onComplete { _ =>
@ -165,8 +173,8 @@ object HttpServiceTestFixture {
ledgerPort: Port,
dars: List[File],
ledgerId: LedgerId,
authService: Option[AuthService] = None,
useTls: UseTls = UseTls.NoTls
authService: Option[AuthService],
useTls: UseTls,
): SandboxConfig =
sandbox.DefaultConfig.copy(
port = ledgerPort,
@ -180,7 +188,7 @@ object HttpServiceTestFixture {
private def clientConfig[A](
applicationId: ApplicationId,
token: Option[String] = None,
useTls: UseTls = UseTls.NoTls): LedgerClientConfiguration =
useTls: UseTls): LedgerClientConfiguration =
LedgerClientConfiguration(
applicationId = ApplicationId.unwrap(applicationId),
ledgerIdRequirement = LedgerIdRequirement.none,
@ -235,4 +243,145 @@ object HttpServiceTestFixture {
private val serverTlsConfig = TlsConfiguration(enabled = true, serverCrt, serverPem, caCrt)
private val clientTlsConfig = TlsConfiguration(enabled = true, clientCrt, clientPem, caCrt)
private val noTlsConfig = TlsConfiguration(enabled = false, None, None, None)
def jwtForParties(parties: List[String], ledgerId: String) = {
import AuthServiceJWTCodec.JsonImplicits._
val decodedJwt = DecodedJwt(
"""{"alg": "HS256", "typ": "JWT"}""",
AuthServiceJWTPayload(
ledgerId = Some(ledgerId),
applicationId = Some("test"),
actAs = parties,
participantId = None,
exp = None,
admin = false,
readAs = List()
).toJson.prettyPrint
)
JwtSigner.HMAC256
.sign(decodedJwt, "secret")
.fold(e => throw new IllegalArgumentException(s"cannot sign a JWT: ${e.shows}"), identity)
}
def headersWithPartyAuth(parties: List[String], ledgerId: String) =
authorizationHeader(jwtForParties(parties, ledgerId))
def authorizationHeader(token: Jwt): List[Authorization] =
List(Authorization(OAuth2BearerToken(token.value)))
def postRequest(uri: Uri, json: JsValue, headers: List[HttpHeader] = Nil)(
implicit as: ActorSystem,
ec: ExecutionContext,
mat: Materializer): Future[(StatusCode, JsValue)] = {
Http()
.singleRequest(
HttpRequest(
method = HttpMethods.POST,
uri = uri,
headers = headers,
entity = HttpEntity(ContentTypes.`application/json`, json.prettyPrint))
)
.flatMap { resp =>
val bodyF: Future[String] = getResponseDataBytes(resp, debug = true)
bodyF.map(body => {
(resp.status, body.parseJson)
})
}
}
def postJsonStringRequest(uri: Uri, jsonString: String, headers: List[HttpHeader])(
implicit as: ActorSystem,
ec: ExecutionContext,
mat: Materializer): Future[(StatusCode, JsValue)] = {
logger.info(s"postJson: ${uri.toString} json: ${jsonString: String}")
Http()
.singleRequest(
HttpRequest(
method = HttpMethods.POST,
uri = uri,
headers = headers,
entity = HttpEntity(ContentTypes.`application/json`, jsonString))
)
.flatMap { resp =>
val bodyF: Future[String] = getResponseDataBytes(resp, debug = true)
bodyF.map(body => (resp.status, body.parseJson))
}
}
def postJsonRequest(uri: Uri, json: JsValue, headers: List[HttpHeader])(
implicit as: ActorSystem,
ec: ExecutionContext,
mat: Materializer): Future[(StatusCode, JsValue)] =
postJsonStringRequest(uri, json.prettyPrint, headers)
def postCreateCommand(
cmd: domain.CreateCommand[v.Record],
encoder: DomainJsonEncoder,
uri: Uri,
headers: List[HttpHeader])(
implicit as: ActorSystem,
ec: ExecutionContext,
mat: Materializer): Future[(StatusCode, JsValue)] = {
import encoder.implicits._
for {
json <- FutureUtil.toFuture(SprayJson.encode1(cmd)): Future[JsValue]
result <- postJsonRequest(uri.withPath(Uri.Path("/v1/create")), json, headers = headers)
} yield result
}
def postArchiveCommand(
templateId: domain.TemplateId.OptionalPkg,
contractId: domain.ContractId,
encoder: DomainJsonEncoder,
uri: Uri,
headers: List[HttpHeader])(
implicit as: ActorSystem,
ec: ExecutionContext,
mat: Materializer): Future[(StatusCode, JsValue)] = {
val ref = domain.EnrichedContractId(Some(templateId), contractId)
val cmd = archiveCommand(ref)
for {
json <- FutureUtil.toFuture(encoder.encodeExerciseCommand(cmd)): Future[JsValue]
result <- postJsonRequest(uri.withPath(Uri.Path("/v1/exercise")), json, headers)
} yield result
}
def getRequest(uri: Uri, headers: List[HttpHeader])(
implicit as: ActorSystem,
ec: ExecutionContext,
mat: Materializer): Future[(StatusCode, JsValue)] = {
Http()
.singleRequest(
HttpRequest(method = HttpMethods.GET, uri = uri, headers = headers)
)
.flatMap { resp =>
val bodyF: Future[String] = getResponseDataBytes(resp, debug = true)
bodyF.map(body => (resp.status, body.parseJson))
}
}
def archiveCommand[Ref](reference: Ref): domain.ExerciseCommand[v.Value, Ref] = {
val arg: v.Record = v.Record()
val choice = lar.Choice("Archive")
domain.ExerciseCommand(reference, choice, boxedRecord(arg), None)
}
def accountCreateCommand(
owner: domain.Party,
number: String,
time: v.Value.Sum.Timestamp = TimestampConversion.instantToMicros(Instant.now))
: domain.CreateCommand[v.Record] = {
val templateId = domain.TemplateId(None, "Account", "Account")
val timeValue = v.Value(time)
val enabledVariantValue =
v.Value(v.Value.Sum.Variant(v.Variant(None, "Enabled", Some(timeValue))))
val arg = v.Record(
fields = List(
v.RecordField("owner", Some(v.Value(v.Value.Sum.Party(owner.unwrap)))),
v.RecordField("number", Some(v.Value(v.Value.Sum.Text(number)))),
v.RecordField("status", Some(enabledVariantValue))
))
domain.CreateCommand(templateId, arg, None)
}
}

View File

@ -0,0 +1,54 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.http.util
import java.io.{BufferedWriter, File, FileWriter}
import akka.http.scaladsl.model.HttpResponse
import akka.stream.Materializer
import akka.util.ByteString
import com.daml.lf.data.TryOps.Bracket.bracket
import com.typesafe.scalalogging.LazyLogging
import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source
import scala.util.{Failure, Success, Try}
object TestUtil extends LazyLogging {
def requiredFile(fileName: String): Try[File] = {
val file = new File(fileName).getAbsoluteFile
if (file.exists()) Success(file)
else
Failure(new IllegalStateException(s"File doest not exist: $fileName"))
}
def writeToFile(file: File, text: String): Try[File] =
bracket(Try(new BufferedWriter(new FileWriter(file))))(x => Try(x.close())).flatMap { bw =>
Try {
bw.write(text)
file
}
}
def readFile(resourcePath: String): String =
Try {
val source = Source.fromResource(resourcePath)
val content = source.getLines().mkString
source.close
content
} match {
case Success(value) => value
case Failure(ex) => throw ex
}
def getResponseDataBytes(resp: HttpResponse, debug: Boolean = false)(
implicit mat: Materializer,
ec: ExecutionContext): Future[String] = {
val fb = resp.entity.dataBytes.runFold(ByteString.empty)((b, a) => b ++ a).map(_.utf8String)
if (debug) fb.foreach(x => logger.info(s"---- response data: $x"))
fb
}
}

View File

@ -9,6 +9,7 @@ load(
"lf_scalacopts",
)
load("//rules_daml:daml.bzl", "daml_compile")
load("@os_info//:os_info.bzl", "is_windows")
hj_scalacopts = lf_scalacopts + [
"-P:wartremover:traverser:org.wartremover.warts.NonUnitStatements",
@ -203,4 +204,50 @@ da_scala_test(
],
)
da_scala_test(
name = "failure-tests",
srcs = glob(["src/failure/scala/**/*.scala"]),
data = [
":Account.dar",
"//docs:quickstart-model.dar",
"//ledger/test-common:model-tests.dar",
"//ledger/test-common/test-certificates",
"@toxiproxy_dev_env//:bin/toxiproxy-cmd" if not is_windows else "@toxiproxy_dev_env//:toxiproxy-server-windows-amd64.exe",
],
plugins = [
"@maven//:org_spire_math_kind_projector_2_12",
],
resources = glob(["src/failure/resources/**/*"]),
scalacopts = hj_scalacopts,
silent_annotations = False,
deps = [
":http-json",
"//bazel_tools/runfiles:scala_runfiles",
"//daml-lf/data",
"//language-support/scala/bindings-akka",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger-service/http-json-testing",
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
"//ledger/ledger-resources",
"//ledger/participant-integration-api",
"//ledger/participant-integration-api:participant-integration-api-tests-lib",
"//ledger/participant-state",
"//ledger/sandbox",
"//ledger/sandbox-common",
"//ledger/sandbox-common:sandbox-common-scala-tests-lib",
"//libs-scala/ports",
"//libs-scala/resources",
"//libs-scala/timer-utils",
"@maven//:com_typesafe_akka_akka_http_core_2_12",
"@maven//:com_typesafe_scala_logging_scala_logging_2_12",
"@maven//:eu_rekawek_toxiproxy_toxiproxy_java_2_1_3",
"@maven//:io_spray_spray_json_2_12",
"@maven//:org_scalatest_scalatest_2_12",
"@maven//:org_scalaz_scalaz_core_2_12",
],
)
exports_files(["src/main/resources/logback.xml"])

View File

@ -0,0 +1,19 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="io.netty" level="WARN">
<appender-ref ref="stderr-appender"/>
</logger>
<logger name="io.grpc.netty" level="WARN">
<appender-ref ref="stderr-appender"/>
</logger>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@ -0,0 +1,161 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.http
import akka.http.scaladsl.model.{StatusCodes, Uri}
import com.daml.http.json.{JsonError, SprayJson}
import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll
import com.daml.timer.RetryStrategy
import org.scalatest._
import org.scalatest.concurrent.Eventually
import scalaz.\/
import scalaz.syntax.show._
import scalaz.syntax.tag._
import spray.json._
import scala.concurrent.duration._
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
final class FailureTests
extends AsyncFreeSpec
with HttpFailureTestFixture
with Matchers
with SuiteResourceManagementAroundAll
with Eventually
with Inside {
import HttpServiceTestFixture._
"Command submission succeeds after reconnect" in withHttpService[Assertion] {
(uri, encoder, _, client) =>
for {
p <- allocateParty(client, "Alice")
(status, _) <- postCreateCommand(
accountCreateCommand(p, "23"),
encoder,
uri,
headersWithPartyAuth(List(p.unwrap), ledgerId().unwrap))
_ = status shouldBe StatusCodes.OK
_ = proxy.disable()
(status, output) <- postCreateCommand(
accountCreateCommand(p, "24"),
encoder,
uri,
headersWithPartyAuth(List(p.unwrap), ledgerId().unwrap))
_ = status shouldBe StatusCodes.InternalServerError
_ <- inside(output) {
case JsObject(fields) =>
inside(fields.get("status")) {
case Some(JsNumber(code)) => code shouldBe 500
}
}
_ = proxy.enable()
// eventually doesnt handle Futures in the version of scalatest were using.
_ <- RetryStrategy.constant(5, 2.seconds)((_, _) =>
for {
(status, _) <- postCreateCommand(
accountCreateCommand(p, "25"),
encoder,
uri,
headersWithPartyAuth(List(p.unwrap), ledgerId().unwrap))
} yield status shouldBe StatusCodes.OK)
} yield succeed
}
"/v1/query GET succeeds after reconnect" in withHttpService[Assertion] {
(uri, encoder, _, client) =>
for {
p <- allocateParty(client, "Alice")
(status, _) <- postCreateCommand(
accountCreateCommand(p, "23"),
encoder,
uri,
headersWithPartyAuth(List(p.unwrap), ledgerId().unwrap))
(status, output) <- getRequest(
uri = uri.withPath(Uri.Path("/v1/query")),
headersWithPartyAuth(List(p.unwrap), ledgerId().unwrap))
_ <- inside(output) {
case JsObject(fields) =>
inside(fields.get("result")) {
case Some(JsArray(rs)) => rs.size shouldBe 1
}
}
_ = proxy.disable()
(status, output) <- getRequest(
uri = uri.withPath(Uri.Path("/v1/query")),
headersWithPartyAuth(List(p.unwrap), ledgerId().unwrap))
_ <- inside(output) {
case JsObject(fields) =>
inside(fields.get("status")) {
case Some(JsNumber(code)) => code shouldBe 501
}
}
// TODO Document this properly or adjust it
_ = status shouldBe StatusCodes.OK
_ = proxy.enable()
} yield succeed
}
"/v1/query POST succeeds after reconnect" in withHttpService[Assertion] {
(uri, encoder, _, client) =>
for {
p <- allocateParty(client, "Alice")
(status, _) <- postCreateCommand(
accountCreateCommand(p, "23"),
encoder,
uri,
headersWithPartyAuth(List(p.unwrap), ledgerId().unwrap))
_ = status shouldBe StatusCodes.OK
query = jsObject("""{"templateIds": ["Account:Account"]}""")
_ = println("first query")
(status, output) <- postRequest(
uri = uri.withPath(Uri.Path("/v1/query")),
query,
headersWithPartyAuth(List(p.unwrap), ledgerId().unwrap))
_ = status shouldBe StatusCodes.OK
_ <- inside(output) {
case JsObject(fields) =>
inside(fields.get("result")) {
case Some(JsArray(rs)) => rs.size shouldBe 1
}
}
_ = proxy.disable()
(status, output) <- postRequest(
uri = uri.withPath(Uri.Path("/v1/query")),
query,
headersWithPartyAuth(List(p.unwrap), ledgerId().unwrap))
_ <- inside(output) {
case JsObject(fields) =>
inside(fields.get("status")) {
case Some(JsNumber(code)) => code shouldBe 501
}
}
// TODO Document this properly or adjust it
_ = status shouldBe StatusCodes.OK
_ = proxy.enable()
// eventually doesnt handle Futures in the version of scalatest were using.
_ <- RetryStrategy.constant(5, 2.seconds)((_, _) =>
for {
(status, output) <- postRequest(
uri = uri.withPath(Uri.Path("/v1/query")),
query,
headersWithPartyAuth(List(p.unwrap), ledgerId().unwrap))
_ = status shouldBe StatusCodes.OK
_ <- inside(output) {
case JsObject(fields) =>
inside(fields.get("result")) {
case Some(JsArray(rs)) => rs.size shouldBe 1
}
}
} yield succeed)
} yield succeed
}
protected def jsObject(s: String): JsObject = {
val r: JsonError \/ JsObject = for {
jsVal <- SprayJson.parse(s).leftMap(e => JsonError(e.shows))
jsObj <- SprayJson.mustBeJsObject(jsVal)
} yield jsObj
r.valueOr(e => fail(e.shows))
}
}

View File

@ -0,0 +1,31 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.http
import akka.http.scaladsl.model.Uri
import com.daml.bazeltools.BazelRunfiles
import com.daml.http.json.{DomainJsonDecoder, DomainJsonEncoder}
import com.daml.ledger.client.LedgerClient
import org.scalatest.Suite
import scala.concurrent.{ExecutionContext, Future}
trait HttpFailureTestFixture extends ToxicSandboxFixture { self: Suite =>
private implicit val ec: ExecutionContext = system.dispatcher
override def packageFiles =
List(
BazelRunfiles.requiredResource("docs/quickstart-model.dar"),
BazelRunfiles.requiredResource("ledger-service/http-json/Account.dar"))
protected def allocateParty(client: LedgerClient, displayName: String): Future[domain.Party] =
client.partyManagementClient
.allocateParty(None, Some(displayName), None)
.map(p => domain.Party(p.party))
protected def withHttpService[A]
: ((Uri, DomainJsonEncoder, DomainJsonDecoder, LedgerClient) => Future[A]) => Future[A] =
HttpServiceTestFixture.withHttpService(this.getClass.getSimpleName, proxiedPort, None, None)
}

View File

@ -0,0 +1,91 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.http
import java.net.InetAddress
import com.daml.bazeltools.BazelRunfiles
import com.daml.ledger.api.testing.utils.{OwnedResource, SuiteResource, Resource => TestResource}
import com.daml.platform.apiserver.services.GrpcClientResource
import com.daml.platform.sandbox.{AbstractSandboxFixture, SandboxBackend}
import com.daml.platform.sandboxnext.Runner
import com.daml.ports.{LockedFreePort, Port}
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.timer.RetryStrategy
import eu.rekawek.toxiproxy._
import io.grpc.Channel
import org.scalatest.{BeforeAndAfterEach, Suite}
import scala.concurrent.duration.DurationInt
import scala.concurrent.Future
import scala.sys.process.Process
// Fixture for Sandbox Next behind toxiproxy to simulate failures.
trait ToxicSandboxFixture
extends AbstractSandboxFixture
with SuiteResource[(Port, Channel, Port, ToxiproxyClient, Proxy)]
with BeforeAndAfterEach {
self: Suite =>
private def toxiproxy(ledger: Port): ResourceOwner[(Port, ToxiproxyClient, Proxy)] =
new ResourceOwner[(Port, ToxiproxyClient, Proxy)] {
val host = InetAddress.getLoopbackAddress
val isWindows: Boolean = sys.props("os.name").toLowerCase.contains("windows")
override def acquire()(
implicit context: ResourceContext): Resource[(Port, ToxiproxyClient, Proxy)] = {
def start(): Future[(Port, ToxiproxyClient, Proxy, Process)] = {
val toxiproxyExe =
if (!isWindows) BazelRunfiles.rlocation("external/toxiproxy_dev_env/bin/toxiproxy-cmd")
else
BazelRunfiles.rlocation(
"external/toxiproxy_dev_env/toxiproxy-server-windows-amd64.exe")
for {
toxiproxyPort <- Future(LockedFreePort.find())
toxiproxyServer <- Future(
Process(Seq(toxiproxyExe, "--port", toxiproxyPort.port.value.toString)).run())
_ <- RetryStrategy.constant(attempts = 3, waitTime = 2.seconds)((_, _) =>
Future(toxiproxyPort.testAndUnlock(host)))
toxiproxyClient = new ToxiproxyClient(host.getHostName, toxiproxyPort.port.value)
ledgerProxyPort = LockedFreePort.find()
proxy = toxiproxyClient.createProxy(
"ledger",
s"${host.getHostName}:${ledgerProxyPort.port}",
s"${host.getHostName}:$ledger")
_ = ledgerProxyPort.unlock()
} yield (ledgerProxyPort.port, toxiproxyClient, proxy, toxiproxyServer)
}
def stop(r: (Port, ToxiproxyClient, Proxy, Process)) = Future {
r._4.destroy
val _ = r._4.exitValue
()
}
Resource(start())(stop).map {
case (port, toxiproxyClient, proxy, _) => (port, toxiproxyClient, proxy)
}
}
}
override protected def serverPort = suiteResource.value._1
override protected def channel: Channel = suiteResource.value._2
protected def proxiedPort: Port = suiteResource.value._3
protected def proxyClient: ToxiproxyClient = suiteResource.value._4
protected def proxy: Proxy = suiteResource.value._5
override protected lazy val suiteResource
: TestResource[(Port, Channel, Port, ToxiproxyClient, Proxy)] = {
implicit val resourceContext: ResourceContext = ResourceContext(system.dispatcher)
new OwnedResource[ResourceContext, (Port, Channel, Port, ToxiproxyClient, Proxy)](
for {
jdbcUrl <- database
.getOrElse(SandboxBackend.H2Database.owner)
.map(info => Some(info.jdbcUrl))
port <- new Runner(config.copy(jdbcUrl = jdbcUrl))
channel <- GrpcClientResource.owner(port)
(proxiedPort, proxyClient, proxy) <- toxiproxy(port)
} yield (port, channel, proxiedPort, proxyClient, proxy),
acquisitionTimeout = 1.minute,
releaseTimeout = 1.minute,
)
}
override protected def beforeEach() = proxyClient.reset()
}

View File

@ -9,7 +9,6 @@ import java.time.Instant
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import akka.stream.Materializer
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString
@ -27,7 +26,6 @@ import com.daml.http.util.FutureUtil.toFuture
import com.daml.http.util.{FutureUtil, TestUtil}
import com.daml.jwt.JwtSigner
import com.daml.jwt.domain.{DecodedJwt, Jwt}
import com.daml.ledger.api.auth.{AuthServiceJWTPayload, AuthServiceJWTCodec}
import com.daml.ledger.api.refinements.{ApiTypes => lar}
import com.daml.ledger.api.v1.{value => v}
import com.daml.ledger.client.LedgerClient
@ -76,6 +74,7 @@ trait AbstractHttpServiceIntegrationTestFuns extends StrictLogging {
this: AsyncFreeSpec with Matchers with Inside with StrictLogging =>
import AbstractHttpServiceIntegrationTestFuns._
import json.JsonProtocol._
import HttpServiceTestFixture._
def jdbcConfig: Option[JdbcConfig]
@ -90,26 +89,7 @@ trait AbstractHttpServiceIntegrationTestFuns extends StrictLogging {
protected val metdata2: MetadataReader.LfMetadata =
MetadataReader.readFromDar(dar2).valueOr(e => fail(s"Cannot read dar2 metadata: $e"))
protected def jwtForParties(parties: List[String]) = {
import AuthServiceJWTCodec.JsonImplicits._
val decodedJwt = DecodedJwt(
"""{"alg": "HS256", "typ": "JWT"}""",
AuthServiceJWTPayload(
ledgerId = Some(testId),
applicationId = Some("test"),
actAs = parties,
participantId = None,
exp = None,
admin = false,
readAs = List()
).toJson.prettyPrint
)
JwtSigner.HMAC256
.sign(decodedJwt, "secret")
.fold(e => fail(s"cannot sign a JWT: ${e.shows}"), identity)
}
protected val jwt: Jwt = jwtForParties(List("Alice"))
protected val jwt: Jwt = jwtForParties(List("Alice"), testId)
protected val jwtAdminNoParty: Jwt = {
val decodedJwt = DecodedJwt(
@ -129,75 +109,49 @@ trait AbstractHttpServiceIntegrationTestFuns extends StrictLogging {
import tag.@@ // used for subtyping to make `AHS ec` beat executionContext
implicit val `AHS ec`: ExecutionContext @@ this.type = tag[this.type](`AHS asys`.dispatcher)
protected def withHttpServiceAndClient[A]
: ((Uri, DomainJsonEncoder, DomainJsonDecoder, LedgerClient) => Future[A]) => Future[A] =
HttpServiceTestFixture
.withHttpService[A](
testId,
List(dar1, dar2),
jdbcConfig,
staticContentConfig,
useTls = useTls,
wsConfig = wsConfig)
protected def withHttpServiceAndClient[A](
testFn: (Uri, DomainJsonEncoder, DomainJsonDecoder, LedgerClient) => Future[A]): Future[A] =
HttpServiceTestFixture.withLedger[A](List(dar1, dar2), testId, None, useTls) {
case (ledgerPort, _) =>
HttpServiceTestFixture.withHttpService[A](
testId,
ledgerPort,
jdbcConfig,
staticContentConfig,
useTls = useTls,
wsConfig = wsConfig)(testFn)
}
protected def withHttpService[A](
f: (Uri, DomainJsonEncoder, DomainJsonDecoder) => Future[A]): Future[A] =
withHttpServiceAndClient((a, b, c, _) => f(a, b, c))
protected def withLedger[A]: (LedgerClient => Future[A]) => Future[A] =
HttpServiceTestFixture.withLedger[A](List(dar1, dar2), testId)
protected def accountCreateCommand(
owner: domain.Party,
number: String,
time: v.Value.Sum.Timestamp = TimestampConversion.instantToMicros(Instant.now))
: domain.CreateCommand[v.Record] = {
val templateId = domain.TemplateId(None, "Account", "Account")
val timeValue = v.Value(time)
val enabledVariantValue =
v.Value(v.Value.Sum.Variant(v.Variant(None, "Enabled", Some(timeValue))))
val arg = v.Record(
fields = List(
v.RecordField("owner", Some(v.Value(v.Value.Sum.Party(owner.unwrap)))),
v.RecordField("number", Some(v.Value(v.Value.Sum.Text(number)))),
v.RecordField("status", Some(enabledVariantValue))
))
domain.CreateCommand(templateId, arg, None)
}
protected def withLedger[A](testFn: LedgerClient => Future[A]): Future[A] =
HttpServiceTestFixture.withLedger[A](List(dar1, dar2), testId) {
case (_, client) => testFn(client)
}
protected val headersWithAuth = authorizationHeader(jwt)
protected def headersWithPartyAuth(parties: List[String]) =
authorizationHeader(jwtForParties(parties))
protected def authorizationHeader(token: Jwt): List[Authorization] =
List(Authorization(OAuth2BearerToken(token.value)))
HttpServiceTestFixture.headersWithPartyAuth(parties, testId)
protected def postJsonStringRequest(
uri: Uri,
jsonString: String
): Future[(StatusCode, JsValue)] =
TestUtil.postJsonStringRequest(uri, jsonString, headersWithAuth)
HttpServiceTestFixture.postJsonStringRequest(uri, jsonString, headersWithAuth)
protected def postJsonRequest(
uri: Uri,
json: JsValue,
headers: List[HttpHeader] = headersWithAuth): Future[(StatusCode, JsValue)] =
TestUtil.postJsonStringRequest(uri, json.prettyPrint, headers)
HttpServiceTestFixture.postJsonRequest(uri, json, headers)
protected def getRequest(
uri: Uri,
headers: List[HttpHeader] = headersWithAuth): Future[(StatusCode, JsValue)] = {
Http()
.singleRequest(
HttpRequest(method = HttpMethods.GET, uri = uri, headers = headers)
)
.flatMap { resp =>
val bodyF: Future[String] = getResponseDataBytes(resp, debug = true)
bodyF.map(body => (resp.status, body.parseJson))
}
}
headers: List[HttpHeader] = headersWithAuth): Future[(StatusCode, JsValue)] =
HttpServiceTestFixture.getRequest(uri, headers)
protected def getResponseDataBytes(resp: HttpResponse, debug: Boolean = false): Future[String] = {
val fb = resp.entity.dataBytes.runFold(ByteString.empty)((b, a) => b ++ a).map(_.utf8String)
@ -209,27 +163,16 @@ trait AbstractHttpServiceIntegrationTestFuns extends StrictLogging {
cmd: domain.CreateCommand[v.Record],
encoder: DomainJsonEncoder,
uri: Uri,
headers: List[HttpHeader] = headersWithAuth): Future[(StatusCode, JsValue)] = {
import encoder.implicits._
for {
json <- toFuture(SprayJson.encode1(cmd)): Future[JsValue]
result <- postJsonRequest(uri.withPath(Uri.Path("/v1/create")), json, headers = headers)
} yield result
}
headers: List[HttpHeader] = headersWithAuth): Future[(StatusCode, JsValue)] =
HttpServiceTestFixture.postCreateCommand(cmd, encoder, uri, headers)
protected def postArchiveCommand(
templateId: domain.TemplateId.OptionalPkg,
contractId: domain.ContractId,
encoder: DomainJsonEncoder,
uri: Uri,
headers: List[HttpHeader] = headersWithAuth): Future[(StatusCode, JsValue)] = {
val ref = domain.EnrichedContractId(Some(templateId), contractId)
val cmd = archiveCommand(ref)
for {
json <- toFuture(encoder.encodeExerciseCommand(cmd)): Future[JsValue]
result <- postJsonRequest(uri.withPath(Uri.Path("/v1/exercise")), json, headers)
} yield result
}
headers: List[HttpHeader] = headersWithAuth): Future[(StatusCode, JsValue)] =
HttpServiceTestFixture.postArchiveCommand(templateId, contractId, encoder, uri, headers)
protected def lookupContractAndAssert(contractLocator: domain.ContractLocator[JsValue])(
contractId: ContractId,
@ -306,12 +249,6 @@ trait AbstractHttpServiceIntegrationTestFuns extends StrictLogging {
meta = None)
}
protected def archiveCommand[Ref](reference: Ref): domain.ExerciseCommand[v.Value, Ref] = {
val arg: v.Record = v.Record()
val choice = lar.Choice("Archive")
domain.ExerciseCommand(reference, choice, boxedRecord(arg), None)
}
protected def assertStatus(jsObj: JsValue, expectedStatus: StatusCode): Assertion = {
inside(jsObj) {
case JsObject(fields) =>
@ -459,7 +396,7 @@ trait AbstractHttpServiceIntegrationTestFuns extends StrictLogging {
protected def initialIouCreate(serviceUri: Uri): Future[(StatusCode, JsValue)] = {
val payload = TestUtil.readFile("it/iouCreateCommand.json")
TestUtil.postJsonStringRequest(
HttpServiceTestFixture.postJsonStringRequest(
serviceUri.withPath(Uri.Path("/v1/create")),
payload,
headersWithAuth)
@ -482,6 +419,7 @@ abstract class AbstractHttpServiceIntegrationTest
with AbstractHttpServiceIntegrationTestFuns {
import json.JsonProtocol._
import HttpServiceTestFixture._
override final def useTls = UseTls.NoTls
@ -1250,7 +1188,8 @@ abstract class AbstractHttpServiceIntegrationTest
"fetch by key" in withHttpService { (uri, encoder, _) =>
val owner = domain.Party("Alice")
val accountNumber = "abc123"
val command: domain.CreateCommand[v.Record] = accountCreateCommand(owner, accountNumber)
val command: domain.CreateCommand[v.Record] =
accountCreateCommand(owner, accountNumber)
postCreateCommand(command, encoder, uri).flatMap {
case (status, output) =>
@ -1268,7 +1207,8 @@ abstract class AbstractHttpServiceIntegrationTest
"commands/exercise Archive by key" in withHttpService { (uri, encoder, _) =>
val owner = domain.Party("Alice")
val accountNumber = "abc123"
val create: domain.CreateCommand[v.Record] = accountCreateCommand(owner, accountNumber)
val create: domain.CreateCommand[v.Record] =
accountCreateCommand(owner, accountNumber)
val keyRecord = v.Record(
fields = Seq(
@ -1357,7 +1297,8 @@ abstract class AbstractHttpServiceIntegrationTest
val accountNumber = "abc123"
val now = TimestampConversion.instantToMicros(Instant.now)
val nowStr = TimestampConversion.microsToInstant(now).toString
val command: domain.CreateCommand[v.Record] = accountCreateCommand(owner, accountNumber, now)
val command: domain.CreateCommand[v.Record] =
accountCreateCommand(owner, accountNumber, now)
val packageId: Ref.PackageId = MetadataReader
.templateByName(metdata2)(Ref.QualifiedName.assertFromString("Account:Account"))

View File

@ -16,7 +16,7 @@ import com.daml.ledger.client.LedgerClient
import org.scalatest.{AsyncFlatSpec, BeforeAndAfterAll, Matchers}
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
final class AuthorizationTest extends AsyncFlatSpec with BeforeAndAfterAll with Matchers {
@ -57,9 +57,11 @@ final class AuthorizationTest extends AsyncFlatSpec with BeforeAndAfterAll with
}
}
protected def withLedger[A] =
protected def withLedger[A](testFn: LedgerClient => Future[A]): Future[A] =
HttpServiceTestFixture
.withLedger[A](List(dar), testId, Option(publicToken), mockedAuthService) _
.withLedger[A](List(dar), testId, Option(publicToken), authService = mockedAuthService) {
case (_, client) => testFn(client)
}
private def packageService(client: LedgerClient): PackageService =
new PackageService(HttpService.loadPackageStoreUpdates(client.packageClient, tokenHolder))

View File

@ -9,7 +9,6 @@ import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage, WebSock
import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import com.daml.http.json.SprayJson
import com.daml.http.util.TestUtil
import HttpServiceTestFixture.UseTls
import akka.actor.ActorSystem
import com.daml.jwt.domain.Jwt
@ -38,6 +37,7 @@ class WebsocketServiceIntegrationTest
with AbstractHttpServiceIntegrationTestFuns
with BeforeAndAfterAll {
import HttpServiceTestFixture._
import WebsocketServiceIntegrationTest._
override def jdbcConfig: Option[JdbcConfig] = None
@ -289,7 +289,7 @@ class WebsocketServiceIntegrationTest
ContractDelta(Vector((ctid, _)), Vector(), None) <- readOne
_ = (ctid: String) shouldBe (iouCid.unwrap: String)
_ <- liftF(
TestUtil.postJsonRequest(
postJsonRequest(
uri.withPath(Uri.Path("/v1/exercise")),
exercisePayload(domain.ContractId(ctid)),
headersWithAuth) map {
@ -447,9 +447,10 @@ class WebsocketServiceIntegrationTest
_ = r2._1 shouldBe 'success
cid2 = getContractId(getResult(r2._2))
lastState <- singleClientQueryStream(jwtForParties(List("Alice", "Bob")), uri, query) via parseResp runWith resp(
cid1,
cid2)
lastState <- singleClientQueryStream(
jwtForParties(List("Alice", "Bob"), testId),
uri,
query) via parseResp runWith resp(cid1, cid2)
liveOffset = inside(lastState) {
case ShouldHaveEnded(liveStart, 5, lastSeen) =>
lastSeen.unwrap should be > liveStart.unwrap
@ -685,16 +686,17 @@ class WebsocketServiceIntegrationTest
_ = r2._1 shouldBe 'success
cid2 = getContractId(getResult(r2._2))
lastState <- singleClientFetchStream(jwtForParties(List("Alice", "Bob")), uri, query) via parseResp runWith resp(
cid1,
cid2)
lastState <- singleClientFetchStream(
jwtForParties(List("Alice", "Bob"), testId),
uri,
query) via parseResp runWith resp(cid1, cid2)
liveOffset = inside(lastState) {
case ShouldHaveEnded(liveStart, 5, lastSeen) =>
lastSeen.unwrap should be > liveStart.unwrap
liveStart
}
rescan <- (singleClientFetchStream(
jwtForParties(List("Alice", "Bob")),
jwtForParties(List("Alice", "Bob"), testId),
uri,
query,
Some(liveOffset))
@ -749,7 +751,7 @@ class WebsocketServiceIntegrationTest
case Node(_, l, r) =>
for {
(StatusCodes.OK, _) <- liftF(
TestUtil.postJsonRequest(
postJsonRequest(
serviceUri.withPath(Uri.Path("/v1/exercise")),
exercisePayload(createdCid, l.x),
headersWithAuth))
@ -780,7 +782,7 @@ class WebsocketServiceIntegrationTest
}
for {
(StatusCodes.OK, _) <- liftF(
TestUtil.postJsonRequest(
postJsonRequest(
serviceUri.withPath(Uri.Path("/v1/create")),
initialPayload,
headersWithAuth))

View File

@ -1,109 +0,0 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.http.util
import java.io.{BufferedWriter, File, FileWriter}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{
ContentTypes,
HttpEntity,
HttpHeader,
HttpMethods,
HttpRequest,
HttpResponse,
StatusCode,
Uri
}
import akka.stream.Materializer
import akka.util.ByteString
import com.daml.lf.data.TryOps.Bracket.bracket
import com.typesafe.scalalogging.LazyLogging
import spray.json._
import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source
import scala.util.{Failure, Success, Try}
object TestUtil extends LazyLogging {
def requiredFile(fileName: String): Try[File] = {
val file = new File(fileName).getAbsoluteFile
if (file.exists()) Success(file)
else
Failure(new IllegalStateException(s"File doest not exist: $fileName"))
}
def writeToFile(file: File, text: String): Try[File] =
bracket(Try(new BufferedWriter(new FileWriter(file))))(x => Try(x.close())).flatMap { bw =>
Try {
bw.write(text)
file
}
}
def readFile(resourcePath: String): String =
Try {
val source = Source.fromResource(resourcePath)
val content = source.getLines().mkString
source.close
content
} match {
case Success(value) => value
case Failure(ex) => throw ex
}
def getResponseDataBytes(resp: HttpResponse, debug: Boolean = false)(
implicit mat: Materializer,
ec: ExecutionContext): Future[String] = {
val fb = resp.entity.dataBytes.runFold(ByteString.empty)((b, a) => b ++ a).map(_.utf8String)
if (debug) fb.foreach(x => logger.info(s"---- response data: $x"))
fb
}
def postRequest(uri: Uri, json: JsValue, headers: List[HttpHeader] = Nil)(
implicit as: ActorSystem,
ec: ExecutionContext,
mat: Materializer): Future[(StatusCode, JsValue)] = {
Http()
.singleRequest(
HttpRequest(
method = HttpMethods.POST,
uri = uri,
headers = headers,
entity = HttpEntity(ContentTypes.`application/json`, json.prettyPrint))
)
.flatMap { resp =>
val bodyF: Future[String] = getResponseDataBytes(resp, debug = true)
bodyF.map(body => {
(resp.status, body.parseJson)
})
}
}
def postJsonStringRequest(uri: Uri, jsonString: String, headers: List[HttpHeader])(
implicit as: ActorSystem,
ec: ExecutionContext,
mat: Materializer): Future[(StatusCode, JsValue)] = {
logger.info(s"postJson: ${uri.toString} json: ${jsonString: String}")
Http()
.singleRequest(
HttpRequest(
method = HttpMethods.POST,
uri = uri,
headers = headers,
entity = HttpEntity(ContentTypes.`application/json`, jsonString))
)
.flatMap { resp =>
val bodyF: Future[String] = getResponseDataBytes(resp, debug = true)
bodyF.map(body => (resp.status, body.parseJson))
}
}
def postJsonRequest(uri: Uri, json: JsValue, headers: List[HttpHeader])(
implicit as: ActorSystem,
ec: ExecutionContext,
mat: Materializer): Future[(StatusCode, JsValue)] =
postJsonStringRequest(uri, json.prettyPrint, headers)
}