From b27cde64401368b40eca7257c359a83fea289e9b Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Wed, 25 Aug 2021 12:55:26 +0200 Subject: [PATCH] participant-integration-api: Move tracker code around, and tidy up tests. (#10663) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * participant-integration-api: Delete `SizeCappedMap`. It's unused. * participant-integration-api: `TrackerImpl` → `QueueBackedTracker`. CHANGELOG_BEGIN CHANGELOG_END * participant-integration-api: Clean up QueueBackedTrackerSpec. --- .../services/ApiCommandService.scala | 4 +- ...kerImpl.scala => QueueBackedTracker.scala} | 26 ++-- .../services/tracking/SizeCappedMap.scala | 15 --- .../tracking/QueueBackedTrackerSpec.scala | 124 ++++++++++++++++++ .../services/tracking/SizeCappedMapTest.scala | 45 ------- .../services/tracking/TrackerImplTest.scala | 122 ----------------- 6 files changed, 134 insertions(+), 202 deletions(-) rename ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/{TrackerImpl.scala => QueueBackedTracker.scala} (92%) delete mode 100644 ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/SizeCappedMap.scala create mode 100644 ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/QueueBackedTrackerSpec.scala delete mode 100644 ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/SizeCappedMapTest.scala delete mode 100644 ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/TrackerImplTest.scala diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala index 4fd869a3f17..3765d90c47c 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala @@ -39,7 +39,7 @@ import com.daml.metrics.Metrics import com.daml.platform.api.grpc.GrpcApiService import com.daml.platform.apiserver.configuration.LedgerConfigurationSubscription import com.daml.platform.apiserver.services.ApiCommandService._ -import com.daml.platform.apiserver.services.tracking.{Tracker, TrackerImpl, TrackerMap} +import com.daml.platform.apiserver.services.tracking.{Tracker, QueueBackedTracker, TrackerMap} import com.daml.platform.server.api.ApiException import com.daml.platform.server.api.services.grpc.GrpcCommandService import com.daml.util.Ctx @@ -249,7 +249,7 @@ private[apiserver] object ApiCommandService { lengthCounter = metrics.daml.commands.maxInFlightLength(metricsPrefixFirstParty), ).joinMat(commandTrackerFlow)(Keep.right) - TrackerImpl( + QueueBackedTracker( trackingFlow, configuration.inputBufferSize, capacityCounter = metrics.daml.commands.inputBufferCapacity(metricsPrefixFirstParty), diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerImpl.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala similarity index 92% rename from ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerImpl.scala rename to ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala index fab89a3fd10..ddfde05c8ba 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerImpl.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala @@ -14,6 +14,7 @@ import com.daml.ledger.client.services.commands.CommandTrackerFlow.Materialized import com.daml.ledger.client.services.commands.tracker.CompletionResponse._ import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.InstrumentedSource +import com.daml.platform.apiserver.services.tracking.QueueBackedTracker._ import com.daml.platform.server.api.ApiException import com.daml.util.Ctx import io.grpc.{Status => GrpcStatus} @@ -25,28 +26,17 @@ import scala.util.{Failure, Success} /** Tracks SubmitAndWaitRequests. * @param queue The input queue to the tracking flow. */ -private[services] final class TrackerImpl( - queue: SourceQueueWithComplete[TrackerImpl.QueueInput], +private[services] final class QueueBackedTracker( + queue: SourceQueueWithComplete[QueueBackedTracker.QueueInput], done: Future[Done], -)(implicit - loggingContext: LoggingContext -) extends Tracker { - - import TrackerImpl.logger +)(implicit loggingContext: LoggingContext) + extends Tracker { override def track(request: SubmitAndWaitRequest)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext, ): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = { logger.trace("Tracking command") - submitNewRequest(request) - } - - private def submitNewRequest( - request: SubmitAndWaitRequest - )(implicit - ec: ExecutionContext - ): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = { val trackedPromise = Promise[Either[CompletionFailure, CompletionSuccess]]() queue .offer( @@ -105,7 +95,7 @@ private[services] final class TrackerImpl( } } -private[services] object TrackerImpl { +private[services] object QueueBackedTracker { private val logger = ContextualizedLogger.get(this.getClass) @@ -122,7 +112,7 @@ private[services] object TrackerImpl { capacityCounter: Counter, lengthCounter: Counter, delayTimer: Timer, - )(implicit materializer: Materializer, loggingContext: LoggingContext): TrackerImpl = { + )(implicit materializer: Materializer, loggingContext: LoggingContext): QueueBackedTracker = { val ((queue, mat), done) = InstrumentedSource .queue[QueueInput]( inputBufferSize, @@ -170,7 +160,7 @@ private[services] object TrackerImpl { )(DirectExecutionContext) }(DirectExecutionContext) - new TrackerImpl(queue, done) + new QueueBackedTracker(queue, done) } type QueueInput = Ctx[Promise[Either[CompletionFailure, CompletionSuccess]], SubmitRequest] diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/SizeCappedMap.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/SizeCappedMap.scala deleted file mode 100644 index 641abf66c73..00000000000 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/SizeCappedMap.scala +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.apiserver.services.tracking - -import java.util - -private[tracking] class SizeCappedMap[K, V](initialCapacity: Int, maxCapacity: Int) - extends util.LinkedHashMap[K, V](initialCapacity) { - - override def removeEldestEntry(eldest: util.Map.Entry[K, V]): Boolean = { - size() > maxCapacity - } - -} diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/QueueBackedTrackerSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/QueueBackedTrackerSpec.scala new file mode 100644 index 00000000000..a58e413dfcf --- /dev/null +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/QueueBackedTrackerSpec.scala @@ -0,0 +1,124 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.apiserver.services.tracking + +import akka.stream.scaladsl.{Keep, Source, SourceQueueWithComplete} +import akka.stream.testkit.TestSubscriber +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{Materializer, OverflowStrategy} +import akka.{Done, NotUsed} +import com.daml.grpc.GrpcStatus +import com.daml.ledger.api.testing.utils.{AkkaBeforeAndAfterAll, TestingException} +import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest +import com.daml.ledger.api.v1.commands.Commands +import com.daml.ledger.client.services.commands.tracker.CompletionResponse +import com.daml.logging.LoggingContext +import com.daml.platform.apiserver.services.tracking.QueueBackedTracker.QueueInput +import com.daml.platform.apiserver.services.tracking.QueueBackedTrackerSpec._ +import com.google.rpc.status.{Status => StatusProto} +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AsyncWordSpec +import org.scalatest.{BeforeAndAfterEach, Inside} + +import scala.concurrent.{ExecutionContext, Future} + +class QueueBackedTrackerSpec + extends AsyncWordSpec + with Matchers + with BeforeAndAfterEach + with AkkaBeforeAndAfterAll + with Inside { + + private implicit val ec: ExecutionContext = ExecutionContext.global + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + + private var consumer: TestSubscriber.Probe[NotUsed] = _ + private var queue: SourceQueueWithComplete[QueueBackedTracker.QueueInput] = _ + + override protected def beforeEach(): Unit = { + val (q, sink) = alwaysSuccessfulQueue(bufferSize = 1) + queue = q + consumer = sink + } + + override protected def afterEach(): Unit = { + consumer.cancel() + queue.complete() + } + + "Tracker Implementation" when { + "input is submitted, and the queue is available" should { + "work successfully" in { + val tracker = new QueueBackedTracker(queue, Future.successful(Done)) + val completion1F = tracker.track(input(1)) + consumer.requestNext() + val completion2F = tracker.track(input(2)) + consumer.requestNext() + for { + _ <- completion1F + _ <- completion2F + } yield succeed + } + } + + "input is submitted, and the queue is backpressuring" should { + "return a RESOURCE_EXHAUSTED error" in { + val tracker = new QueueBackedTracker(queue, Future.successful(Done)) + tracker.track(input(1)) + tracker.track(input(2)).map { completion => + completion should matchPattern { + case Left(CompletionResponse.QueueSubmitFailure(GrpcStatus.RESOURCE_EXHAUSTED())) => + } + } + } + } + + "input is submitted, and the queue has been completed" should { + "return an ABORTED error" in { + val tracker = new QueueBackedTracker(queue, Future.successful(Done)) + queue.complete() + tracker.track(input(2)).map { completion => + completion should matchPattern { + case Left(CompletionResponse.QueueSubmitFailure(GrpcStatus.ABORTED())) => + } + } + } + } + + "input is submitted, and the queue has failed" should { + "return an ABORTED error" in { + val tracker = new QueueBackedTracker(queue, Future.successful(Done)) + queue.fail(TestingException("The queue fails with this error.")) + tracker.track(input(2)).map { completion => + completion should matchPattern { + case Left(CompletionResponse.QueueSubmitFailure(GrpcStatus.ABORTED())) => + } + } + } + } + } +} + +object QueueBackedTrackerSpec { + private def input(commandId: Int) = SubmitAndWaitRequest.of( + commands = Some(Commands(commandId = commandId.toString)) + ) + + private def alwaysSuccessfulQueue(bufferSize: Int)(implicit + materializer: Materializer + ): (SourceQueueWithComplete[QueueInput], TestSubscriber.Probe[NotUsed]) = + Source + .queue[QueueInput](bufferSize, OverflowStrategy.dropNew) + .map { in => + val completion = CompletionResponse.CompletionSuccess( + commandId = in.value.getCommands.commandId, + transactionId = "", + originalStatus = StatusProto.defaultInstance, + ) + in.context.success(Right(completion)) + NotUsed + } + .toMat(TestSink.probe[NotUsed](materializer.system))(Keep.both) + .run() +} diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/SizeCappedMapTest.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/SizeCappedMapTest.scala deleted file mode 100644 index f7e5d3776be..00000000000 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/SizeCappedMapTest.scala +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.apiserver.services.tracking - -import org.scalatest.BeforeAndAfterEach -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec - -class SizeCappedMapTest extends AnyWordSpec with BeforeAndAfterEach with Matchers { - - private var sut: SizeCappedMap[Long, Long] = _ - - override protected def beforeEach(): Unit = { - sut = new SizeCappedMap[Long, Long](1, 2) - } - - "ExpiringMap" when { - - "has spare capacity " should { - - "retain elements as a new one is inserted" in { - - sut.put(0L, 0L) - sut.put(1L, 1L) - Option(sut.get(0L)) should not be empty - Option(sut.get(1L)) should not be empty - } - } - - "does not have spare capacity" should { - - "drop the oldest element as a new one is inserted" in { - - sut.put(0L, 0L) - sut.put(1L, 1L) - sut.put(2L, 2L) - Option(sut.get(0L)) shouldBe empty - Option(sut.get(1L)) should not be empty - Option(sut.get(2L)) should not be empty - } - } - } - -} diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/TrackerImplTest.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/TrackerImplTest.scala deleted file mode 100644 index b15759ee3d0..00000000000 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/TrackerImplTest.scala +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.apiserver.services.tracking - -import akka.stream.OverflowStrategy -import akka.stream.scaladsl.{Keep, Source, SourceQueueWithComplete} -import akka.stream.testkit.TestSubscriber -import akka.stream.testkit.scaladsl.TestSink -import akka.{Done, NotUsed} -import com.daml.dec.DirectExecutionContext -import com.daml.ledger.api.testing.utils.{AkkaBeforeAndAfterAll, TestingException} -import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest -import com.daml.ledger.api.v1.commands.Commands -import com.daml.ledger.client.services.commands.tracker.CompletionResponse -import com.daml.ledger.client.services.commands.tracker.CompletionResponse.QueueSubmitFailure -import com.daml.logging.LoggingContext -import com.google.rpc.status.{Status => StatusProto} -import io.grpc.Status.Code -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec -import org.scalatest.{BeforeAndAfterEach, Inside, Succeeded} - -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future -import scala.concurrent.duration.DurationInt - -class TrackerImplTest - extends AnyWordSpec - with Matchers - with BeforeAndAfterEach - with ScalaFutures - with AkkaBeforeAndAfterAll - with Inside { - - override implicit def patienceConfig: PatienceConfig = PatienceConfig(5.seconds, 1.second) - - private var sut: Tracker = _ - private var consumer: TestSubscriber.Probe[NotUsed] = _ - private var queue: SourceQueueWithComplete[TrackerImpl.QueueInput] = _ - private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting - - private def input(cid: Int) = SubmitAndWaitRequest(Some(Commands(commandId = cid.toString))) - - override protected def beforeEach(): Unit = { - val (q, sink) = Source - .queue[TrackerImpl.QueueInput](1, OverflowStrategy.dropNew) - .map { in => - in.context.success( - Right( - CompletionResponse.CompletionSuccess(in.value.getCommands.commandId, "", StatusProto()) - ) - ) - NotUsed - } - .toMat(TestSink.probe[NotUsed])(Keep.both) - .run() - queue = q - sut = new TrackerImpl(q, Future.successful(Done)) - consumer = sink - } - - override protected def afterEach(): Unit = { - consumer.cancel() - queue.complete() - } - - "Tracker Implementation" when { - - "input is submitted, and the queue is available" should { - - "work successfully" in { - - val resultF1 = sut.track(input(1)) - consumer.requestNext() - val resultF = resultF1.flatMap(_ => sut.track(input(2)))(DirectExecutionContext) - consumer.requestNext() - whenReady(resultF)(_ => Succeeded) - } - } - - "input is submitted, and the queue is backpressuring" should { - - "return a RESOURCE_EXHAUSTED error" in { - - sut.track(input(1)) - whenReady(sut.track(input(2)))(failure => { - inside(failure) { case Left(QueueSubmitFailure(statusCode)) => - statusCode.getCode should be(Code.RESOURCE_EXHAUSTED) - } - }) - } - } - - "input is submitted, and the queue has been completed" should { - - "return an ABORTED error" in { - - queue.complete() - whenReady(sut.track(input(2)))(failure => { - inside(failure) { case Left(QueueSubmitFailure(statusCode)) => - statusCode.getCode should be(Code.ABORTED) - } - }) - } - } - - "input is submitted, and the queue has failed" should { - - "return an ABORTED error" in { - - queue.fail(TestingException("The queue fails with this error.")) - whenReady(sut.track(input(2)))(failure => { - inside(failure) { case Left(QueueSubmitFailure(statusCode)) => - statusCode.getCode should be(Code.ABORTED) - } - }) - } - } - } -}