ledger-api-client + participant-integration-api: Increase the default maximum inbound error size, and truncate errors well before that. (#6807)

* participant-integration-api: `GrpcServerOwner` -> `GrpcServer.Owner`.

Mostly so I can create a test class named `GrpcServerSpec`.

* ports: Move the free port search from postgresql-testing.

* participant-integration-api: Test the basics of GrpcServer.

This uses the HelloService to make sure the server behaves normally.

* ledger-api-client: Extract out channel configuration from LedgerClient.

So we can test it independently of the LedgerClient itself.

* ledger-api-client: Increase the default maximum inbound header size.

Increased from 8 KB to 1 MB.

* participant-integration-api: Reduce the maximum error message size.

Truncate GRPC error descriptions to 256 KB.

* participant-integration-api: Use `Port.Dynamic` instead of `FreePort`.

In tests.

* participant-integration-api: Explicit null checks when they're shorter.

Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>

* ledger-api-client: Reduce the max inbound message size back to 8 KB.

And reduce the maximum size of an error description pushed out by the
server accordingly.

CHANGELOG_BEGIN
- [Integration Kit] Truncate GPRC error messages at 4 KB. This ensures
  that we won't trigger a protocol error when sending errors to the
  client.
CHANGELOG_END

Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>
This commit is contained in:
Samir Talwar 2020-07-21 17:50:33 +02:00 committed by GitHub
parent ee74551642
commit d6fc2bbb58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 317 additions and 88 deletions

View File

@ -19,6 +19,7 @@ message HelloResponse {
} }
service HelloService { service HelloService {
rpc Single (HelloRequest) returns (HelloResponse); rpc Single (HelloRequest) returns (HelloResponse);
rpc ServerStreaming (HelloRequest) returns (stream HelloResponse); rpc ServerStreaming (HelloRequest) returns (stream HelloResponse);
rpc Fails (HelloRequest) returns (HelloResponse);
} }

View File

