participant-integration-api: Move tracker code around, and tidy up tests. (#10663)

* participant-integration-api: Delete `SizeCappedMap`. It's unused.

* participant-integration-api: `TrackerImpl` → `QueueBackedTracker`.

CHANGELOG_BEGIN
CHANGELOG_END

* participant-integration-api: Clean up QueueBackedTrackerSpec.
This commit is contained in:
Samir Talwar 2021-08-25 12:55:26 +02:00 committed by GitHub
parent fc9d35935a
commit b27cde6440
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 134 additions and 202 deletions

View File

@ -39,7 +39,7 @@ import com.daml.metrics.Metrics
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.apiserver.configuration.LedgerConfigurationSubscription
import com.daml.platform.apiserver.services.ApiCommandService._
import com.daml.platform.apiserver.services.tracking.{Tracker, TrackerImpl, TrackerMap}
import com.daml.platform.apiserver.services.tracking.{Tracker, QueueBackedTracker, TrackerMap}
import com.daml.platform.server.api.ApiException
import com.daml.platform.server.api.services.grpc.GrpcCommandService
import com.daml.util.Ctx
@ -249,7 +249,7 @@ private[apiserver] object ApiCommandService {
lengthCounter = metrics.daml.commands.maxInFlightLength(metricsPrefixFirstParty),
).joinMat(commandTrackerFlow)(Keep.right)
TrackerImpl(
QueueBackedTracker(
trackingFlow,
configuration.inputBufferSize,
capacityCounter = metrics.daml.commands.inputBufferCapacity(metricsPrefixFirstParty),

View File

@ -14,6 +14,7 @@ import com.daml.ledger.client.services.commands.CommandTrackerFlow.Materialized
import com.daml.ledger.client.services.commands.tracker.CompletionResponse._
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.InstrumentedSource
import com.daml.platform.apiserver.services.tracking.QueueBackedTracker._
import com.daml.platform.server.api.ApiException
import com.daml.util.Ctx
import io.grpc.{Status => GrpcStatus}
@ -25,28 +26,17 @@ import scala.util.{Failure, Success}
/** Tracks SubmitAndWaitRequests.
* @param queue The input queue to the tracking flow.
*/
private[services] final class TrackerImpl(
queue: SourceQueueWithComplete[TrackerImpl.QueueInput],
private[services] final class QueueBackedTracker(
queue: SourceQueueWithComplete[QueueBackedTracker.QueueInput],
done: Future[Done],
)(implicit
loggingContext: LoggingContext
) extends Tracker {
import TrackerImpl.logger
)(implicit loggingContext: LoggingContext)
extends Tracker {
override def track(request: SubmitAndWaitRequest)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = {
logger.trace("Tracking command")
submitNewRequest(request)
}
private def submitNewRequest(
request: SubmitAndWaitRequest
)(implicit
ec: ExecutionContext
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = {
val trackedPromise = Promise[Either[CompletionFailure, CompletionSuccess]]()
queue
.offer(
@ -105,7 +95,7 @@ private[services] final class TrackerImpl(
}
}
private[services] object TrackerImpl {
private[services] object QueueBackedTracker {
private val logger = ContextualizedLogger.get(this.getClass)
@ -122,7 +112,7 @@ private[services] object TrackerImpl {
capacityCounter: Counter,
lengthCounter: Counter,
delayTimer: Timer,
)(implicit materializer: Materializer, loggingContext: LoggingContext): TrackerImpl = {
)(implicit materializer: Materializer, loggingContext: LoggingContext): QueueBackedTracker = {
val ((queue, mat), done) = InstrumentedSource
.queue[QueueInput](
inputBufferSize,
@ -170,7 +160,7 @@ private[services] object TrackerImpl {
)(DirectExecutionContext)
}(DirectExecutionContext)
new TrackerImpl(queue, done)
new QueueBackedTracker(queue, done)
}
type QueueInput = Ctx[Promise[Either[CompletionFailure, CompletionSuccess]], SubmitRequest]

View File

@ -1,15 +0,0 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.services.tracking
import java.util
private[tracking] class SizeCappedMap[K, V](initialCapacity: Int, maxCapacity: Int)
extends util.LinkedHashMap[K, V](initialCapacity) {
override def removeEldestEntry(eldest: util.Map.Entry[K, V]): Boolean = {
size() > maxCapacity
}
}

View File

@ -0,0 +1,124 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.services.tracking
import akka.stream.scaladsl.{Keep, Source, SourceQueueWithComplete}
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{Materializer, OverflowStrategy}
import akka.{Done, NotUsed}
import com.daml.grpc.GrpcStatus
import com.daml.ledger.api.testing.utils.{AkkaBeforeAndAfterAll, TestingException}
import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.daml.ledger.api.v1.commands.Commands
import com.daml.ledger.client.services.commands.tracker.CompletionResponse
import com.daml.logging.LoggingContext
import com.daml.platform.apiserver.services.tracking.QueueBackedTracker.QueueInput
import com.daml.platform.apiserver.services.tracking.QueueBackedTrackerSpec._
import com.google.rpc.status.{Status => StatusProto}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import org.scalatest.{BeforeAndAfterEach, Inside}
import scala.concurrent.{ExecutionContext, Future}
class QueueBackedTrackerSpec
extends AsyncWordSpec
with Matchers
with BeforeAndAfterEach
with AkkaBeforeAndAfterAll
with Inside {
private implicit val ec: ExecutionContext = ExecutionContext.global
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
private var consumer: TestSubscriber.Probe[NotUsed] = _
private var queue: SourceQueueWithComplete[QueueBackedTracker.QueueInput] = _
override protected def beforeEach(): Unit = {
val (q, sink) = alwaysSuccessfulQueue(bufferSize = 1)
queue = q
consumer = sink
}
override protected def afterEach(): Unit = {
consumer.cancel()
queue.complete()
}
"Tracker Implementation" when {
"input is submitted, and the queue is available" should {
"work successfully" in {
val tracker = new QueueBackedTracker(queue, Future.successful(Done))
val completion1F = tracker.track(input(1))
consumer.requestNext()
val completion2F = tracker.track(input(2))
consumer.requestNext()
for {
_ <- completion1F
_ <- completion2F
} yield succeed
}
}
"input is submitted, and the queue is backpressuring" should {
"return a RESOURCE_EXHAUSTED error" in {
val tracker = new QueueBackedTracker(queue, Future.successful(Done))
tracker.track(input(1))
tracker.track(input(2)).map { completion =>
completion should matchPattern {
case Left(CompletionResponse.QueueSubmitFailure(GrpcStatus.RESOURCE_EXHAUSTED())) =>
}
}
}
}
"input is submitted, and the queue has been completed" should {
"return an ABORTED error" in {
val tracker = new QueueBackedTracker(queue, Future.successful(Done))
queue.complete()
tracker.track(input(2)).map { completion =>
completion should matchPattern {
case Left(CompletionResponse.QueueSubmitFailure(GrpcStatus.ABORTED())) =>
}
}
}
}
"input is submitted, and the queue has failed" should {
"return an ABORTED error" in {
val tracker = new QueueBackedTracker(queue, Future.successful(Done))
queue.fail(TestingException("The queue fails with this error."))
tracker.track(input(2)).map { completion =>
completion should matchPattern {
case Left(CompletionResponse.QueueSubmitFailure(GrpcStatus.ABORTED())) =>
}
}
}
}
}
}
object QueueBackedTrackerSpec {
private def input(commandId: Int) = SubmitAndWaitRequest.of(
commands = Some(Commands(commandId = commandId.toString))
)
private def alwaysSuccessfulQueue(bufferSize: Int)(implicit
materializer: Materializer
): (SourceQueueWithComplete[QueueInput], TestSubscriber.Probe[NotUsed]) =
Source
.queue[QueueInput](bufferSize, OverflowStrategy.dropNew)
.map { in =>
val completion = CompletionResponse.CompletionSuccess(
commandId = in.value.getCommands.commandId,
transactionId = "",
originalStatus = StatusProto.defaultInstance,
)
in.context.success(Right(completion))
NotUsed
}
.toMat(TestSink.probe[NotUsed](materializer.system))(Keep.both)
.run()
}

View File

@ -1,45 +0,0 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.services.tracking
import org.scalatest.BeforeAndAfterEach
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
class SizeCappedMapTest extends AnyWordSpec with BeforeAndAfterEach with Matchers {
private var sut: SizeCappedMap[Long, Long] = _
override protected def beforeEach(): Unit = {
sut = new SizeCappedMap[Long, Long](1, 2)
}
"ExpiringMap" when {
"has spare capacity " should {
"retain elements as a new one is inserted" in {
sut.put(0L, 0L)
sut.put(1L, 1L)
Option(sut.get(0L)) should not be empty
Option(sut.get(1L)) should not be empty
}
}
"does not have spare capacity" should {
"drop the oldest element as a new one is inserted" in {
sut.put(0L, 0L)
sut.put(1L, 1L)
sut.put(2L, 2L)
Option(sut.get(0L)) shouldBe empty
Option(sut.get(1L)) should not be empty
Option(sut.get(2L)) should not be empty
}
}
}
}

View File

@ -1,122 +0,0 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.services.tracking
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Source, SourceQueueWithComplete}
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSink
import akka.{Done, NotUsed}
import com.daml.dec.DirectExecutionContext
import com.daml.ledger.api.testing.utils.{AkkaBeforeAndAfterAll, TestingException}
import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.daml.ledger.api.v1.commands.Commands
import com.daml.ledger.client.services.commands.tracker.CompletionResponse
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.QueueSubmitFailure
import com.daml.logging.LoggingContext
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status.Code
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.{BeforeAndAfterEach, Inside, Succeeded}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
class TrackerImplTest
extends AnyWordSpec
with Matchers
with BeforeAndAfterEach
with ScalaFutures
with AkkaBeforeAndAfterAll
with Inside {
override implicit def patienceConfig: PatienceConfig = PatienceConfig(5.seconds, 1.second)
private var sut: Tracker = _
private var consumer: TestSubscriber.Probe[NotUsed] = _
private var queue: SourceQueueWithComplete[TrackerImpl.QueueInput] = _
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
private def input(cid: Int) = SubmitAndWaitRequest(Some(Commands(commandId = cid.toString)))
override protected def beforeEach(): Unit = {
val (q, sink) = Source
.queue[TrackerImpl.QueueInput](1, OverflowStrategy.dropNew)
.map { in =>
in.context.success(
Right(
CompletionResponse.CompletionSuccess(in.value.getCommands.commandId, "", StatusProto())
)
)
NotUsed
}
.toMat(TestSink.probe[NotUsed])(Keep.both)
.run()
queue = q
sut = new TrackerImpl(q, Future.successful(Done))
consumer = sink
}
override protected def afterEach(): Unit = {
consumer.cancel()
queue.complete()
}
"Tracker Implementation" when {
"input is submitted, and the queue is available" should {
"work successfully" in {
val resultF1 = sut.track(input(1))
consumer.requestNext()
val resultF = resultF1.flatMap(_ => sut.track(input(2)))(DirectExecutionContext)
consumer.requestNext()
whenReady(resultF)(_ => Succeeded)
}
}
"input is submitted, and the queue is backpressuring" should {
"return a RESOURCE_EXHAUSTED error" in {
sut.track(input(1))
whenReady(sut.track(input(2)))(failure => {
inside(failure) { case Left(QueueSubmitFailure(statusCode)) =>
statusCode.getCode should be(Code.RESOURCE_EXHAUSTED)
}
})
}
}
"input is submitted, and the queue has been completed" should {
"return an ABORTED error" in {
queue.complete()
whenReady(sut.track(input(2)))(failure => {
inside(failure) { case Left(QueueSubmitFailure(statusCode)) =>
statusCode.getCode should be(Code.ABORTED)
}
})
}
}
"input is submitted, and the queue has failed" should {
"return an ABORTED error" in {
queue.fail(TestingException("The queue fails with this error."))
whenReady(sut.track(input(2)))(failure => {
inside(failure) { case Left(QueueSubmitFailure(statusCode)) =>
statusCode.getCode should be(Code.ABORTED)
}
})
}
}
}
}