From 20d7e2becd24237cd31061f3db5eabba3f0095e2 Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Tue, 24 Jan 2023 12:06:06 +0100 Subject: [PATCH] Remove thread check dependency on metrics [PLEN-96] (#16113) Co-authored-by: Simon Maxen --- .../error-codes-inventory.rst.inc | 2 +- ledger/ledger-api-errors/BUILD.bazel | 1 + .../error/definitions/LedgerApiErrors.scala | 10 +- .../main/scala/platform/LedgerApiServer.scala | 8 +- .../RateLimitingInterceptor.scala | 16 +-- .../ratelimiting/ThreadpoolCheck.scala | 30 ++--- .../platform/store/dao/DbDispatcher.scala | 7 +- .../platform/apiserver/GrpcServerSpec.scala | 31 ++--- .../RateLimitingInterceptorSpec.scala | 81 ++++++------- .../indexer/MeteringAggregatorSpec.scala | 6 +- .../ParallelIndexerSubscriptionSpec.scala | 17 ++- .../ledger/sandbox/SandboxOnXRunner.scala | 39 ++++--- libs-scala/executors/BUILD.bazel | 1 + .../InstrumentedExecutors.scala | 38 ++++-- ...AwareExecutionContextExecutorService.scala | 109 ++++++++++++++++++ 15 files changed, 261 insertions(+), 135 deletions(-) rename libs-scala/executors/src/main/scala/com/daml/{ => executors}/InstrumentedExecutors.scala (70%) create mode 100644 libs-scala/executors/src/main/scala/com/daml/executors/QueueAwareExecutionContextExecutorService.scala diff --git a/docs/resources/generated-error-pages/error-codes-inventory.rst.inc b/docs/resources/generated-error-pages/error-codes-inventory.rst.inc index b9ddbe93a3e..505f0876a67 100755 --- a/docs/resources/generated-error-pages/error-codes-inventory.rst.inc +++ b/docs/resources/generated-error-pages/error-codes-inventory.rst.inc @@ -456,7 +456,7 @@ THREADPOOL_OVERLOADED **Conveyance**: This error is logged with log-level INFO on the server side and exposed on the API with grpc-status ABORTED including a detailed error message. - **Resolution**: The following actions can be taken: Here the 'queue size' for the threadpool = 'submitted tasks' - 'completed tasks' - 'running tasks' 1. Review the historical 'queue size' growth by inspecting the metric given in the message. 2. Review the maximum 'queue size' limits configured in the rate limiting configuration. 3. Try to space out requests that are likely to require a lot of CPU or database power. + **Resolution**: The following actions can be taken: Here the 'queue size' for the threadpool is considered as reported by the executor itself. 1. Review the historical 'queue size' growth by inspecting the metric given in the message. 2. Review the maximum 'queue size' limits configured in the rate limiting configuration. 3. Try to space out requests that are likely to require a lot of CPU or database power. 
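[Editor's note] A minimal, self-contained sketch of the check described in the resolution above; the QueueCounter helper and the Either-based result type are simplifications introduced here for illustration, while the real check added by this patch is ThreadpoolCheck, which returns a LimitResultCheck:

    // Sketch only: counter semantics match the wrapper added by this patch,
    // but the names below are hypothetical and not part of the codebase.
    import java.util.concurrent.atomic.AtomicLong

    final class QueueCounter {
      private val queued = new AtomicLong(0)
      def taskSubmitted(): Unit = { queued.incrementAndGet(); () } // bumped on submit/execute
      def taskStarted(): Unit = { queued.decrementAndGet(); () }   // dropped when a task begins running
      def getQueueSize: Long = queued.get()                        // 'queue size' as reported by the executor wrapper
    }

    object ThreadpoolCheckSketch {
      // Reject a request once the tracked queue size exceeds the configured limit.
      def check(name: String, counter: QueueCounter, limit: Int): Either[String, Unit] =
        if (counter.getQueueSize > limit)
          Left(s"The $name queue size (${counter.getQueueSize}) has exceeded the maximum ($limit).")
        else Right(())
    }

This mirrors the shift made by this patch: instead of deriving the queue size from Dropwizard meters (submitted - completed - running), the rate limiter now asks the executor wrapper for its queue size directly.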
diff --git a/ledger/ledger-api-errors/BUILD.bazel b/ledger/ledger-api-errors/BUILD.bazel index 40a15a1bad8..f22ddc341e5 100644 --- a/ledger/ledger-api-errors/BUILD.bazel +++ b/ledger/ledger-api-errors/BUILD.bazel @@ -22,6 +22,7 @@ da_scala_library( "//ledger/error", "//ledger/participant-integration-api:participant-integration-api-proto_scala", "//ledger/participant-state", + "//observability/metrics", "@maven//:com_google_api_grpc_proto_google_common_protos", "@maven//:io_grpc_grpc_api", "@maven//:org_slf4j_slf4j_api", diff --git a/ledger/ledger-api-errors/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala b/ledger/ledger-api-errors/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala index 6bca4fdfc5d..0811f3e328a 100644 --- a/ledger/ledger-api-errors/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala +++ b/ledger/ledger-api-errors/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala @@ -7,6 +7,7 @@ import com.daml.error._ import com.daml.error.definitions.ErrorGroups.ParticipantErrorGroup.LedgerApiErrorGroup import com.daml.lf.engine.Error.Validation.ReplayMismatch import com.daml.lf.engine.{Error => LfError} +import com.daml.metrics.ExecutorServiceMetrics import org.slf4j.event.Level @Explanation( @@ -112,7 +113,7 @@ object LedgerApiErrors extends LedgerApiErrorGroup { ) @Resolution( """The following actions can be taken: - |Here the 'queue size' for the threadpool = 'submitted tasks' - 'completed tasks' - 'running tasks' + |Here the 'queue size' for the threadpool is considered as reported by the executor itself. |1. Review the historical 'queue size' growth by inspecting the metric given in the message. |2. Review the maximum 'queue size' limits configured in the rate limiting configuration. |3. Try to space out requests that are likely to require a lot of CPU or database power. @@ -125,18 +126,19 @@ object LedgerApiErrors extends LedgerApiErrorGroup { ) { case class Rejection( name: String, + metricNameLabel: String, queued: Long, limit: Int, - metricPrefix: String, fullMethodName: String, )(implicit errorLogger: ContextualizedErrorLogger) extends DamlErrorWithDefiniteAnswer( - s"The $name queue size ($queued) has exceeded the maximum ($limit). Api services metrics are available at $metricPrefix.", + s"The $name queue size ($queued) has exceeded the maximum ($limit). 
Metrics for queue size available at ${ExecutorServiceMetrics.CommonMetricsName.QueuedTasks}.", extraContext = Map( "name" -> name, "queued" -> queued, "limit" -> limit, - "metricPrefix" -> metricPrefix, + "name_label" -> metricNameLabel, + "metrics" -> ExecutorServiceMetrics.CommonMetricsName.QueuedTasks, "fullMethodName" -> fullMethodName, ), ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/LedgerApiServer.scala b/ledger/participant-integration-api/src/main/scala/platform/LedgerApiServer.scala index 98b1b1851c1..67b9e2738f9 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/LedgerApiServer.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/LedgerApiServer.scala @@ -6,6 +6,7 @@ package com.daml.platform import akka.actor.ActorSystem import akka.stream.Materializer import com.daml.api.util.TimeProvider +import com.daml.executors.QueueAwareExecutionContextExecutorService import com.daml.ledger.api.auth.{AuthService, JwtVerifierLoader} import com.daml.ledger.api.domain import com.daml.ledger.api.health.HealthChecks @@ -51,7 +52,9 @@ class LedgerApiServer( // Currently, we provide this flag outside the HOCON configuration objects // in order to ensure that participants cannot be configured to accept explicitly disclosed contracts. explicitDisclosureUnsafeEnabled: Boolean = false, - rateLimitingInterceptor: Option[RateLimitingInterceptor] = None, + rateLimitingInterceptor: Option[ + QueueAwareExecutionContextExecutorService => RateLimitingInterceptor + ] = None, telemetry: Telemetry, )(implicit actorSystem: ActorSystem, materializer: Materializer) { @@ -162,7 +165,8 @@ class LedgerApiServer( healthChecks = healthChecksWithIndexer + ("write" -> writeService), metrics = metrics, timeServiceBackend = timeServiceBackend, - otherInterceptors = rateLimitingInterceptor.toList, + otherInterceptors = + rateLimitingInterceptor.map(provider => provider(dbSupport.dbDispatcher.executor)).toList, engine = sharedEngine, servicesExecutionContext = servicesExecutionContext, userManagementStore = PersistentUserManagementStore.cached( diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ratelimiting/RateLimitingInterceptor.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ratelimiting/RateLimitingInterceptor.scala index 1ab70db555c..ae53c0f24e3 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ratelimiting/RateLimitingInterceptor.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ratelimiting/RateLimitingInterceptor.scala @@ -3,11 +3,7 @@ package com.daml.platform.apiserver.ratelimiting -import java.lang.management.{ManagementFactory, MemoryMXBean, MemoryPoolMXBean} -import java.util.concurrent.atomic.AtomicBoolean - import com.daml.metrics.Metrics -import com.daml.metrics.api.MetricName import com.daml.platform.apiserver.configuration.RateLimitingConfig import com.daml.platform.apiserver.ratelimiting.LimitResult.{ LimitResultCheck, @@ -15,12 +11,12 @@ import com.daml.platform.apiserver.ratelimiting.LimitResult.{ UnderLimit, } import com.daml.platform.apiserver.ratelimiting.RateLimitingInterceptor._ -import com.daml.platform.apiserver.ratelimiting.ThreadpoolCheck.ThreadpoolCount -import com.daml.platform.configuration.ServerRole import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor} import org.slf4j.LoggerFactory +import 
java.lang.management.{ManagementFactory, MemoryMXBean, MemoryPoolMXBean} +import java.util.concurrent.atomic.AtomicBoolean import scala.jdk.CollectionConverters.ListHasAsScala import scala.util.Try @@ -101,11 +97,6 @@ object RateLimitingInterceptor { additionalChecks: List[LimitResultCheck], ): RateLimitingInterceptor = { - val indexDbThreadpool: ThreadpoolCount = new ThreadpoolCount(metrics)( - "Index Database Connection Threadpool", - MetricName(metrics.daml.index.db.threadpool.connection, ServerRole.ApiServer.threadPoolSuffix), - ) - val activeStreamsName = metrics.daml.lapi.streams.activeName val activeStreamsCounter = metrics.daml.lapi.streams.active @@ -113,13 +104,12 @@ object RateLimitingInterceptor { metrics = metrics, checks = List[LimitResultCheck]( MemoryCheck(tenuredMemoryPools, memoryMxBean, config), - ThreadpoolCheck(indexDbThreadpool, config.maxApiServicesIndexDbQueueSize), StreamCheck(activeStreamsCounter, activeStreamsName, config.maxStreams), ) ::: additionalChecks, ) } - val doNonLimit: Set[String] = Set( + private val doNonLimit: Set[String] = Set( "grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo", "grpc.health.v1.Health/Check", "grpc.health.v1.Health/Watch", diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ratelimiting/ThreadpoolCheck.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ratelimiting/ThreadpoolCheck.scala index ad5be4b8d70..19eb3130a73 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ratelimiting/ThreadpoolCheck.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ratelimiting/ThreadpoolCheck.scala @@ -3,11 +3,9 @@ package com.daml.platform.apiserver.ratelimiting -import com.codahale.metrics.MetricRegistry import com.daml.error.definitions.LedgerApiErrors.ThreadpoolOverloaded import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger} -import com.daml.metrics.Metrics -import com.daml.metrics.api.MetricName +import com.daml.executors.QueueAwareExecutionContextExecutorService import com.daml.platform.apiserver.ratelimiting.LimitResult.{ LimitResultCheck, OverLimit, @@ -19,33 +17,21 @@ object ThreadpoolCheck { private implicit val logger: ContextualizedErrorLogger = DamlContextualizedErrorLogger.forClass(getClass) - /** Match naming in [[com.codahale.metrics.InstrumentedExecutorService]] */ - final class ThreadpoolCount(metrics: Metrics)(val name: String, val prefix: MetricName) { - private val submitted = - metrics.dropwizardFactory.registry.meter(MetricRegistry.name(prefix, "submitted")) - private val running = - metrics.dropwizardFactory.registry.counter(MetricRegistry.name(prefix, "running")) - private val completed = - metrics.dropwizardFactory.registry.meter(MetricRegistry.name(prefix, "completed")) - - def queueSize: Long = submitted.getCount - running.getCount - completed.getCount - } - - def apply(count: ThreadpoolCount, limit: Int): LimitResultCheck = { - apply(count.name, count.prefix, () => count.queueSize, limit) - } - - def apply(name: String, prefix: String, queueSize: () => Long, limit: Int): LimitResultCheck = + def apply( + name: String, + queue: QueueAwareExecutionContextExecutorService, + limit: Int, + ): LimitResultCheck = (fullMethodName, _) => { - val queued = queueSize() + val queued = queue.getQueueSize if (queued > limit) { OverLimit( ThreadpoolOverloaded.Rejection( name = name, queued = queued, limit = limit, - metricPrefix = prefix, fullMethodName = fullMethodName, + 
metricNameLabel = queue.name, ) ) } else UnderLimit diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/dao/DbDispatcher.scala b/ledger/participant-integration-api/src/main/scala/platform/store/dao/DbDispatcher.scala index 1f9ffdb9d01..578264f7008 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/dao/DbDispatcher.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/dao/DbDispatcher.scala @@ -4,10 +4,10 @@ package com.daml.platform.store.dao import java.sql.Connection -import java.util.concurrent.{Executor, TimeUnit} +import java.util.concurrent.TimeUnit import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger} -import com.daml.executors.InstrumentedExecutors +import com.daml.executors.{InstrumentedExecutors, QueueAwareExecutionContextExecutorService} import com.daml.ledger.api.health.{HealthStatus, ReportsHealth} import com.daml.ledger.resources.ResourceOwner import com.daml.logging.LoggingContext.withEnrichedLoggingContext @@ -24,6 +24,7 @@ import scala.concurrent.{ExecutionContext, Future} import scala.util.control.NonFatal private[platform] trait DbDispatcher { + val executor: QueueAwareExecutionContextExecutorService def executeSql[T](databaseMetrics: DatabaseMetrics)(sql: Connection => T)(implicit loggingContext: LoggingContext ): Future[T] @@ -32,7 +33,7 @@ private[platform] trait DbDispatcher { private[dao] final class DbDispatcherImpl private[dao] ( connectionProvider: JdbcConnectionProvider, - executor: Executor, + val executor: QueueAwareExecutionContextExecutorService, overallWaitTimer: Timer, overallExecutionTimer: Timer, )(implicit loggingContext: LoggingContext) diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/GrpcServerSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/GrpcServerSpec.scala index f1621963938..c04984361c9 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/GrpcServerSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/GrpcServerSpec.scala @@ -3,7 +3,8 @@ package com.daml.platform.apiserver -import com.codahale.metrics.MetricRegistry +import java.util.concurrent.Executors + import com.daml.error.DamlContextualizedErrorLogger import com.daml.error.definitions.LedgerApiErrors import com.daml.grpc.sampleservice.implementations.HelloServiceReferenceImplementation @@ -13,8 +14,7 @@ import com.daml.ledger.resources.{ResourceOwner, TestResourceContext} import com.daml.metrics.Metrics import com.daml.platform.apiserver.GrpcServerSpec._ import com.daml.platform.apiserver.configuration.RateLimitingConfig -import com.daml.platform.apiserver.ratelimiting.RateLimitingInterceptor -import com.daml.platform.configuration.ServerRole +import com.daml.platform.apiserver.ratelimiting.{LimitResult, RateLimitingInterceptor} import com.daml.platform.hello.{HelloRequest, HelloResponse, HelloServiceGrpc} import com.daml.ports.Port import com.google.protobuf.ByteString @@ -22,8 +22,6 @@ import io.grpc.{ManagedChannel, ServerInterceptor, Status, StatusRuntimeExceptio import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec -import java.util.concurrent.Executors -import com.daml.metrics.api.MetricName import scala.concurrent.Future final class GrpcServerSpec extends AsyncWordSpec with Matchers with TestResourceContext { @@ -90,15 +88,22 @@ final class GrpcServerSpec extends AsyncWordSpec with Matchers with 
TestResource "install rate limit interceptor" in { val metrics = Metrics.ForTesting - val rateLimitingInterceptor = RateLimitingInterceptor(metrics, rateLimitingConfig) + val rateLimitingInterceptor = RateLimitingInterceptor( + metrics, + rateLimitingConfig, + additionalChecks = List((_, _) => + LimitResult.OverLimit( + LedgerApiErrors.ThreadpoolOverloaded.Rejection( + "test", + "test", + 100, + 59, + "test", + )(DamlContextualizedErrorLogger.forTesting(getClass)) + ) + ), + ) resources(metrics, List(rateLimitingInterceptor)).use { channel => - val metricName = MetricName( - metrics.daml.index.db.threadpool.connection, - ServerRole.ApiServer.threadPoolSuffix, - ) - metrics.dropwizardFactory.registry - .meter(MetricRegistry.name(metricName, "submitted")) - .mark(rateLimitingConfig.maxApiServicesIndexDbQueueSize.toLong + 1) // Over limit val helloService = HelloServiceGrpc.stub(channel) helloService.single(HelloRequest(7)).failed.map { case s: StatusRuntimeException => diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/ratelimiting/RateLimitingInterceptorSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/ratelimiting/RateLimitingInterceptorSpec.scala index 046adcc0d4e..e0af6d7a22e 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/ratelimiting/RateLimitingInterceptorSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/ratelimiting/RateLimitingInterceptorSpec.scala @@ -5,9 +5,10 @@ package com.daml.platform.apiserver.ratelimiting import java.io.IOException import java.lang.management._ -import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} +import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit} import com.codahale.metrics.MetricRegistry +import com.daml.executors.QueueAwareExecutionContextExecutorService import com.daml.grpc.adapter.utils.implementations.HelloServiceAkkaImplementation import com.daml.grpc.sampleservice.implementations.HelloServiceReferenceImplementation import com.daml.ledger.api.health.HealthChecks.ComponentName @@ -19,9 +20,7 @@ import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import com.daml.platform.apiserver.configuration.RateLimitingConfig import com.daml.platform.apiserver.ratelimiting.LimitResult.LimitResultCheck -import com.daml.platform.apiserver.ratelimiting.ThreadpoolCheck.ThreadpoolCount import com.daml.platform.apiserver.services.GrpcClientResource -import com.daml.platform.configuration.ServerRole import com.daml.platform.hello.{HelloRequest, HelloResponse, HelloServiceGrpc} import com.daml.platform.server.api.services.grpc.GrpcHealthService import com.daml.ports.Port @@ -45,7 +44,7 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.time.{Second, Span} import org.slf4j.LoggerFactory -import scala.concurrent.{Future, Promise} +import scala.concurrent.{ExecutionContext, Future, Promise} final class RateLimitingInterceptorSpec extends AsyncFlatSpec @@ -64,25 +63,36 @@ final class RateLimitingInterceptorSpec behavior of "RateLimitingInterceptor" - it should "limit calls when apiServices DB thread pool executor service is over limit" in { + it should "support additional checks" in { val metrics = createMetrics - withChannel(metrics, new HelloServiceAkkaImplementation, config).use { channel: Channel => - val helloService = HelloServiceGrpc.stub(channel) - val submitted = metrics.dropwizardFactory.registry.meter( - MetricRegistry.name( - 
metrics.daml.index.db.threadpool.connection, - ServerRole.ApiServer.threadPoolSuffix, - "submitted", + val queueSizeValues = + Iterator(0L, config.maxApiServicesQueueSize.toLong + 1, 0L) + val executorWithQueueSize = new QueueAwareExecutionContextExecutorService( + ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor()), + "test", + ) { + override def getQueueSize: Long = queueSizeValues.next() + } + val threadPoolHumanReadableName = "For testing" + withChannel( + metrics, + new HelloServiceAkkaImplementation, + config, + additionalChecks = List( + ThreadpoolCheck( + threadPoolHumanReadableName, + executorWithQueueSize, + config.maxApiServicesQueueSize, ) - ) + ), + ).use { channel: Channel => + val helloService = HelloServiceGrpc.stub(channel) for { _ <- helloService.single(HelloRequest(1)) - _ = submitted.mark(config.maxApiServicesIndexDbQueueSize.toLong + 1) exception <- helloService.single(HelloRequest(2)).failed - _ = submitted.mark(-config.maxApiServicesIndexDbQueueSize.toLong - 1) _ <- helloService.single(HelloRequest(3)) } yield { - exception.getMessage should include(metrics.daml.index.db.threadpool.connection) + exception.toString should include(threadPoolHumanReadableName) } } } @@ -374,37 +384,6 @@ final class RateLimitingInterceptorSpec underTest.calculateCollectionUsageThreshold(101000) shouldBe 100000 // 101000 - 1000 } - it should "support addition checks" in { - val metrics = Metrics(new MetricRegistry, GlobalOpenTelemetry.getMeter("test")) - - val apiServices: ThreadpoolCount = new ThreadpoolCount(metrics)( - "Api Services Threadpool", - metrics.daml.lapi.threadpool.apiServices, - ) - val apiServicesCheck = ThreadpoolCheck(apiServices, config.maxApiServicesQueueSize) - - withChannel( - metrics, - new HelloServiceAkkaImplementation, - config, - additionalChecks = List(apiServicesCheck), - ).use { channel: Channel => - val helloService = HelloServiceGrpc.stub(channel) - val submitted = metrics.dropwizardFactory.registry.meter( - MetricRegistry.name(metrics.daml.lapi.threadpool.apiServices, "submitted") - ) - for { - _ <- helloService.single(HelloRequest(1)) - _ = submitted.mark(config.maxApiServicesQueueSize.toLong + 1) - exception <- helloService.single(HelloRequest(2)).failed - _ = submitted.mark(-config.maxApiServicesQueueSize.toLong - 1) - _ <- helloService.single(HelloRequest(3)) - } yield { - exception.getMessage should include(metrics.daml.lapi.threadpool.apiServices) - } - } - } - } object RateLimitingInterceptorSpec extends MockitoSugar { @@ -436,7 +415,13 @@ object RateLimitingInterceptorSpec extends MockitoSugar { ): ResourceOwner[Channel] = for { server <- serverOwner( - RateLimitingInterceptor(metrics, config, pool, memoryBean, additionalChecks), + RateLimitingInterceptor( + metrics, + config, + pool, + memoryBean, + additionalChecks, + ), service, ) channel <- GrpcClientResource.owner(Port(server.getPort)) diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/MeteringAggregatorSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/MeteringAggregatorSpec.scala index 215f33d021c..4e0dbed14ae 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/MeteringAggregatorSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/MeteringAggregatorSpec.scala @@ -25,10 +25,12 @@ import org.mockito.MockitoSugar import org.mockito.captor.ArgCaptor import org.scalatest.matchers.should.Matchers import 
org.scalatest.wordspec.AnyWordSpecLike - import java.sql.Connection import java.time.temporal.ChronoUnit import java.time.{LocalDate, LocalTime, OffsetDateTime, ZoneOffset} + +import com.daml.executors.QueueAwareExecutionContextExecutorService + import scala.concurrent.Future //noinspection TypeAnnotation @@ -58,6 +60,8 @@ final class MeteringAggregatorSpec extends AnyWordSpecLike with MockitoSugar wit ): Future[T] = Future.successful { sql(conn) } + override val executor: QueueAwareExecutionContextExecutorService = + mock[QueueAwareExecutionContextExecutorService] } val parameterStore: ParameterStorageBackend = mock[ParameterStorageBackend] diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/parallel/ParallelIndexerSubscriptionSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/parallel/ParallelIndexerSubscriptionSpec.scala index 16cbdb8040a..30ad7d971a6 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/parallel/ParallelIndexerSubscriptionSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/parallel/ParallelIndexerSubscriptionSpec.scala @@ -5,7 +5,9 @@ package com.daml.platform.indexer.parallel import java.sql.Connection import java.time.Instant +import java.util.concurrent.Executors +import com.daml.executors.QueueAwareExecutionContextExecutorService import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf.crypto.Hash @@ -29,7 +31,7 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.scalatest.time.SpanSugar.convertIntToGrainOfTime -import scala.concurrent.{Await, Future} +import scala.concurrent.{Await, ExecutionContext, Future} class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { @@ -389,6 +391,12 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { loggingContext: LoggingContext ): Future[T] = Future.successful(sql(connection)) + + override val executor: QueueAwareExecutionContextExecutorService = + new QueueAwareExecutionContextExecutorService( + ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor()), + "test", + ) } val batchPayload = "Some batch payload" @@ -433,10 +441,17 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { it should "apply ingestTailFunction on the last batch and forward the batch of batches" in { val connection = new TestConnection val dbDispatcher = new DbDispatcher { + override def executeSql[T](databaseMetrics: DatabaseMetrics)(sql: Connection => T)(implicit loggingContext: LoggingContext ): Future[T] = Future.successful(sql(connection)) + + override val executor: QueueAwareExecutionContextExecutorService = + new QueueAwareExecutionContextExecutorService( + ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor()), + "test", + ) } val ledgerEnd = ParameterStorageBackend.LedgerEnd( diff --git a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/SandboxOnXRunner.scala b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/SandboxOnXRunner.scala index e7585dc17d0..1d0c401125a 100644 --- a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/SandboxOnXRunner.scala +++ b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/SandboxOnXRunner.scala @@ -9,20 +9,14 @@ import akka.stream.Materializer import akka.stream.scaladsl.Sink import com.daml.api.util.TimeProvider import 
com.daml.buildinfo.BuildInfo -import com.daml.executors.InstrumentedExecutors +import com.daml.executors.{InstrumentedExecutors, QueueAwareExecutionContextExecutorService} import com.daml.ledger.api.auth.{ AuthServiceJWT, AuthServiceNone, AuthServiceStatic, AuthServiceWildcard, } -import com.daml.ledger.api.v1.experimental_features.{ - CommandDeduplicationFeatures, - CommandDeduplicationPeriodSupport, - CommandDeduplicationType, - ExperimentalContractIds, - ExperimentalExplicitDisclosure, -} +import com.daml.ledger.api.v1.experimental_features._ import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.index.v2.IndexService import com.daml.ledger.participant.state.v2.{Update, WriteService} @@ -37,7 +31,6 @@ import com.daml.metrics.api.MetricHandle.Factory import com.daml.metrics.Metrics import com.daml.platform.LedgerApiServer import com.daml.platform.apiserver.configuration.RateLimitingConfig -import com.daml.platform.apiserver.ratelimiting.ThreadpoolCheck.ThreadpoolCount import com.daml.platform.apiserver.ratelimiting.{RateLimitingInterceptor, ThreadpoolCheck} import com.daml.platform.apiserver.{LedgerFeatures, TimeServiceBackend} import com.daml.platform.config.ParticipantConfig @@ -142,8 +135,10 @@ object SandboxOnXRunner { servicesExecutionContext = servicesExecutionContext, metrics = metrics, explicitDisclosureUnsafeEnabled = true, - rateLimitingInterceptor = - participantConfig.apiServer.rateLimit.map(buildRateLimitingInterceptor(metrics)), + rateLimitingInterceptor = participantConfig.apiServer.rateLimit.map(config => + (dbExecutor: QueueAwareExecutionContextExecutorService) => + buildRateLimitingInterceptor(metrics, dbExecutor, servicesExecutionContext)(config) + ), telemetry = new DefaultOpenTelemetry(openTelemetry), )(actorSystem, materializer).owner } yield { @@ -235,7 +230,7 @@ object SandboxOnXRunner { private def buildServicesExecutionContext( metrics: Metrics, servicesThreadPoolSize: Int, - ): ResourceOwner[ExecutionContextExecutorService] = + ): ResourceOwner[QueueAwareExecutionContextExecutorService] = ResourceOwner .forExecutorService(() => InstrumentedExecutors.newWorkStealingExecutor( @@ -290,16 +285,24 @@ object SandboxOnXRunner { } def buildRateLimitingInterceptor( - metrics: Metrics + metrics: Metrics, + indexDbExecutor: QueueAwareExecutionContextExecutorService, + apiServicesExecutor: QueueAwareExecutionContextExecutorService, )(config: RateLimitingConfig): RateLimitingInterceptor = { - val apiServices: ThreadpoolCount = new ThreadpoolCount(metrics)( - "Api Services Threadpool", - metrics.daml.lapi.threadpool.apiServices, + val indexDbCheck = ThreadpoolCheck( + "Index DB Threadpool", + indexDbExecutor, + config.maxApiServicesIndexDbQueueSize, ) - val apiServicesCheck = ThreadpoolCheck(apiServices, config.maxApiServicesQueueSize) - RateLimitingInterceptor(metrics, config, List(apiServicesCheck)) + val apiServicesCheck = ThreadpoolCheck( + "Api Services Threadpool", + apiServicesExecutor, + config.maxApiServicesQueueSize, + ) + + RateLimitingInterceptor(metrics, config, List(indexDbCheck, apiServicesCheck)) } diff --git a/libs-scala/executors/BUILD.bazel b/libs-scala/executors/BUILD.bazel index 5ec7f5603e7..f4a59a9f0f6 100644 --- a/libs-scala/executors/BUILD.bazel +++ b/libs-scala/executors/BUILD.bazel @@ -12,6 +12,7 @@ da_scala_library( tags = ["maven_coordinates=com.daml:executors:__VERSION__"], visibility = ["//visibility:public"], deps = [ + "//libs-scala/scala-utils", "//observability/metrics", "@maven//:io_dropwizard_metrics_metrics_core", 
], diff --git a/libs-scala/executors/src/main/scala/com/daml/InstrumentedExecutors.scala b/libs-scala/executors/src/main/scala/com/daml/executors/InstrumentedExecutors.scala similarity index 70% rename from libs-scala/executors/src/main/scala/com/daml/InstrumentedExecutors.scala rename to libs-scala/executors/src/main/scala/com/daml/executors/InstrumentedExecutors.scala index 87fbe1164f2..6c65fb3d5bb 100644 --- a/libs-scala/executors/src/main/scala/com/daml/InstrumentedExecutors.scala +++ b/libs-scala/executors/src/main/scala/com/daml/executors/InstrumentedExecutors.scala @@ -8,7 +8,7 @@ import java.util.concurrent.{ExecutorService, ThreadFactory, Executors => JavaEx import com.codahale.metrics.{InstrumentedExecutorService, MetricRegistry} import com.daml.metrics.ExecutorServiceMetrics -import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService} +import scala.concurrent.ExecutionContext object InstrumentedExecutors { @@ -18,11 +18,16 @@ object InstrumentedExecutors { registry: MetricRegistry, executorServiceMetrics: ExecutorServiceMetrics, errorReporter: Throwable => Unit = ExecutionContext.defaultReporter, - ): ExecutionContextExecutorService = { + ): QueueAwareExecutionContextExecutorService = { val executorService = JavaExecutors.newWorkStealingPool(parallelism) val executorServiceWithMetrics = addMetricsToExecutorService(name, registry, executorServiceMetrics, executorService) - ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter) + val executionContext = + ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter) + new QueueAwareExecutionContextExecutorService( + executionContext, + name, + ) } def newFixedThreadPool( @@ -31,11 +36,16 @@ object InstrumentedExecutors { registry: MetricRegistry, executorServiceMetrics: ExecutorServiceMetrics, errorReporter: Throwable => Unit = ExecutionContext.defaultReporter, - ): ExecutionContextExecutorService = { + ): QueueAwareExecutionContextExecutorService = { val executorService = JavaExecutors.newFixedThreadPool(nThreads) val executorServiceWithMetrics = addMetricsToExecutorService(name, registry, executorServiceMetrics, executorService) - ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter) + val executionContext = + ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter) + new QueueAwareExecutionContextExecutorService( + executionContext, + name, + ) } def newFixedThreadPoolWithFactory( @@ -45,11 +55,16 @@ object InstrumentedExecutors { registry: MetricRegistry, executorServiceMetrics: ExecutorServiceMetrics, errorReporter: Throwable => Unit = ExecutionContext.defaultReporter, - ): ExecutionContextExecutorService = { + ): QueueAwareExecutionContextExecutorService = { val executorService = JavaExecutors.newFixedThreadPool(nThreads, threadFactory) val executorServiceWithMetrics = addMetricsToExecutorService(name, registry, executorServiceMetrics, executorService) - ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter) + val executionContext = + ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter) + new QueueAwareExecutionContextExecutorService( + executionContext, + name, + ) } def newCachedThreadPoolWithFactory( @@ -58,11 +73,16 @@ object InstrumentedExecutors { registry: MetricRegistry, executorServiceMetrics: ExecutorServiceMetrics, errorReporter: Throwable => Unit = ExecutionContext.defaultReporter, - ): ExecutionContextExecutorService = { + ): 
QueueAwareExecutionContextExecutorService = { val executorService = JavaExecutors.newCachedThreadPool(threadFactory) val executorServiceWithMetrics = addMetricsToExecutorService(name, registry, executorServiceMetrics, executorService) - ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter) + val executionContext = + ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter) + new QueueAwareExecutionContextExecutorService( + executionContext, + name, + ) } private def addMetricsToExecutorService( diff --git a/libs-scala/executors/src/main/scala/com/daml/executors/QueueAwareExecutionContextExecutorService.scala b/libs-scala/executors/src/main/scala/com/daml/executors/QueueAwareExecutionContextExecutorService.scala new file mode 100644 index 00000000000..39b18d43ada --- /dev/null +++ b/libs-scala/executors/src/main/scala/com/daml/executors/QueueAwareExecutionContextExecutorService.scala @@ -0,0 +1,109 @@ +// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.executors + +import java.util +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{Callable, Future, TimeUnit} + +import com.daml.scalautil.Statement.discard + +import scala.concurrent.ExecutionContextExecutorService +import scala.jdk.CollectionConverters.{CollectionHasAsScala, SeqHasAsJava} + +/** Keeps track of the number of tasks submitted to the executor but that have not started execution just yet. + * We use this wrapper to access the queue size in performance critical code paths because + * reading the queue size from the executor itself can take an amount of time linear with the number of tasks waiting + * in the queues. 
+ */ +class QueueAwareExecutionContextExecutorService( + delegate: ExecutionContextExecutorService, + val name: String, +) extends ExecutionContextExecutorService { + + private val queueTracking = new AtomicLong(0) + + def getQueueSize: Long = queueTracking.get() + + override def shutdown(): Unit = delegate.shutdown() + override def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() + override def isShutdown: Boolean = delegate.isShutdown + override def isTerminated: Boolean = delegate.isTerminated + override def awaitTermination(l: Long, timeUnit: TimeUnit): Boolean = + delegate.awaitTermination(l, timeUnit) + + override def reportFailure(cause: Throwable): Unit = delegate.reportFailure(cause) + + override def submit[T]( + callable: Callable[T] + ): Future[T] = { + discard { queueTracking.incrementAndGet() } + delegate.submit(new TrackingCallable[T](callable)) + } + + override def submit[T]( + runnable: Runnable, + t: T, + ): Future[T] = { + discard { queueTracking.incrementAndGet() } + delegate.submit(new TrackingRunnable(runnable), t) + } + + override def submit(runnable: Runnable): Future[_] = { + discard { queueTracking.incrementAndGet() } + delegate.submit(new TrackingRunnable(runnable)) + } + + override def invokeAll[T]( + collection: util.Collection[_ <: Callable[T]] + ): util.List[Future[T]] = { + discard { queueTracking.updateAndGet(_ + collection.size()) } + delegate.invokeAll(collection.asScala.map(new TrackingCallable[T](_)).toSeq.asJava) + } + + override def invokeAll[T]( + collection: util.Collection[_ <: Callable[T]], + l: Long, + timeUnit: TimeUnit, + ): util.List[Future[T]] = { + discard { queueTracking.updateAndGet(_ + collection.size()) } + delegate.invokeAll(collection.asScala.map(new TrackingCallable[T](_)).toSeq.asJava, l, timeUnit) + } + + override def invokeAny[T]( + collection: util.Collection[_ <: Callable[T]] + ): T = { + discard { queueTracking.updateAndGet(_ + collection.size()) } + delegate.invokeAny(collection.asScala.map(new TrackingCallable[T](_)).toSeq.asJava) + } + + override def invokeAny[T]( + collection: util.Collection[_ <: Callable[T]], + l: Long, + timeUnit: TimeUnit, + ): T = { + discard { queueTracking.updateAndGet(_ + collection.size()) } + delegate.invokeAny(collection.asScala.map(new TrackingCallable[T](_)).toSeq.asJava, l, timeUnit) + } + + override def execute(runnable: Runnable): Unit = { + discard { queueTracking.incrementAndGet() } + delegate.execute(new TrackingRunnable(runnable)) + } + + class TrackingRunnable(delegate: Runnable) extends Runnable { + override def run(): Unit = { + discard { queueTracking.decrementAndGet() } + delegate.run() + } + } + + class TrackingCallable[T](delegate: Callable[T]) extends Callable[T] { + override def call(): T = { + discard { queueTracking.decrementAndGet() } + delegate.call() + } + } + +}
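
[Editor's note] The counter semantics above make getQueueSize a constant-time read: submit, execute and the invoke* methods increment the AtomicLong, and each wrapped task decrements it when it starts running, so the value is the number of tasks waiting but not yet started. A usage sketch of wiring the wrapper into the rate limiter introduced by this patch follows; the executor name, pool size, and limit value are illustrative, and the sketch assumes the com.daml modules touched by this patch are on the classpath:

    import java.util.concurrent.Executors
    import scala.concurrent.ExecutionContext

    import com.daml.executors.QueueAwareExecutionContextExecutorService
    import com.daml.platform.apiserver.ratelimiting.ThreadpoolCheck

    object RateLimitWiringSketch {
      // Wrap a plain executor so that its queue size becomes a constant-time read
      // of the internal counter instead of a scan of the executor's work queues.
      val servicesExecutor = new QueueAwareExecutionContextExecutorService(
        ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8)),
        "services-executor",
      )

      // Same shape as the wiring in SandboxOnXRunner.buildRateLimitingInterceptor;
      // the limit of 1000 is illustrative only.
      val servicesCheck = ThreadpoolCheck(
        "Api Services Threadpool",
        servicesExecutor,
        1000,
      )
    }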