@ -13,6 +13,9 @@ trait Responding extends HelloService {
override def single(request: HelloRequest): Future[HelloResponse] = override def single(request: HelloRequest): Future[HelloResponse] =
Future.successful(response(request)) Future.successful(response(request))
override def fails(request: HelloRequest): Future[HelloResponse] =
Future.failed(new IllegalStateException(request.payload.toStringUtf8))
protected def response(request: HelloRequest): HelloResponse = protected def response(request: HelloRequest): HelloResponse =
HelloResponse(request.reqInt * 2, request.payload) HelloResponse(request.reqInt * 2, request.payload)

View File

@ -23,6 +23,8 @@ da_scala_library(
"//ledger/ledger-api-domain", "//ledger/ledger-api-domain",
"//libs-scala/direct-execution-context", "//libs-scala/direct-execution-context",
"//libs-scala/grpc-utils", "//libs-scala/grpc-utils",
"//libs-scala/ports",
"//libs-scala/resources",
"@maven//:com_typesafe_akka_akka_actor_2_12", "@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12", "@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:io_grpc_grpc_netty", "@maven//:io_grpc_grpc_netty",

View File

@ -0,0 +1,43 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.client
import java.net.{InetAddress, InetSocketAddress}
import com.daml.ledger.client.configuration.LedgerClientConfiguration
import com.daml.ports.Port
import com.daml.resources.{Resource, ResourceOwner}
import io.grpc.ManagedChannel
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
import scala.concurrent.{ExecutionContext, Future}
object GrpcChannel {
final class Owner(builder: NettyChannelBuilder, configuration: LedgerClientConfiguration)
extends ResourceOwner[ManagedChannel] {
def this(port: Port, configuration: LedgerClientConfiguration) =
this(
NettyChannelBuilder
.forAddress(new InetSocketAddress(InetAddress.getLoopbackAddress, port.value)),
configuration,
)
override def acquire()(implicit executionContext: ExecutionContext): Resource[ManagedChannel] =
Resource(
Future {
configuration.sslContext
.fold(builder.usePlaintext())(
builder.sslContext(_).negotiationType(NegotiationType.TLS))
builder.maxInboundMetadataSize(configuration.maxInboundMessageSize)
builder.build()
}
)(channel =>
Future {
channel.shutdownNow()
()
})
}
}

View File

@ -20,15 +20,14 @@ import com.daml.ledger.api.v1.package_service.PackageServiceGrpc
import com.daml.ledger.api.v1.transaction_service.TransactionServiceGrpc import com.daml.ledger.api.v1.transaction_service.TransactionServiceGrpc
import com.daml.ledger.client.configuration.LedgerClientConfiguration import com.daml.ledger.client.configuration.LedgerClientConfiguration
import com.daml.ledger.client.services.acs.ActiveContractSetClient import com.daml.ledger.client.services.acs.ActiveContractSetClient
import com.daml.ledger.client.services.admin.PackageManagementClient import com.daml.ledger.client.services.admin.{PackageManagementClient, PartyManagementClient}
import com.daml.ledger.client.services.admin.PartyManagementClient
import com.daml.ledger.client.services.commands.{CommandClient, SynchronousCommandClient} import com.daml.ledger.client.services.commands.{CommandClient, SynchronousCommandClient}
import com.daml.ledger.client.services.identity.LedgerIdentityClient import com.daml.ledger.client.services.identity.LedgerIdentityClient
import com.daml.ledger.client.services.pkg.PackageClient import com.daml.ledger.client.services.pkg.PackageClient
import com.daml.ledger.client.services.transactions.TransactionClient import com.daml.ledger.client.services.transactions.TransactionClient
import io.grpc.{Channel, ManagedChannel} import io.grpc.netty.NettyChannelBuilder
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
import io.grpc.stub.AbstractStub import io.grpc.stub.AbstractStub
import io.grpc.{Channel, ManagedChannel}
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
@ -126,15 +125,14 @@ object LedgerClient {
def fromBuilder(builder: NettyChannelBuilder, configuration: LedgerClientConfiguration)( def fromBuilder(builder: NettyChannelBuilder, configuration: LedgerClientConfiguration)(
implicit ec: ExecutionContext, implicit ec: ExecutionContext,
esf: ExecutionSequencerFactory): Future[LedgerClient] = { esf: ExecutionSequencerFactory): Future[LedgerClient] = {
configuration.sslContext.fold(builder.usePlaintext())( val resource = new GrpcChannel.Owner(builder, configuration).acquire()
builder.sslContext(_).negotiationType(NegotiationType.TLS)) resource.asFuture.flatMap { channel =>
val channel = builder.build() sys.addShutdownHook {
sys.addShutdownHook { resource.release()
if (!channel.isShutdown) { ()
val _ = channel.shutdownNow()
} }
apply(channel, configuration)
} }
apply(channel, configuration)
} }
} }

View File

@ -3,18 +3,22 @@
package com.daml.ledger.client.configuration package com.daml.ledger.client.configuration
import io.grpc.internal.GrpcUtil
import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContext
/** /**
* @param applicationId The string that will be used as an application identifier when issuing commands and retrieving transactions * @param applicationId The string that will be used as an application identifier when issuing commands and retrieving transactions
* @param ledgerIdRequirement A [[LedgerIdRequirement]] specifying how the ledger identifier must be checked against the one returned by the LedgerIdentityService * @param ledgerIdRequirement A [[LedgerIdRequirement]] specifying how the ledger identifier must be checked against the one returned by the LedgerIdentityService
* @param commandClient The [[CommandClientConfiguration]] that defines how the command client should be setup with regards to timeouts, commands in flight and command TTL * @param commandClient The [[CommandClientConfiguration]] that defines how the command client should be setup with regards to timeouts, commands in flight and command TTL
* @param sslContext If defined, the context will be passed on to the underlying gRPC code to ensure the communication channel is secured by TLS * @param sslContext If defined, the context will be passed on to the underlying gRPC code to ensure the communication channel is secured by TLS
* @param token If defined, the access token that will be passed by default, unless overridden in individual calls (mostly useful for short-lived applications) * @param token If defined, the access token that will be passed by default, unless overridden in individual calls (mostly useful for short-lived applications)
* @param maxInboundMessageSize The maximum size of the response headers.
*/ */
final case class LedgerClientConfiguration( final case class LedgerClientConfiguration(
applicationId: String, applicationId: String,
ledgerIdRequirement: LedgerIdRequirement, ledgerIdRequirement: LedgerIdRequirement,
commandClient: CommandClientConfiguration, commandClient: CommandClientConfiguration,
sslContext: Option[SslContext], sslContext: Option[SslContext],
token: Option[String] = None) token: Option[String] = None,
maxInboundMessageSize: Int = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
)

View File

@ -8,56 +8,72 @@ import java.net.{BindException, InetAddress, InetSocketAddress}
import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.TimeUnit.SECONDS
import com.daml.metrics.Metrics import com.daml.metrics.Metrics
import com.daml.platform.apiserver.GrpcServerOwner._
import com.daml.ports.Port import com.daml.ports.Port
import com.daml.resources.{Resource, ResourceOwner} import com.daml.resources.{Resource, ResourceOwner}
import com.google.protobuf.Message import com.google.protobuf.Message
import io.grpc.netty.NettyServerBuilder
import io.grpc._ import io.grpc._
import io.grpc.netty.NettyServerBuilder
import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContext
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
final class GrpcServerOwner( object GrpcServer {
address: Option[String],
desiredPort: Port, // Unfortunately, we can't get the maximum inbound message size from the client, so we don't know
maxInboundMessageSize: Int, // how big this should be. This seems long enough to contain useful data, but short enough that it
sslContext: Option[SslContext] = None, // won't break most well-configured clients.
interceptors: List[ServerInterceptor] = List.empty, // As the default response header limit for a Netty client is 8 KB, we set our limit to 4 KB to
metrics: Metrics, // allow for extra information such as the exception stack trace.
eventLoopGroups: ServerEventLoopGroups, private val MaximumStatusDescriptionLength = 4 * 1024 // 4 KB
services: Iterable[BindableService],
) extends ResourceOwner[Server] { final class Owner(
override def acquire()(implicit executionContext: ExecutionContext): Resource[Server] = { address: Option[String],
val host = address.map(InetAddress.getByName).getOrElse(InetAddress.getLoopbackAddress) desiredPort: Port,
Resource(Future { maxInboundMessageSize: Int,
val builder = NettyServerBuilder.forAddress(new InetSocketAddress(host, desiredPort.value)) sslContext: Option[SslContext] = None,
builder.sslContext(sslContext.orNull) interceptors: List[ServerInterceptor] = List.empty,
builder.channelType(classOf[NioServerSocketChannel]) metrics: Metrics,
builder.permitKeepAliveTime(10, SECONDS) eventLoopGroups: ServerEventLoopGroups,
builder.permitKeepAliveWithoutCalls(true) services: Iterable[BindableService],
builder.directExecutor() ) extends ResourceOwner[Server] {
builder.maxInboundMessageSize(maxInboundMessageSize) override def acquire()(implicit executionContext: ExecutionContext): Resource[Server] = {
interceptors.foreach(builder.intercept) val host = address.map(InetAddress.getByName).getOrElse(InetAddress.getLoopbackAddress)
builder.intercept(new MetricsInterceptor(metrics)) Resource(Future {
eventLoopGroups.populate(builder) val builder = NettyServerBuilder.forAddress(new InetSocketAddress(host, desiredPort.value))
services.foreach { service => builder.sslContext(sslContext.orNull)
builder.addService(service) builder.channelType(classOf[NioServerSocketChannel])
toLegacyService(service).foreach(builder.addService) builder.permitKeepAliveTime(10, SECONDS)
} builder.permitKeepAliveWithoutCalls(true)
val server = builder.build() builder.directExecutor()
try { builder.maxInboundMessageSize(maxInboundMessageSize)
server.start() interceptors.foreach(builder.intercept)
} catch { builder.intercept(new MetricsInterceptor(metrics))
case e: IOException if e.getCause != null && e.getCause.isInstanceOf[BindException] => builder.intercept(new TruncatedStatusInterceptor(MaximumStatusDescriptionLength))
throw new UnableToBind(desiredPort, e.getCause) eventLoopGroups.populate(builder)
} services.foreach { service =>
server builder.addService(service)
})(server => Future(server.shutdown().awaitTermination())) toLegacyService(service).foreach(builder.addService)
}
val server = builder.build()
try {
server.start()
} catch {
case e: IOException if e.getCause != null && e.getCause.isInstanceOf[BindException] =>
throw new UnableToBind(desiredPort, e.getCause)
}
server
})(server => Future(server.shutdown().awaitTermination()))
}
} }
final class UnableToBind(port: Port, cause: Throwable)
extends RuntimeException(
s"The API server was unable to bind to port $port. Terminate the process occupying the port, or choose a different one.",
cause)
with NoStackTrace
// This exposes the existing services under com.daml also under com.digitalasset. // This exposes the existing services under com.daml also under com.digitalasset.
// This is necessary to allow applications built with an earlier version of the SDK // This is necessary to allow applications built with an earlier version of the SDK
// to still work. // to still work.
@ -88,14 +104,5 @@ final class GrpcServerOwner(
Option(digitalassetDef.build()) Option(digitalassetDef.build())
} else None } else None
} }
}
object GrpcServerOwner {
final class UnableToBind(port: Port, cause: Throwable)
extends RuntimeException(
s"The API server was unable to bind to port $port. Terminate the process occupying the port, or choose a different one.",
cause)
with NoStackTrace
} }

