mirror of
https://github.com/digital-asset/daml.git
synced 2024-11-13 00:16:19 +03:00
Remove thread check dependency on metrics [PLEN-96] (#16113)
Co-authored-by: Simon Maxen <simon.maxen@digitalasset.com>
This commit is contained in:
parent
85eff2930f
commit
20d7e2becd
@ -456,7 +456,7 @@ THREADPOOL_OVERLOADED
|
||||
|
||||
**Conveyance**: This error is logged with log-level INFO on the server side and exposed on the API with grpc-status ABORTED including a detailed error message.
|
||||
|
||||
**Resolution**: The following actions can be taken: Here the 'queue size' for the threadpool = 'submitted tasks' - 'completed tasks' - 'running tasks' 1. Review the historical 'queue size' growth by inspecting the metric given in the message. 2. Review the maximum 'queue size' limits configured in the rate limiting configuration. 3. Try to space out requests that are likely to require a lot of CPU or database power.
|
||||
**Resolution**: The following actions can be taken: Here the 'queue size' for the threadpool is considered as reported by the executor itself. 1. Review the historical 'queue size' growth by inspecting the metric given in the message. 2. Review the maximum 'queue size' limits configured in the rate limiting configuration. 3. Try to space out requests that are likely to require a lot of CPU or database power.
|
||||
|
||||
|
||||
|
||||
|
@ -22,6 +22,7 @@ da_scala_library(
|
||||
"//ledger/error",
|
||||
"//ledger/participant-integration-api:participant-integration-api-proto_scala",
|
||||
"//ledger/participant-state",
|
||||
"//observability/metrics",
|
||||
"@maven//:com_google_api_grpc_proto_google_common_protos",
|
||||
"@maven//:io_grpc_grpc_api",
|
||||
"@maven//:org_slf4j_slf4j_api",
|
||||
|
@ -7,6 +7,7 @@ import com.daml.error._
|
||||
import com.daml.error.definitions.ErrorGroups.ParticipantErrorGroup.LedgerApiErrorGroup
|
||||
import com.daml.lf.engine.Error.Validation.ReplayMismatch
|
||||
import com.daml.lf.engine.{Error => LfError}
|
||||
import com.daml.metrics.ExecutorServiceMetrics
|
||||
import org.slf4j.event.Level
|
||||
|
||||
@Explanation(
|
||||
@ -112,7 +113,7 @@ object LedgerApiErrors extends LedgerApiErrorGroup {
|
||||
)
|
||||
@Resolution(
|
||||
"""The following actions can be taken:
|
||||
|Here the 'queue size' for the threadpool = 'submitted tasks' - 'completed tasks' - 'running tasks'
|
||||
|Here the 'queue size' for the threadpool is considered as reported by the executor itself.
|
||||
|1. Review the historical 'queue size' growth by inspecting the metric given in the message.
|
||||
|2. Review the maximum 'queue size' limits configured in the rate limiting configuration.
|
||||
|3. Try to space out requests that are likely to require a lot of CPU or database power.
|
||||
@ -125,18 +126,19 @@ object LedgerApiErrors extends LedgerApiErrorGroup {
|
||||
) {
|
||||
case class Rejection(
|
||||
name: String,
|
||||
metricNameLabel: String,
|
||||
queued: Long,
|
||||
limit: Int,
|
||||
metricPrefix: String,
|
||||
fullMethodName: String,
|
||||
)(implicit errorLogger: ContextualizedErrorLogger)
|
||||
extends DamlErrorWithDefiniteAnswer(
|
||||
s"The $name queue size ($queued) has exceeded the maximum ($limit). Api services metrics are available at $metricPrefix.",
|
||||
s"The $name queue size ($queued) has exceeded the maximum ($limit). Metrics for queue size available at ${ExecutorServiceMetrics.CommonMetricsName.QueuedTasks}.",
|
||||
extraContext = Map(
|
||||
"name" -> name,
|
||||
"queued" -> queued,
|
||||
"limit" -> limit,
|
||||
"metricPrefix" -> metricPrefix,
|
||||
"name_label" -> metricNameLabel,
|
||||
"metrics" -> ExecutorServiceMetrics.CommonMetricsName.QueuedTasks,
|
||||
"fullMethodName" -> fullMethodName,
|
||||
),
|
||||
)
|
||||
|
@ -6,6 +6,7 @@ package com.daml.platform
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.Materializer
|
||||
import com.daml.api.util.TimeProvider
|
||||
import com.daml.executors.QueueAwareExecutionContextExecutorService
|
||||
import com.daml.ledger.api.auth.{AuthService, JwtVerifierLoader}
|
||||
import com.daml.ledger.api.domain
|
||||
import com.daml.ledger.api.health.HealthChecks
|
||||
@ -51,7 +52,9 @@ class LedgerApiServer(
|
||||
// Currently, we provide this flag outside the HOCON configuration objects
|
||||
// in order to ensure that participants cannot be configured to accept explicitly disclosed contracts.
|
||||
explicitDisclosureUnsafeEnabled: Boolean = false,
|
||||
rateLimitingInterceptor: Option[RateLimitingInterceptor] = None,
|
||||
rateLimitingInterceptor: Option[
|
||||
QueueAwareExecutionContextExecutorService => RateLimitingInterceptor
|
||||
] = None,
|
||||
telemetry: Telemetry,
|
||||
)(implicit actorSystem: ActorSystem, materializer: Materializer) {
|
||||
|
||||
@ -162,7 +165,8 @@ class LedgerApiServer(
|
||||
healthChecks = healthChecksWithIndexer + ("write" -> writeService),
|
||||
metrics = metrics,
|
||||
timeServiceBackend = timeServiceBackend,
|
||||
otherInterceptors = rateLimitingInterceptor.toList,
|
||||
otherInterceptors =
|
||||
rateLimitingInterceptor.map(provider => provider(dbSupport.dbDispatcher.executor)).toList,
|
||||
engine = sharedEngine,
|
||||
servicesExecutionContext = servicesExecutionContext,
|
||||
userManagementStore = PersistentUserManagementStore.cached(
|
||||
|
@ -3,11 +3,7 @@
|
||||
|
||||
package com.daml.platform.apiserver.ratelimiting
|
||||
|
||||
import java.lang.management.{ManagementFactory, MemoryMXBean, MemoryPoolMXBean}
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.metrics.api.MetricName
|
||||
import com.daml.platform.apiserver.configuration.RateLimitingConfig
|
||||
import com.daml.platform.apiserver.ratelimiting.LimitResult.{
|
||||
LimitResultCheck,
|
||||
@ -15,12 +11,12 @@ import com.daml.platform.apiserver.ratelimiting.LimitResult.{
|
||||
UnderLimit,
|
||||
}
|
||||
import com.daml.platform.apiserver.ratelimiting.RateLimitingInterceptor._
|
||||
import com.daml.platform.apiserver.ratelimiting.ThreadpoolCheck.ThreadpoolCount
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener
|
||||
import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import java.lang.management.{ManagementFactory, MemoryMXBean, MemoryPoolMXBean}
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import scala.jdk.CollectionConverters.ListHasAsScala
|
||||
import scala.util.Try
|
||||
|
||||
@ -101,11 +97,6 @@ object RateLimitingInterceptor {
|
||||
additionalChecks: List[LimitResultCheck],
|
||||
): RateLimitingInterceptor = {
|
||||
|
||||
val indexDbThreadpool: ThreadpoolCount = new ThreadpoolCount(metrics)(
|
||||
"Index Database Connection Threadpool",
|
||||
MetricName(metrics.daml.index.db.threadpool.connection, ServerRole.ApiServer.threadPoolSuffix),
|
||||
)
|
||||
|
||||
val activeStreamsName = metrics.daml.lapi.streams.activeName
|
||||
val activeStreamsCounter = metrics.daml.lapi.streams.active
|
||||
|
||||
@ -113,13 +104,12 @@ object RateLimitingInterceptor {
|
||||
metrics = metrics,
|
||||
checks = List[LimitResultCheck](
|
||||
MemoryCheck(tenuredMemoryPools, memoryMxBean, config),
|
||||
ThreadpoolCheck(indexDbThreadpool, config.maxApiServicesIndexDbQueueSize),
|
||||
StreamCheck(activeStreamsCounter, activeStreamsName, config.maxStreams),
|
||||
) ::: additionalChecks,
|
||||
)
|
||||
}
|
||||
|
||||
val doNonLimit: Set[String] = Set(
|
||||
private val doNonLimit: Set[String] = Set(
|
||||
"grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo",
|
||||
"grpc.health.v1.Health/Check",
|
||||
"grpc.health.v1.Health/Watch",
|
||||
|
@ -3,11 +3,9 @@
|
||||
|
||||
package com.daml.platform.apiserver.ratelimiting
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.error.definitions.LedgerApiErrors.ThreadpoolOverloaded
|
||||
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.metrics.api.MetricName
|
||||
import com.daml.executors.QueueAwareExecutionContextExecutorService
|
||||
import com.daml.platform.apiserver.ratelimiting.LimitResult.{
|
||||
LimitResultCheck,
|
||||
OverLimit,
|
||||
@ -19,33 +17,21 @@ object ThreadpoolCheck {
|
||||
private implicit val logger: ContextualizedErrorLogger =
|
||||
DamlContextualizedErrorLogger.forClass(getClass)
|
||||
|
||||
/** Match naming in [[com.codahale.metrics.InstrumentedExecutorService]] */
|
||||
final class ThreadpoolCount(metrics: Metrics)(val name: String, val prefix: MetricName) {
|
||||
private val submitted =
|
||||
metrics.dropwizardFactory.registry.meter(MetricRegistry.name(prefix, "submitted"))
|
||||
private val running =
|
||||
metrics.dropwizardFactory.registry.counter(MetricRegistry.name(prefix, "running"))
|
||||
private val completed =
|
||||
metrics.dropwizardFactory.registry.meter(MetricRegistry.name(prefix, "completed"))
|
||||
|
||||
def queueSize: Long = submitted.getCount - running.getCount - completed.getCount
|
||||
}
|
||||
|
||||
def apply(count: ThreadpoolCount, limit: Int): LimitResultCheck = {
|
||||
apply(count.name, count.prefix, () => count.queueSize, limit)
|
||||
}
|
||||
|
||||
def apply(name: String, prefix: String, queueSize: () => Long, limit: Int): LimitResultCheck =
|
||||
def apply(
|
||||
name: String,
|
||||
queue: QueueAwareExecutionContextExecutorService,
|
||||
limit: Int,
|
||||
): LimitResultCheck =
|
||||
(fullMethodName, _) => {
|
||||
val queued = queueSize()
|
||||
val queued = queue.getQueueSize
|
||||
if (queued > limit) {
|
||||
OverLimit(
|
||||
ThreadpoolOverloaded.Rejection(
|
||||
name = name,
|
||||
queued = queued,
|
||||
limit = limit,
|
||||
metricPrefix = prefix,
|
||||
fullMethodName = fullMethodName,
|
||||
metricNameLabel = queue.name,
|
||||
)
|
||||
)
|
||||
} else UnderLimit
|
||||
|
@ -4,10 +4,10 @@
|
||||
package com.daml.platform.store.dao
|
||||
|
||||
import java.sql.Connection
|
||||
import java.util.concurrent.{Executor, TimeUnit}
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
|
||||
import com.daml.executors.InstrumentedExecutors
|
||||
import com.daml.executors.{InstrumentedExecutors, QueueAwareExecutionContextExecutorService}
|
||||
import com.daml.ledger.api.health.{HealthStatus, ReportsHealth}
|
||||
import com.daml.ledger.resources.ResourceOwner
|
||||
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
|
||||
@ -24,6 +24,7 @@ import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
private[platform] trait DbDispatcher {
|
||||
val executor: QueueAwareExecutionContextExecutorService
|
||||
def executeSql[T](databaseMetrics: DatabaseMetrics)(sql: Connection => T)(implicit
|
||||
loggingContext: LoggingContext
|
||||
): Future[T]
|
||||
@ -32,7 +33,7 @@ private[platform] trait DbDispatcher {
|
||||
|
||||
private[dao] final class DbDispatcherImpl private[dao] (
|
||||
connectionProvider: JdbcConnectionProvider,
|
||||
executor: Executor,
|
||||
val executor: QueueAwareExecutionContextExecutorService,
|
||||
overallWaitTimer: Timer,
|
||||
overallExecutionTimer: Timer,
|
||||
)(implicit loggingContext: LoggingContext)
|
||||
|
@ -3,7 +3,8 @@
|
||||
|
||||
package com.daml.platform.apiserver
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
import com.daml.error.DamlContextualizedErrorLogger
|
||||
import com.daml.error.definitions.LedgerApiErrors
|
||||
import com.daml.grpc.sampleservice.implementations.HelloServiceReferenceImplementation
|
||||
@ -13,8 +14,7 @@ import com.daml.ledger.resources.{ResourceOwner, TestResourceContext}
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.platform.apiserver.GrpcServerSpec._
|
||||
import com.daml.platform.apiserver.configuration.RateLimitingConfig
|
||||
import com.daml.platform.apiserver.ratelimiting.RateLimitingInterceptor
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import com.daml.platform.apiserver.ratelimiting.{LimitResult, RateLimitingInterceptor}
|
||||
import com.daml.platform.hello.{HelloRequest, HelloResponse, HelloServiceGrpc}
|
||||
import com.daml.ports.Port
|
||||
import com.google.protobuf.ByteString
|
||||
@ -22,8 +22,6 @@ import io.grpc.{ManagedChannel, ServerInterceptor, Status, StatusRuntimeExceptio
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AsyncWordSpec
|
||||
|
||||
import java.util.concurrent.Executors
|
||||
import com.daml.metrics.api.MetricName
|
||||
import scala.concurrent.Future
|
||||
|
||||
final class GrpcServerSpec extends AsyncWordSpec with Matchers with TestResourceContext {
|
||||
@ -90,15 +88,22 @@ final class GrpcServerSpec extends AsyncWordSpec with Matchers with TestResource
|
||||
|
||||
"install rate limit interceptor" in {
|
||||
val metrics = Metrics.ForTesting
|
||||
val rateLimitingInterceptor = RateLimitingInterceptor(metrics, rateLimitingConfig)
|
||||
val rateLimitingInterceptor = RateLimitingInterceptor(
|
||||
metrics,
|
||||
rateLimitingConfig,
|
||||
additionalChecks = List((_, _) =>
|
||||
LimitResult.OverLimit(
|
||||
LedgerApiErrors.ThreadpoolOverloaded.Rejection(
|
||||
"test",
|
||||
"test",
|
||||
100,
|
||||
59,
|
||||
"test",
|
||||
)(DamlContextualizedErrorLogger.forTesting(getClass))
|
||||
)
|
||||
),
|
||||
)
|
||||
resources(metrics, List(rateLimitingInterceptor)).use { channel =>
|
||||
val metricName = MetricName(
|
||||
metrics.daml.index.db.threadpool.connection,
|
||||
ServerRole.ApiServer.threadPoolSuffix,
|
||||
)
|
||||
metrics.dropwizardFactory.registry
|
||||
.meter(MetricRegistry.name(metricName, "submitted"))
|
||||
.mark(rateLimitingConfig.maxApiServicesIndexDbQueueSize.toLong + 1) // Over limit
|
||||
val helloService = HelloServiceGrpc.stub(channel)
|
||||
helloService.single(HelloRequest(7)).failed.map {
|
||||
case s: StatusRuntimeException =>
|
||||
|
@ -5,9 +5,10 @@ package com.daml.platform.apiserver.ratelimiting
|
||||
|
||||
import java.io.IOException
|
||||
import java.lang.management._
|
||||
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
|
||||
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.executors.QueueAwareExecutionContextExecutorService
|
||||
import com.daml.grpc.adapter.utils.implementations.HelloServiceAkkaImplementation
|
||||
import com.daml.grpc.sampleservice.implementations.HelloServiceReferenceImplementation
|
||||
import com.daml.ledger.api.health.HealthChecks.ComponentName
|
||||
@ -19,9 +20,7 @@ import com.daml.logging.LoggingContext
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.platform.apiserver.configuration.RateLimitingConfig
|
||||
import com.daml.platform.apiserver.ratelimiting.LimitResult.LimitResultCheck
|
||||
import com.daml.platform.apiserver.ratelimiting.ThreadpoolCheck.ThreadpoolCount
|
||||
import com.daml.platform.apiserver.services.GrpcClientResource
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import com.daml.platform.hello.{HelloRequest, HelloResponse, HelloServiceGrpc}
|
||||
import com.daml.platform.server.api.services.grpc.GrpcHealthService
|
||||
import com.daml.ports.Port
|
||||
@ -45,7 +44,7 @@ import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.time.{Second, Span}
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import scala.concurrent.{Future, Promise}
|
||||
import scala.concurrent.{ExecutionContext, Future, Promise}
|
||||
|
||||
final class RateLimitingInterceptorSpec
|
||||
extends AsyncFlatSpec
|
||||
@ -64,25 +63,36 @@ final class RateLimitingInterceptorSpec
|
||||
|
||||
behavior of "RateLimitingInterceptor"
|
||||
|
||||
it should "limit calls when apiServices DB thread pool executor service is over limit" in {
|
||||
it should "support additional checks" in {
|
||||
val metrics = createMetrics
|
||||
withChannel(metrics, new HelloServiceAkkaImplementation, config).use { channel: Channel =>
|
||||
val helloService = HelloServiceGrpc.stub(channel)
|
||||
val submitted = metrics.dropwizardFactory.registry.meter(
|
||||
MetricRegistry.name(
|
||||
metrics.daml.index.db.threadpool.connection,
|
||||
ServerRole.ApiServer.threadPoolSuffix,
|
||||
"submitted",
|
||||
val queueSizeValues =
|
||||
Iterator(0L, config.maxApiServicesQueueSize.toLong + 1, 0L)
|
||||
val executorWithQueueSize = new QueueAwareExecutionContextExecutorService(
|
||||
ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor()),
|
||||
"test",
|
||||
) {
|
||||
override def getQueueSize: Long = queueSizeValues.next()
|
||||
}
|
||||
val threadPoolHumanReadableName = "For testing"
|
||||
withChannel(
|
||||
metrics,
|
||||
new HelloServiceAkkaImplementation,
|
||||
config,
|
||||
additionalChecks = List(
|
||||
ThreadpoolCheck(
|
||||
threadPoolHumanReadableName,
|
||||
executorWithQueueSize,
|
||||
config.maxApiServicesQueueSize,
|
||||
)
|
||||
)
|
||||
),
|
||||
).use { channel: Channel =>
|
||||
val helloService = HelloServiceGrpc.stub(channel)
|
||||
for {
|
||||
_ <- helloService.single(HelloRequest(1))
|
||||
_ = submitted.mark(config.maxApiServicesIndexDbQueueSize.toLong + 1)
|
||||
exception <- helloService.single(HelloRequest(2)).failed
|
||||
_ = submitted.mark(-config.maxApiServicesIndexDbQueueSize.toLong - 1)
|
||||
_ <- helloService.single(HelloRequest(3))
|
||||
} yield {
|
||||
exception.getMessage should include(metrics.daml.index.db.threadpool.connection)
|
||||
exception.toString should include(threadPoolHumanReadableName)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -374,37 +384,6 @@ final class RateLimitingInterceptorSpec
|
||||
underTest.calculateCollectionUsageThreshold(101000) shouldBe 100000 // 101000 - 1000
|
||||
}
|
||||
|
||||
it should "support addition checks" in {
|
||||
val metrics = Metrics(new MetricRegistry, GlobalOpenTelemetry.getMeter("test"))
|
||||
|
||||
val apiServices: ThreadpoolCount = new ThreadpoolCount(metrics)(
|
||||
"Api Services Threadpool",
|
||||
metrics.daml.lapi.threadpool.apiServices,
|
||||
)
|
||||
val apiServicesCheck = ThreadpoolCheck(apiServices, config.maxApiServicesQueueSize)
|
||||
|
||||
withChannel(
|
||||
metrics,
|
||||
new HelloServiceAkkaImplementation,
|
||||
config,
|
||||
additionalChecks = List(apiServicesCheck),
|
||||
).use { channel: Channel =>
|
||||
val helloService = HelloServiceGrpc.stub(channel)
|
||||
val submitted = metrics.dropwizardFactory.registry.meter(
|
||||
MetricRegistry.name(metrics.daml.lapi.threadpool.apiServices, "submitted")
|
||||
)
|
||||
for {
|
||||
_ <- helloService.single(HelloRequest(1))
|
||||
_ = submitted.mark(config.maxApiServicesQueueSize.toLong + 1)
|
||||
exception <- helloService.single(HelloRequest(2)).failed
|
||||
_ = submitted.mark(-config.maxApiServicesQueueSize.toLong - 1)
|
||||
_ <- helloService.single(HelloRequest(3))
|
||||
} yield {
|
||||
exception.getMessage should include(metrics.daml.lapi.threadpool.apiServices)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object RateLimitingInterceptorSpec extends MockitoSugar {
|
||||
@ -436,7 +415,13 @@ object RateLimitingInterceptorSpec extends MockitoSugar {
|
||||
): ResourceOwner[Channel] =
|
||||
for {
|
||||
server <- serverOwner(
|
||||
RateLimitingInterceptor(metrics, config, pool, memoryBean, additionalChecks),
|
||||
RateLimitingInterceptor(
|
||||
metrics,
|
||||
config,
|
||||
pool,
|
||||
memoryBean,
|
||||
additionalChecks,
|
||||
),
|
||||
service,
|
||||
)
|
||||
channel <- GrpcClientResource.owner(Port(server.getPort))
|
||||
|
@ -25,10 +25,12 @@ import org.mockito.MockitoSugar
|
||||
import org.mockito.captor.ArgCaptor
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AnyWordSpecLike
|
||||
|
||||
import java.sql.Connection
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.time.{LocalDate, LocalTime, OffsetDateTime, ZoneOffset}
|
||||
|
||||
import com.daml.executors.QueueAwareExecutionContextExecutorService
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
//noinspection TypeAnnotation
|
||||
@ -58,6 +60,8 @@ final class MeteringAggregatorSpec extends AnyWordSpecLike with MockitoSugar wit
|
||||
): Future[T] = Future.successful {
|
||||
sql(conn)
|
||||
}
|
||||
override val executor: QueueAwareExecutionContextExecutorService =
|
||||
mock[QueueAwareExecutionContextExecutorService]
|
||||
}
|
||||
|
||||
val parameterStore: ParameterStorageBackend = mock[ParameterStorageBackend]
|
||||
|
@ -5,7 +5,9 @@ package com.daml.platform.indexer.parallel
|
||||
|
||||
import java.sql.Connection
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
import com.daml.executors.QueueAwareExecutionContextExecutorService
|
||||
import com.daml.ledger.offset.Offset
|
||||
import com.daml.ledger.participant.state.{v2 => state}
|
||||
import com.daml.lf.crypto.Hash
|
||||
@ -29,7 +31,7 @@ import org.scalatest.flatspec.AnyFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
|
||||
|
||||
import scala.concurrent.{Await, Future}
|
||||
import scala.concurrent.{Await, ExecutionContext, Future}
|
||||
|
||||
class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
|
||||
|
||||
@ -389,6 +391,12 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
|
||||
loggingContext: LoggingContext
|
||||
): Future[T] =
|
||||
Future.successful(sql(connection))
|
||||
|
||||
override val executor: QueueAwareExecutionContextExecutorService =
|
||||
new QueueAwareExecutionContextExecutorService(
|
||||
ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor()),
|
||||
"test",
|
||||
)
|
||||
}
|
||||
|
||||
val batchPayload = "Some batch payload"
|
||||
@ -433,10 +441,17 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
|
||||
it should "apply ingestTailFunction on the last batch and forward the batch of batches" in {
|
||||
val connection = new TestConnection
|
||||
val dbDispatcher = new DbDispatcher {
|
||||
|
||||
override def executeSql[T](databaseMetrics: DatabaseMetrics)(sql: Connection => T)(implicit
|
||||
loggingContext: LoggingContext
|
||||
): Future[T] =
|
||||
Future.successful(sql(connection))
|
||||
|
||||
override val executor: QueueAwareExecutionContextExecutorService =
|
||||
new QueueAwareExecutionContextExecutorService(
|
||||
ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor()),
|
||||
"test",
|
||||
)
|
||||
}
|
||||
|
||||
val ledgerEnd = ParameterStorageBackend.LedgerEnd(
|
||||
|
@ -9,20 +9,14 @@ import akka.stream.Materializer
|
||||
import akka.stream.scaladsl.Sink
|
||||
import com.daml.api.util.TimeProvider
|
||||
import com.daml.buildinfo.BuildInfo
|
||||
import com.daml.executors.InstrumentedExecutors
|
||||
import com.daml.executors.{InstrumentedExecutors, QueueAwareExecutionContextExecutorService}
|
||||
import com.daml.ledger.api.auth.{
|
||||
AuthServiceJWT,
|
||||
AuthServiceNone,
|
||||
AuthServiceStatic,
|
||||
AuthServiceWildcard,
|
||||
}
|
||||
import com.daml.ledger.api.v1.experimental_features.{
|
||||
CommandDeduplicationFeatures,
|
||||
CommandDeduplicationPeriodSupport,
|
||||
CommandDeduplicationType,
|
||||
ExperimentalContractIds,
|
||||
ExperimentalExplicitDisclosure,
|
||||
}
|
||||
import com.daml.ledger.api.v1.experimental_features._
|
||||
import com.daml.ledger.offset.Offset
|
||||
import com.daml.ledger.participant.state.index.v2.IndexService
|
||||
import com.daml.ledger.participant.state.v2.{Update, WriteService}
|
||||
@ -37,7 +31,6 @@ import com.daml.metrics.api.MetricHandle.Factory
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.platform.LedgerApiServer
|
||||
import com.daml.platform.apiserver.configuration.RateLimitingConfig
|
||||
import com.daml.platform.apiserver.ratelimiting.ThreadpoolCheck.ThreadpoolCount
|
||||
import com.daml.platform.apiserver.ratelimiting.{RateLimitingInterceptor, ThreadpoolCheck}
|
||||
import com.daml.platform.apiserver.{LedgerFeatures, TimeServiceBackend}
|
||||
import com.daml.platform.config.ParticipantConfig
|
||||
@ -142,8 +135,10 @@ object SandboxOnXRunner {
|
||||
servicesExecutionContext = servicesExecutionContext,
|
||||
metrics = metrics,
|
||||
explicitDisclosureUnsafeEnabled = true,
|
||||
rateLimitingInterceptor =
|
||||
participantConfig.apiServer.rateLimit.map(buildRateLimitingInterceptor(metrics)),
|
||||
rateLimitingInterceptor = participantConfig.apiServer.rateLimit.map(config =>
|
||||
(dbExecutor: QueueAwareExecutionContextExecutorService) =>
|
||||
buildRateLimitingInterceptor(metrics, dbExecutor, servicesExecutionContext)(config)
|
||||
),
|
||||
telemetry = new DefaultOpenTelemetry(openTelemetry),
|
||||
)(actorSystem, materializer).owner
|
||||
} yield {
|
||||
@ -235,7 +230,7 @@ object SandboxOnXRunner {
|
||||
private def buildServicesExecutionContext(
|
||||
metrics: Metrics,
|
||||
servicesThreadPoolSize: Int,
|
||||
): ResourceOwner[ExecutionContextExecutorService] =
|
||||
): ResourceOwner[QueueAwareExecutionContextExecutorService] =
|
||||
ResourceOwner
|
||||
.forExecutorService(() =>
|
||||
InstrumentedExecutors.newWorkStealingExecutor(
|
||||
@ -290,16 +285,24 @@ object SandboxOnXRunner {
|
||||
}
|
||||
|
||||
def buildRateLimitingInterceptor(
|
||||
metrics: Metrics
|
||||
metrics: Metrics,
|
||||
indexDbExecutor: QueueAwareExecutionContextExecutorService,
|
||||
apiServicesExecutor: QueueAwareExecutionContextExecutorService,
|
||||
)(config: RateLimitingConfig): RateLimitingInterceptor = {
|
||||
|
||||
val apiServices: ThreadpoolCount = new ThreadpoolCount(metrics)(
|
||||
"Api Services Threadpool",
|
||||
metrics.daml.lapi.threadpool.apiServices,
|
||||
val indexDbCheck = ThreadpoolCheck(
|
||||
"Index DB Threadpool",
|
||||
indexDbExecutor,
|
||||
config.maxApiServicesIndexDbQueueSize,
|
||||
)
|
||||
val apiServicesCheck = ThreadpoolCheck(apiServices, config.maxApiServicesQueueSize)
|
||||
|
||||
RateLimitingInterceptor(metrics, config, List(apiServicesCheck))
|
||||
val apiServicesCheck = ThreadpoolCheck(
|
||||
"Api Services Threadpool",
|
||||
apiServicesExecutor,
|
||||
config.maxApiServicesQueueSize,
|
||||
)
|
||||
|
||||
RateLimitingInterceptor(metrics, config, List(indexDbCheck, apiServicesCheck))
|
||||
|
||||
}
|
||||
|
||||
|
@ -12,6 +12,7 @@ da_scala_library(
|
||||
tags = ["maven_coordinates=com.daml:executors:__VERSION__"],
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//libs-scala/scala-utils",
|
||||
"//observability/metrics",
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
],
|
||||
|
@ -8,7 +8,7 @@ import java.util.concurrent.{ExecutorService, ThreadFactory, Executors => JavaEx
|
||||
import com.codahale.metrics.{InstrumentedExecutorService, MetricRegistry}
|
||||
import com.daml.metrics.ExecutorServiceMetrics
|
||||
|
||||
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
object InstrumentedExecutors {
|
||||
|
||||
@ -18,11 +18,16 @@ object InstrumentedExecutors {
|
||||
registry: MetricRegistry,
|
||||
executorServiceMetrics: ExecutorServiceMetrics,
|
||||
errorReporter: Throwable => Unit = ExecutionContext.defaultReporter,
|
||||
): ExecutionContextExecutorService = {
|
||||
): QueueAwareExecutionContextExecutorService = {
|
||||
val executorService = JavaExecutors.newWorkStealingPool(parallelism)
|
||||
val executorServiceWithMetrics =
|
||||
addMetricsToExecutorService(name, registry, executorServiceMetrics, executorService)
|
||||
ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter)
|
||||
val executionContext =
|
||||
ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter)
|
||||
new QueueAwareExecutionContextExecutorService(
|
||||
executionContext,
|
||||
name,
|
||||
)
|
||||
}
|
||||
|
||||
def newFixedThreadPool(
|
||||
@ -31,11 +36,16 @@ object InstrumentedExecutors {
|
||||
registry: MetricRegistry,
|
||||
executorServiceMetrics: ExecutorServiceMetrics,
|
||||
errorReporter: Throwable => Unit = ExecutionContext.defaultReporter,
|
||||
): ExecutionContextExecutorService = {
|
||||
): QueueAwareExecutionContextExecutorService = {
|
||||
val executorService = JavaExecutors.newFixedThreadPool(nThreads)
|
||||
val executorServiceWithMetrics =
|
||||
addMetricsToExecutorService(name, registry, executorServiceMetrics, executorService)
|
||||
ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter)
|
||||
val executionContext =
|
||||
ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter)
|
||||
new QueueAwareExecutionContextExecutorService(
|
||||
executionContext,
|
||||
name,
|
||||
)
|
||||
}
|
||||
|
||||
def newFixedThreadPoolWithFactory(
|
||||
@ -45,11 +55,16 @@ object InstrumentedExecutors {
|
||||
registry: MetricRegistry,
|
||||
executorServiceMetrics: ExecutorServiceMetrics,
|
||||
errorReporter: Throwable => Unit = ExecutionContext.defaultReporter,
|
||||
): ExecutionContextExecutorService = {
|
||||
): QueueAwareExecutionContextExecutorService = {
|
||||
val executorService = JavaExecutors.newFixedThreadPool(nThreads, threadFactory)
|
||||
val executorServiceWithMetrics =
|
||||
addMetricsToExecutorService(name, registry, executorServiceMetrics, executorService)
|
||||
ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter)
|
||||
val executionContext =
|
||||
ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter)
|
||||
new QueueAwareExecutionContextExecutorService(
|
||||
executionContext,
|
||||
name,
|
||||
)
|
||||
}
|
||||
|
||||
def newCachedThreadPoolWithFactory(
|
||||
@ -58,11 +73,16 @@ object InstrumentedExecutors {
|
||||
registry: MetricRegistry,
|
||||
executorServiceMetrics: ExecutorServiceMetrics,
|
||||
errorReporter: Throwable => Unit = ExecutionContext.defaultReporter,
|
||||
): ExecutionContextExecutorService = {
|
||||
): QueueAwareExecutionContextExecutorService = {
|
||||
val executorService = JavaExecutors.newCachedThreadPool(threadFactory)
|
||||
val executorServiceWithMetrics =
|
||||
addMetricsToExecutorService(name, registry, executorServiceMetrics, executorService)
|
||||
ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter)
|
||||
val executionContext =
|
||||
ExecutionContext.fromExecutorService(executorServiceWithMetrics, errorReporter)
|
||||
new QueueAwareExecutionContextExecutorService(
|
||||
executionContext,
|
||||
name,
|
||||
)
|
||||
}
|
||||
|
||||
private def addMetricsToExecutorService(
|
@ -0,0 +1,109 @@
|
||||
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.executors
|
||||
|
||||
import java.util
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import java.util.concurrent.{Callable, Future, TimeUnit}
|
||||
|
||||
import com.daml.scalautil.Statement.discard
|
||||
|
||||
import scala.concurrent.ExecutionContextExecutorService
|
||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SeqHasAsJava}
|
||||
|
||||
/** Keeps track of the number of tasks submitted to the executor but that have not started execution just yet.
  * We use this wrapper to access the queue size in performance critical code paths because
  * reading the queue size from the executor itself can take an amount of time linear with the number of tasks waiting
  * in the queues.
  *
  * Every task handed to the delegate is wrapped so that it leaves the tracked queue exactly once:
  *  - when it starts running,
  *  - when the delegate refuses it (the hand-over throws), or
  *  - when a bulk call (`invokeAll` / `invokeAny`) returns, since tasks that did not start by then
  *    are cancelled by the underlying executor and will never run.
  *
  * Known limitation: a task whose returned `Future` is cancelled by the caller before it starts,
  * or that is dropped by `shutdownNow`, is still counted as queued.
  *
  * @param delegate the executor that actually runs the tasks
  * @param name     a label identifying this executor (e.g. in error messages)
  */
class QueueAwareExecutionContextExecutorService(
    delegate: ExecutionContextExecutorService,
    val name: String,
) extends ExecutionContextExecutorService {

  private val queueTracking = new AtomicLong(0)

  /** Number of tasks handed to the delegate that have not started running yet. */
  def getQueueSize: Long = queueTracking.get()

  override def shutdown(): Unit = delegate.shutdown()
  override def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
  override def isShutdown: Boolean = delegate.isShutdown
  override def isTerminated: Boolean = delegate.isTerminated
  override def awaitTermination(l: Long, timeUnit: TimeUnit): Boolean =
    delegate.awaitTermination(l, timeUnit)

  override def reportFailure(cause: Throwable): Unit = delegate.reportFailure(cause)

  override def submit[T](
      callable: Callable[T]
  ): Future[T] = {
    val task = new TrackingCallable[T](callable)
    enqueueOne[Future[T]](task)(delegate.submit(task))
  }

  override def submit[T](
      runnable: Runnable,
      t: T,
  ): Future[T] = {
    val task = new TrackingRunnable(runnable)
    enqueueOne[Future[T]](task)(delegate.submit(task, t))
  }

  override def submit(runnable: Runnable): Future[_] = {
    val task = new TrackingRunnable(runnable)
    enqueueOne[Future[_]](task)(delegate.submit(task))
  }

  override def invokeAll[T](
      collection: util.Collection[_ <: Callable[T]]
  ): util.List[Future[T]] = {
    val tasks = enqueueAll(collection)
    try delegate.invokeAll(tasks.asJava)
    finally releaseAll(tasks)
  }

  override def invokeAll[T](
      collection: util.Collection[_ <: Callable[T]],
      l: Long,
      timeUnit: TimeUnit,
  ): util.List[Future[T]] = {
    val tasks = enqueueAll(collection)
    // On timeout the executor cancels tasks that did not complete; release their slots.
    try delegate.invokeAll(tasks.asJava, l, timeUnit)
    finally releaseAll(tasks)
  }

  override def invokeAny[T](
      collection: util.Collection[_ <: Callable[T]]
  ): T = {
    val tasks = enqueueAll(collection)
    // Once one task succeeds the remaining ones are cancelled and may never start.
    try delegate.invokeAny(tasks.asJava)
    finally releaseAll(tasks)
  }

  override def invokeAny[T](
      collection: util.Collection[_ <: Callable[T]],
      l: Long,
      timeUnit: TimeUnit,
  ): T = {
    val tasks = enqueueAll(collection)
    try delegate.invokeAny(tasks.asJava, l, timeUnit)
    finally releaseAll(tasks)
  }

  override def execute(runnable: Runnable): Unit = {
    val task = new TrackingRunnable(runnable)
    enqueueOne[Unit](task)(delegate.execute(task))
  }

  /** Counts `slot` as queued, then performs the hand-over to the delegate.
    * The increment happens first so that a task which starts immediately never
    * decrements before it was counted. If the hand-over throws (for example because
    * the delegate rejects the task) the slot is released again and the error rethrown.
    */
  private def enqueueOne[A](slot: QueueSlot)(handOver: => A): A = {
    discard { queueTracking.incrementAndGet() }
    try handOver
    catch {
      case scala.util.control.NonFatal(rejected) =>
        slot.leaveQueue()
        throw rejected
    }
  }

  /** Wraps every callable in a tracking wrapper and counts all of them as queued. */
  private def enqueueAll[T](
      collection: util.Collection[_ <: Callable[T]]
  ): Seq[TrackingCallable[T]] = {
    val tasks = collection.asScala.map(new TrackingCallable[T](_)).toSeq
    discard { queueTracking.addAndGet(tasks.size.toLong) }
    tasks
  }

  /** Releases the slots of all tasks that have not started; a no-op for those that did. */
  private def releaseAll(tasks: Seq[QueueSlot]): Unit =
    tasks.foreach(_.leaveQueue())

  /** Bookkeeping shared by the tracking wrappers: a task occupies one queue slot
    * and gives it up at most once, no matter how many code paths try to release it.
    */
  sealed trait QueueSlot {
    private val pending = new java.util.concurrent.atomic.AtomicBoolean(true)

    private[QueueAwareExecutionContextExecutorService] def leaveQueue(): Unit =
      if (pending.compareAndSet(true, false)) discard { queueTracking.decrementAndGet() }
  }

  /** Runnable wrapper that leaves the tracked queue right before running `delegate`. */
  class TrackingRunnable(delegate: Runnable) extends Runnable with QueueSlot {
    override def run(): Unit = {
      leaveQueue()
      delegate.run()
    }
  }

  /** Callable wrapper that leaves the tracked queue right before calling `delegate`. */
  class TrackingCallable[T](delegate: Callable[T]) extends Callable[T] with QueueSlot {
    override def call(): T = {
      leaveQueue()
      delegate.call()
    }
  }

}
|
Loading…
Reference in New Issue
Block a user