diff --git a/docs/source/tools/sandbox.rst b/docs/source/tools/sandbox.rst index 3da89fd624..be975c7875 100644 --- a/docs/source/tools/sandbox.rst +++ b/docs/source/tools/sandbox.rst @@ -389,6 +389,12 @@ the CommandService for a given party. A counter. The number of currently pending submissions on the CommandService for a given party. +``daml.commands..input_buffer_delay`` +------------------------------------------------- + +A timer. Measures the queuing delay for pending submissions +on the CommandService. + ``daml.commands..max_in_flight_capacity`` ----------------------------------------------------- diff --git a/ledger/daml-on-sql/README.rst b/ledger/daml-on-sql/README.rst index 311588bb4f..5e8ad031d9 100644 --- a/ledger/daml-on-sql/README.rst +++ b/ledger/daml-on-sql/README.rst @@ -521,6 +521,12 @@ the CommandService for a given party. A counter. The number of currently pending submissions on the CommandService for a given party. +``daml.commands..input_buffer_delay`` +------------------------------------------------- + +A timer. Measures the queuing delay for pending submissions +on the CommandService. + ``daml.commands..max_in_flight_capacity`` ------------------------------------------------- diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/InstrumentedSource.scala b/ledger/metrics/src/main/scala/com/daml/metrics/InstrumentedSource.scala index 900ebb421d..7e79c911bb 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/InstrumentedSource.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/InstrumentedSource.scala @@ -6,7 +6,7 @@ package com.daml.metrics import akka.Done import akka.stream.scaladsl.{Source, SourceQueueWithComplete} import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult} -import com.codahale.metrics.Counter +import com.codahale.metrics.{Counter, Timer} import com.daml.dec.DirectExecutionContext import scala.concurrent.Future @@ -14,8 +14,9 @@ import scala.concurrent.Future object InstrumentedSource { final class QueueWithComplete[T]( - delegate: SourceQueueWithComplete[T], + delegate: SourceQueueWithComplete[(Timer.Context, T)], lengthCounter: Counter, + delayTimer: Timer, ) extends SourceQueueWithComplete[T] { override def complete(): Unit = delegate.complete() @@ -25,13 +26,17 @@ object InstrumentedSource { override def watchCompletion(): Future[Done] = delegate.watchCompletion() override def offer(elem: T): Future[QueueOfferResult] = { - val result = delegate.offer(elem) + val result = delegate.offer( + delayTimer.time() -> elem + ) // Use the `DirectExecutionContext` to ensure that the // counter is updated as closely as possible to the // update of the queue, so to offer the most consistent // reading possible via the counter result.foreach { - case QueueOfferResult.Enqueued => lengthCounter.inc() + case QueueOfferResult.Enqueued => + lengthCounter.inc() + case _ => // do nothing }(DirectExecutionContext) result @@ -61,25 +66,29 @@ object InstrumentedSource { bufferSize: Int, overflowStrategy: OverflowStrategy, capacityCounter: Counter, - lengthCounter: Counter)( + lengthCounter: Counter, + delayTimer: Timer)( implicit materializer: Materializer, ): Source[T, QueueWithComplete[T]] = { - val (queue, source) = Source.queue[T](bufferSize, overflowStrategy).preMaterialize() + val (queue, source) = + Source.queue[(Timer.Context, T)](bufferSize, overflowStrategy).preMaterialize() val instrumentedQueue = - new QueueWithComplete[T](queue, lengthCounter) + new QueueWithComplete[T](queue, lengthCounter, delayTimer) // Using `map` and not `wireTap` because the latter is skipped on backpressure. capacityCounter.inc(bufferSize.toLong) instrumentedQueue .watchCompletion() - .map { _ => - capacityCounter.dec(bufferSize.toLong) + .andThen { + case _ => capacityCounter.dec(bufferSize.toLong) }(DirectExecutionContext) - source.mapMaterializedValue(_ => instrumentedQueue).map { item => - lengthCounter.dec() - item + source.mapMaterializedValue(_ => instrumentedQueue).map { + case (timingContext, item) => + timingContext.stop() + lengthCounter.dec() + item } } diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala index c118770799..8b8e9ef8a8 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala @@ -42,6 +42,8 @@ final class Metrics(val registry: MetricRegistry) { registry.counter(Prefix :+ party :+ "input_buffer_length") def inputBufferCapacity(party: String): Counter = registry.counter(Prefix :+ party :+ "input_buffer_capacity") + def inputBufferDelay(party: String): Timer = + registry.timer(Prefix :+ party :+ "input_buffer_delay") def maxInFlightLength(party: String): Counter = registry.counter(Prefix :+ party :+ "max_in_flight_length") def maxInFlightCapacity(party: String): Counter = diff --git a/ledger/metrics/src/test/scala/com/daml/metrics/InstrumentedSourceSpec.scala b/ledger/metrics/src/test/scala/com/daml/metrics/InstrumentedSourceSpec.scala index 14a17f3789..d0735d59e1 100644 --- a/ledger/metrics/src/test/scala/com/daml/metrics/InstrumentedSourceSpec.scala +++ b/ledger/metrics/src/test/scala/com/daml/metrics/InstrumentedSourceSpec.scala @@ -7,11 +7,12 @@ import java.util.concurrent.atomic.AtomicLong import akka.stream.scaladsl.{Keep, Sink} import akka.stream.{OverflowStrategy, QueueOfferResult} -import com.codahale.metrics.Counter +import com.codahale.metrics.{Counter, Timer} import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll import org.scalatest.{AsyncFlatSpec, Matchers} import scala.concurrent.{Future, Promise} +import scala.concurrent.duration.DurationInt final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with AkkaBeforeAndAfterAll { @@ -23,10 +24,17 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka val capacityCounter = new Counter() val maxBuffered = new InstrumentedSourceSpec.MaxValueCounter() + val delayTimer = new Timer() val (source, sink) = InstrumentedSource - .queue[Int](bufferSize, OverflowStrategy.backpressure, capacityCounter, maxBuffered) + .queue[Int]( + bufferSize, + OverflowStrategy.backpressure, + capacityCounter, + maxBuffered, + delayTimer, + ) .toMat(Sink.seq)(Keep.both) .run() @@ -47,6 +55,35 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka } } + it should "correctly measure queue delay" in { + val capacityCounter = new Counter() + val maxBuffered = new InstrumentedSourceSpec.MaxValueCounter() + val delayTimer = new Timer() + val bufferSize = 2 + + val (source, sink) = + InstrumentedSource + .queue[Int](16, OverflowStrategy.backpressure, capacityCounter, maxBuffered, delayTimer) + .mapAsync(1) { x => + akka.pattern.after(5.millis, system.scheduler)(Future(x)) + } + .toMat(Sink.seq)(Keep.both) + .run() + + val input = Seq.fill(bufferSize)(util.Random.nextInt) + + for { + result <- Future.sequence(input.map(source.offer)) + _ = source.complete() + output <- sink + } yield { + all(result) shouldBe QueueOfferResult.Enqueued + output shouldEqual input + delayTimer.getCount shouldEqual bufferSize + delayTimer.getSnapshot.getMax should be >= 5.millis.toNanos + } + } + it should "track the buffer saturation correctly when dropping items" in { val bufferSize = 500 @@ -61,12 +98,13 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka val maxBuffered = new InstrumentedSourceSpec.MaxValueCounter() val capacityCounter = new Counter() + val delayTimer = new Timer() val stop = Promise[Unit]() val (source, termination) = InstrumentedSource - .queue[Int](bufferSize, OverflowStrategy.dropNew, capacityCounter, maxBuffered) + .queue[Int](bufferSize, OverflowStrategy.dropNew, capacityCounter, maxBuffered, delayTimer) .mapAsync(1)(_ => stop.future) // Block until completed to overflow queue. .watchTermination()(Keep.both) .toMat(Sink.ignore)(Keep.left) diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala index 212a49231c..2f70db71cf 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala @@ -130,6 +130,7 @@ private[apiserver] final class ApiCommandService private ( configuration.inputBufferSize, capacityCounter = metrics.daml.commands.inputBufferCapacity(submitter.party), lengthCounter = metrics.daml.commands.inputBufferLength(submitter.party), + delayTimer = metrics.daml.commands.inputBufferDelay(submitter.party) ) } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerImpl.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerImpl.scala index 24f7b9b117..9dbc7fab02 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerImpl.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerImpl.scala @@ -6,7 +6,7 @@ package com.daml.platform.apiserver.services.tracking import akka.NotUsed import akka.stream.scaladsl.{Flow, Keep, Sink, SourceQueueWithComplete} import akka.stream.{Materializer, OverflowStrategy} -import com.codahale.metrics.Counter +import com.codahale.metrics.{Counter, Timer} import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest import com.daml.ledger.api.v1.command_submission_service.SubmitRequest @@ -80,9 +80,16 @@ private[services] object TrackerImpl { inputBufferSize: Int, capacityCounter: Counter, lengthCounter: Counter, + delayTimer: Timer, )(implicit materializer: Materializer, loggingContext: LoggingContext): TrackerImpl = { val ((queue, mat), foreachMat) = InstrumentedSource - .queue[QueueInput](inputBufferSize, OverflowStrategy.dropNew, capacityCounter, lengthCounter) + .queue[QueueInput]( + inputBufferSize, + OverflowStrategy.dropNew, + capacityCounter, + lengthCounter, + delayTimer, + ) .viaMat(tracker)(Keep.both) .toMat(Sink.foreach { case Ctx(promise, result) =>