Add timer to track queue delay in InstrumentedSource.queue (#7601)

* Add timer to track queue delay in InstrumentedSource.queue and use it from TrackerImpl.

CHANGELOG_BEGIN
CHANGELOG_END

* Address code review
Jussi Mäki 2020-10-08 15:18:31 +02:00 committed by GitHub
parent be35f3a31f
commit 2031ae8645
7 changed files with 86 additions and 17 deletions
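
The gist of the change: `InstrumentedSource.queue` now takes a Dropwizard `Timer`, starts a `Timer.Context` for every element that is offered, and stops it when the element leaves the queue, so the timer's histogram records per-element queue delay. A minimal sketch of that pattern outside Akka Streams (all names here are hypothetical and for illustration only, not part of this commit):

import java.util.concurrent.ConcurrentLinkedQueue
import com.codahale.metrics.{MetricRegistry, Timer}

// Hypothetical, simplified queue mirroring the pattern introduced here:
// each element is stored together with the Timer.Context started when it
// was offered, and the context is stopped when it is dequeued.
final class TimedQueue[T](delayTimer: Timer) {
  private val delegate = new ConcurrentLinkedQueue[(Timer.Context, T)]()

  def offer(elem: T): Boolean =
    // Start measuring queue delay as the element enters the queue.
    delegate.offer(delayTimer.time() -> elem)

  def poll(): Option[T] =
    Option(delegate.poll()).map {
      case (timingContext, elem) =>
        // Stop measuring; the elapsed time is recorded by the timer.
        timingContext.stop()
        elem
    }
}

object TimedQueueExample extends App {
  val registry = new MetricRegistry
  val queue = new TimedQueue[Int](registry.timer("example.queue_delay"))
  (1 to 3).foreach(queue.offer)
  Thread.sleep(5)
  while (queue.poll().isDefined) ()
  println(s"recorded samples: ${registry.timer("example.queue_delay").getCount}")
}
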

View File

@@ -389,6 +389,12 @@ the CommandService for a given party.
A counter. The number of currently pending submissions on
the CommandService for a given party.
``daml.commands.<party_name>.input_buffer_delay``
-------------------------------------------------
A timer. Measures the queuing delay for pending submissions
on the CommandService.
``daml.commands.<party_name>.max_in_flight_capacity``
-----------------------------------------------------
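
For reference, the new timer can be read back from the underlying Dropwizard MetricRegistry like any other timer; durations are recorded in nanoseconds. A hedged sketch (the party name "Alice" and the registry handle are assumptions for illustration, not part of this change):

import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry

def reportInputBufferDelay(registry: MetricRegistry): Unit = {
  // Assumes a party named "Alice" has submitted commands via the CommandService.
  val timer = registry.timer("daml.commands.Alice.input_buffer_delay")
  val snapshot = timer.getSnapshot
  // Dropwizard timers store durations in nanoseconds.
  println(s"timed submissions: ${timer.getCount}")
  println(s"mean queue delay:  ${TimeUnit.NANOSECONDS.toMillis(snapshot.getMean.toLong)} ms")
  println(s"max queue delay:   ${TimeUnit.NANOSECONDS.toMillis(snapshot.getMax)} ms")
}
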

View File

@@ -521,6 +521,12 @@ the CommandService for a given party.
A counter. The number of currently pending submissions on
the CommandService for a given party.
``daml.commands.<party_name>.input_buffer_delay``
-------------------------------------------------
A timer. Measures the queuing delay for pending submissions
on the CommandService.
``daml.commands.<party_name>.max_in_flight_capacity``
-------------------------------------------------

View File

@@ -6,7 +6,7 @@ package com.daml.metrics
import akka.Done
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import com.codahale.metrics.Counter
import com.codahale.metrics.{Counter, Timer}
import com.daml.dec.DirectExecutionContext
import scala.concurrent.Future
@@ -14,8 +14,9 @@ import scala.concurrent.Future
object InstrumentedSource {
final class QueueWithComplete[T](
delegate: SourceQueueWithComplete[T],
delegate: SourceQueueWithComplete[(Timer.Context, T)],
lengthCounter: Counter,
delayTimer: Timer,
) extends SourceQueueWithComplete[T] {
override def complete(): Unit = delegate.complete()
@@ -25,13 +26,17 @@ object InstrumentedSource {
override def watchCompletion(): Future[Done] = delegate.watchCompletion()
override def offer(elem: T): Future[QueueOfferResult] = {
val result = delegate.offer(elem)
val result = delegate.offer(
delayTimer.time() -> elem
)
// Use the `DirectExecutionContext` to ensure that the
// counter is updated as closely as possible to the
// update of the queue, so to offer the most consistent
// reading possible via the counter
result.foreach {
case QueueOfferResult.Enqueued => lengthCounter.inc()
case QueueOfferResult.Enqueued =>
lengthCounter.inc()
case _ => // do nothing
}(DirectExecutionContext)
result
@@ -61,25 +66,29 @@ object InstrumentedSource {
bufferSize: Int,
overflowStrategy: OverflowStrategy,
capacityCounter: Counter,
lengthCounter: Counter)(
lengthCounter: Counter,
delayTimer: Timer)(
implicit materializer: Materializer,
): Source[T, QueueWithComplete[T]] = {
val (queue, source) = Source.queue[T](bufferSize, overflowStrategy).preMaterialize()
val (queue, source) =
Source.queue[(Timer.Context, T)](bufferSize, overflowStrategy).preMaterialize()
val instrumentedQueue =
new QueueWithComplete[T](queue, lengthCounter)
new QueueWithComplete[T](queue, lengthCounter, delayTimer)
// Using `map` and not `wireTap` because the latter is skipped on backpressure.
capacityCounter.inc(bufferSize.toLong)
instrumentedQueue
.watchCompletion()
.map { _ =>
capacityCounter.dec(bufferSize.toLong)
.andThen {
case _ => capacityCounter.dec(bufferSize.toLong)
}(DirectExecutionContext)
source.mapMaterializedValue(_ => instrumentedQueue).map { item =>
lengthCounter.dec()
item
source.mapMaterializedValue(_ => instrumentedQueue).map {
case (timingContext, item) =>
timingContext.stop()
lengthCounter.dec()
item
}
}
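
For callers, the only visible difference is the extra `delayTimer` argument; the materialized `QueueWithComplete[T]` still accepts and emits plain elements, with the `(Timer.Context, T)` wrapping kept internal. A small usage sketch, assuming Akka 2.6 (the stream shape and names are illustrative, not taken from this commit):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink}
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import com.codahale.metrics.{Counter, Timer}
import com.daml.metrics.InstrumentedSource
import scala.concurrent.ExecutionContext.Implicits.global

object InstrumentedQueueUsage extends App {
  implicit val system: ActorSystem = ActorSystem("instrumented-queue-example")
  implicit val materializer: Materializer = Materializer(system) // Akka 2.6 API

  val capacityCounter = new Counter()
  val lengthCounter = new Counter()
  val delayTimer = new Timer()

  // The timer is threaded through queue construction; offering and consuming
  // elements works exactly as before.
  val (queue, done) =
    InstrumentedSource
      .queue[String](16, OverflowStrategy.dropNew, capacityCounter, lengthCounter, delayTimer)
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  queue.offer("hello").foreach {
    case QueueOfferResult.Enqueued => () // a delay sample is recorded once "hello" is consumed
    case other => println(s"not enqueued: $other")
  }
  queue.complete()
  done.onComplete(_ => system.terminate())
}
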

View File

@@ -42,6 +42,8 @@ final class Metrics(val registry: MetricRegistry) {
registry.counter(Prefix :+ party :+ "input_buffer_length")
def inputBufferCapacity(party: String): Counter =
registry.counter(Prefix :+ party :+ "input_buffer_capacity")
def inputBufferDelay(party: String): Timer =
registry.timer(Prefix :+ party :+ "input_buffer_delay")
def maxInFlightLength(party: String): Counter =
registry.counter(Prefix :+ party :+ "max_in_flight_length")
def maxInFlightCapacity(party: String): Counter =

View File

@@ -7,11 +7,12 @@ import java.util.concurrent.atomic.AtomicLong
import akka.stream.scaladsl.{Keep, Sink}
import akka.stream.{OverflowStrategy, QueueOfferResult}
import com.codahale.metrics.Counter
import com.codahale.metrics.{Counter, Timer}
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import org.scalatest.{AsyncFlatSpec, Matchers}
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.DurationInt
final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with AkkaBeforeAndAfterAll {
@@ -23,10 +24,17 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka
val capacityCounter = new Counter()
val maxBuffered = new InstrumentedSourceSpec.MaxValueCounter()
val delayTimer = new Timer()
val (source, sink) =
InstrumentedSource
.queue[Int](bufferSize, OverflowStrategy.backpressure, capacityCounter, maxBuffered)
.queue[Int](
bufferSize,
OverflowStrategy.backpressure,
capacityCounter,
maxBuffered,
delayTimer,
)
.toMat(Sink.seq)(Keep.both)
.run()
@@ -47,6 +55,35 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka
}
}
it should "correctly measure queue delay" in {
val capacityCounter = new Counter()
val maxBuffered = new InstrumentedSourceSpec.MaxValueCounter()
val delayTimer = new Timer()
val bufferSize = 2
val (source, sink) =
InstrumentedSource
.queue[Int](16, OverflowStrategy.backpressure, capacityCounter, maxBuffered, delayTimer)
.mapAsync(1) { x =>
akka.pattern.after(5.millis, system.scheduler)(Future(x))
}
.toMat(Sink.seq)(Keep.both)
.run()
val input = Seq.fill(bufferSize)(util.Random.nextInt)
for {
result <- Future.sequence(input.map(source.offer))
_ = source.complete()
output <- sink
} yield {
all(result) shouldBe QueueOfferResult.Enqueued
output shouldEqual input
delayTimer.getCount shouldEqual bufferSize
delayTimer.getSnapshot.getMax should be >= 5.millis.toNanos
}
}
it should "track the buffer saturation correctly when dropping items" in {
val bufferSize = 500
@@ -61,12 +98,13 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka
val maxBuffered = new InstrumentedSourceSpec.MaxValueCounter()
val capacityCounter = new Counter()
val delayTimer = new Timer()
val stop = Promise[Unit]()
val (source, termination) =
InstrumentedSource
.queue[Int](bufferSize, OverflowStrategy.dropNew, capacityCounter, maxBuffered)
.queue[Int](bufferSize, OverflowStrategy.dropNew, capacityCounter, maxBuffered, delayTimer)
.mapAsync(1)(_ => stop.future) // Block until completed to overflow queue.
.watchTermination()(Keep.both)
.toMat(Sink.ignore)(Keep.left)

View File

@@ -130,6 +130,7 @@ private[apiserver] final class ApiCommandService private (
configuration.inputBufferSize,
capacityCounter = metrics.daml.commands.inputBufferCapacity(submitter.party),
lengthCounter = metrics.daml.commands.inputBufferLength(submitter.party),
delayTimer = metrics.daml.commands.inputBufferDelay(submitter.party)
)
}
}

View File

@@ -6,7 +6,7 @@ package com.daml.platform.apiserver.services.tracking
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Keep, Sink, SourceQueueWithComplete}
import akka.stream.{Materializer, OverflowStrategy}
import com.codahale.metrics.Counter
import com.codahale.metrics.{Counter, Timer}
import com.daml.dec.DirectExecutionContext
import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
@@ -80,9 +80,16 @@ private[services] object TrackerImpl {
inputBufferSize: Int,
capacityCounter: Counter,
lengthCounter: Counter,
delayTimer: Timer,
)(implicit materializer: Materializer, loggingContext: LoggingContext): TrackerImpl = {
val ((queue, mat), foreachMat) = InstrumentedSource
.queue[QueueInput](inputBufferSize, OverflowStrategy.dropNew, capacityCounter, lengthCounter)
.queue[QueueInput](
inputBufferSize,
OverflowStrategy.dropNew,
capacityCounter,
lengthCounter,
delayTimer,
)
.viaMat(tracker)(Keep.both)
.toMat(Sink.foreach {
case Ctx(promise, result) =>