View File

@ -38,7 +38,7 @@ final class LedgerApiServer(
).acquire() ).acquire()
apiServicesResource = apiServicesOwner.acquire() apiServicesResource = apiServicesOwner.acquire()
apiServices <- apiServicesResource apiServices <- apiServicesResource
server <- new GrpcServerOwner( server <- new GrpcServer.Owner(
address, address,
desiredPort, desiredPort,
maxInboundMessageSize, maxInboundMessageSize,

View File

@ -0,0 +1,30 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall
import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor, Status}
class TruncatedStatusInterceptor(maximumDescriptionLength: Int) extends ServerInterceptor {
override def interceptCall[ReqT, RespT](
call: ServerCall[ReqT, RespT],
headers: Metadata,
next: ServerCallHandler[ReqT, RespT],
): ServerCall.Listener[ReqT] =
next.startCall(
new SimpleForwardingServerCall[ReqT, RespT](call) {
override def close(status: Status, trailers: Metadata): Unit = {
val truncatedStatus = status.withDescription(truncate(status.getDescription))
super.close(truncatedStatus, trailers)
}
},
headers,
)
private def truncate(description: String): String =
if (description != null && description.length > maximumDescriptionLength)
description.substring(0, maximumDescriptionLength - 3) + "..."
else
description
}

View File

@ -0,0 +1,117 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver
import com.codahale.metrics.MetricRegistry
import com.daml.grpc.sampleservice.implementations.ReferenceImplementation
import com.daml.ledger.client.GrpcChannel
import com.daml.ledger.client.configuration.{
CommandClientConfiguration,
LedgerClientConfiguration,
LedgerIdRequirement
}
import com.daml.metrics.Metrics
import com.daml.platform.apiserver.GrpcServerSpec._
import com.daml.platform.hello.{HelloRequest, HelloServiceGrpc}
import com.daml.ports.Port
import com.daml.resources.ResourceOwner
import com.google.protobuf.ByteString
import io.grpc.ManagedChannel
import org.scalatest.{AsyncWordSpec, Matchers}
final class GrpcServerSpec extends AsyncWordSpec with Matchers {
"a GRPC server" should {
"handle a request to a valid service" in {
resources().use { channel =>
val helloService = HelloServiceGrpc.stub(channel)
for {
response <- helloService.single(HelloRequest(7))
} yield {
response.respInt shouldBe 14
}
}
}
"fail with a nice exception" in {
resources().use { channel =>
val helloService = HelloServiceGrpc.stub(channel)
for {
exception <- helloService
.fails(HelloRequest(7, ByteString.copyFromUtf8("This is some text.")))
.failed
} yield {
exception.getMessage shouldBe "INTERNAL: This is some text."
}
}
}
"fail with a nice exception, even when the text is quite long" in {
val length = 2 * 1024
val exceptionMessage = "There was an error. " + Stream.continually("x").take(length).mkString
resources().use { channel =>
val helloService = HelloServiceGrpc.stub(channel)
for {
exception <- helloService
.fails(HelloRequest(7, ByteString.copyFromUtf8(exceptionMessage)))
.failed
} yield {
exception.getMessage shouldBe s"INTERNAL: $exceptionMessage"
}
}
}
"fail with a nice exception, even when the text is too long for the client to process" in {
val length = 1024 * 1024
val exceptionMessage =
"There was an error. " +
Stream.continually("x").take(length).mkString +
" And then some extra text that won't be sent."
resources().use { channel =>
val helloService = HelloServiceGrpc.stub(channel)
for {
exception <- helloService
.fails(HelloRequest(7, ByteString.copyFromUtf8(exceptionMessage)))
.failed
} yield {
// We don't want to test the exact message content, just that it does indeed contain a
// large chunk of the response error message, followed by "...".
exception.getMessage should fullyMatch regex "INTERNAL: There was an error. x{1024,}\\.\\.\\.".r
}
}
}
}
}
object GrpcServerSpec {
private val maxInboundMessageSize = 4 * 1024 * 1024 /* copied from the Sandbox configuration */
private val clientConfiguration = LedgerClientConfiguration(
applicationId = classOf[GrpcServerSpec].getSimpleName,
ledgerIdRequirement = LedgerIdRequirement.none,
commandClient = CommandClientConfiguration.default,
sslContext = None,
)
private def resources(): ResourceOwner[ManagedChannel] =
for {
eventLoopGroups <- new ServerEventLoopGroups.Owner(
classOf[GrpcServerSpec].getSimpleName,
workerParallelism = sys.runtime.availableProcessors(),
bossParallelism = 1,
)
server <- new GrpcServer.Owner(
address = None,
desiredPort = Port.Dynamic,
maxInboundMessageSize = maxInboundMessageSize,
metrics = new Metrics(new MetricRegistry),
eventLoopGroups = eventLoopGroups,
services = Seq(new ReferenceImplementation),
)
channel <- new GrpcChannel.Owner(Port(server.getPort), clientConfiguration)
} yield channel
}

View File

@ -0,0 +1,19 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ports
import java.net.{InetAddress, ServerSocket}
object FreePort {
def find(): Port = {
val socket = new ServerSocket(0, 0, InetAddress.getLoopbackAddress)
try {
Port(socket.getLocalPort)
} finally {
socket.close()
}
}
}

View File

@ -0,0 +1,15 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ports
import org.scalatest.{Matchers, WordSpec}
class FreePortSpec extends WordSpec with Matchers {
"a free port" should {
"always be available" in {
val port = FreePort.find()
port.value should (be >= 1024 and be < 65536)
}
}
}

View File

@ -3,29 +3,19 @@
package com.daml.testing.postgresql package com.daml.testing.postgresql
import java.net.{InetAddress, ServerSocket} import com.daml.ports
import com.daml.ports.Port
import scala.annotation.tailrec import scala.annotation.tailrec
private[postgresql] object FreePort { private[postgresql] object LockedFreePort {
@tailrec @tailrec
def find(tries: Int = 10): PortLock.Locked = { def find(tries: Int = 10): PortLock.Locked = {
val socket = new ServerSocket(0, 0, InetAddress.getLoopbackAddress) val port = ports.FreePort.find()
val portLock = try { PortLock.lock(port) match {
val port = Port(socket.getLocalPort)
PortLock.lock(port)
} finally {
socket.close()
}
portLock match {
case Right(locked) => case Right(locked) =>
socket.close()
locked locked
case Left(failure) => case Left(failure) =>
socket.close()
if (tries <= 1) { if (tries <= 1) {
throw failure throw failure
} else { } else {

View File

@ -55,7 +55,7 @@ trait PostgresAround {
val dataDir = root.resolve("data") val dataDir = root.resolve("data")
val configPath = dataDir.resolve("postgresql.conf") val configPath = dataDir.resolve("postgresql.conf")
val logFile = Files.createFile(root.resolve("postgresql.log")) val logFile = Files.createFile(root.resolve("postgresql.log"))
val lockedPort = FreePort.find() val lockedPort = LockedFreePort.find()
val hostName = InetAddress.getLoopbackAddress.getHostAddress val hostName = InetAddress.getLoopbackAddress.getHostAddress
val port = lockedPort.port val port = lockedPort.port
val userName = "test" val userName = "test"

View File

@ -5,10 +5,10 @@ package com.daml.testing.postgresql
import org.scalatest.{Matchers, WordSpec} import org.scalatest.{Matchers, WordSpec}
class FreePortSpec extends WordSpec with Matchers { class LockedFreePortSpec extends WordSpec with Matchers {
"a free port" should { "a free port" should {
"always be available" in { "always be available" in {
val lockedPort = FreePort.find() val lockedPort = LockedFreePort.find()
try { try {
lockedPort.port.value should (be >= 1024 and be < 65536) lockedPort.port.value should (be >= 1024 and be < 65536)
} finally { } finally {
@ -17,7 +17,7 @@ class FreePortSpec extends WordSpec with Matchers {
} }
"lock, to prevent race conditions" in { "lock, to prevent race conditions" in {
val lockedPort = FreePort.find() val lockedPort = LockedFreePort.find()
try { try {
PortLock.lock(lockedPort.port) should be(Left(PortLock.FailedToLock(lockedPort.port))) PortLock.lock(lockedPort.port) should be(Left(PortLock.FailedToLock(lockedPort.port)))
} finally { } finally {
@ -26,7 +26,7 @@ class FreePortSpec extends WordSpec with Matchers {
} }
"unlock when the server's started" in { "unlock when the server's started" in {
val lockedPort = FreePort.find() val lockedPort = LockedFreePort.find()
lockedPort.unlock() lockedPort.unlock()
val locked = PortLock val locked = PortLock
@ -36,8 +36,8 @@ class FreePortSpec extends WordSpec with Matchers {
succeed succeed
} }
"can be unlocked twice" in { "not error if it's unlocked twice" in {
val lockedPort = FreePort.find() val lockedPort = LockedFreePort.find()
lockedPort.unlock() lockedPort.unlock()
lockedPort.unlock() lockedPort.unlock()
succeed succeed