Removed ExecutorServiceMetrics from InstrumentedExecutors. (#18336)

* InstrumentedExecutors without execution context metrics.

* canton related changes
This commit is contained in:
Andreas Triantafyllos 2024-01-31 13:08:02 +01:00 committed by GitHub
parent ae098ce147
commit 369fab5a08
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 5 additions and 25 deletions

View File

@ -100,14 +100,12 @@ private[platform] object InMemoryStateUpdater {
InstrumentedExecutors.newWorkStealingExecutor(
metrics.daml.lapi.threadpool.indexBypass.prepareUpdates,
prepareUpdatesParallelism,
metrics.executorServiceMetrics,
)
)
updateCachesExecutor <- ResourceOwner.forExecutorService(() =>
InstrumentedExecutors.newFixedThreadPool(
metrics.daml.lapi.threadpool.indexBypass.updateInMemoryState,
1,
metrics.executorServiceMetrics,
)
)
logger = loggerFactory.getTracedLogger(getClass)

View File

@ -218,7 +218,6 @@ final class IndexServiceOwner(
InstrumentedExecutors.newWorkStealingExecutor(
metrics.daml.lapi.threadpool.inMemoryFanOut.toString,
threadPoolSize,
metrics.executorServiceMetrics,
)
)

View File

@ -36,14 +36,13 @@ object AsyncSupport {
val logger = loggerFactory.getTracedLogger(getClass)
ResourceOwner
.forExecutorService { () =>
val (executorName, executorServiceMetrics) = withMetric
val (executorName, _executorServiceMetrics) = withMetric
InstrumentedExecutors.newFixedThreadPoolWithFactory(
executorName,
size,
new ThreadFactoryBuilder()
.setNameFormat(s"$namePrefix-%d")
.build,
executorServiceMetrics,
throwable =>
logger
.error(s"ExecutionContext $namePrefix has failed with an exception", throwable),

View File

@ -82,7 +82,6 @@ object ParallelIndexerFactory {
"ha-coordinator",
1,
new ThreadFactoryBuilder().setNameFormat("ha-coordinator-%d").build,
metrics.executorServiceMetrics,
throwable =>
logger
.error(

View File

@ -169,7 +169,6 @@ object DbDispatcher {
.error("Uncaught exception in the SQL executor.", e)(TraceContext.empty)
)
.build(),
metrics.executorServiceMetrics,
),
gracefulAwaitTerminationMillis =
5000, // waiting 5s for ongoing SQL operations to finish and then forcing them with Thread.interrupt...

View File

@ -13,6 +13,5 @@ da_scala_library(
visibility = ["//visibility:public"],
deps = [
"//libs-scala/scala-utils",
"//observability/metrics",
],
)

View File

@ -6,7 +6,6 @@ package com.daml.executors
import java.util.concurrent.{ThreadFactory, Executors => JavaExecutors}
import com.daml.executors.executors.QueueAwareExecutionContextExecutorService
import com.daml.metrics.ExecutorServiceMetrics
import scala.concurrent.ExecutionContext
@ -15,14 +14,11 @@ object InstrumentedExecutors {
def newWorkStealingExecutor(
name: String,
parallelism: Int,
executorServiceMetrics: ExecutorServiceMetrics,
errorReporter: Throwable => Unit = ExecutionContext.defaultReporter,
): QueueAwareExecutionContextExecutorService = {
val executorService = JavaExecutors.newWorkStealingPool(parallelism)
val executorServiceWithMetrics = executorServiceMetrics
.monitorExecutorService(name, executorService)
new QueueAwareExecutionContextExecutorService(
executorServiceWithMetrics,
executorService,
name,
errorReporter,
)
@ -31,14 +27,11 @@ object InstrumentedExecutors {
def newFixedThreadPool(
name: String,
nThreads: Int,
executorServiceMetrics: ExecutorServiceMetrics,
errorReporter: Throwable => Unit = ExecutionContext.defaultReporter,
): QueueAwareExecutionContextExecutorService = {
val executorService = JavaExecutors.newFixedThreadPool(nThreads)
val executorServiceWithMetrics = executorServiceMetrics
.monitorExecutorService(name, executorService)
new QueueAwareExecutionContextExecutorService(
executorServiceWithMetrics,
executorService,
name,
errorReporter,
)
@ -48,14 +41,11 @@ object InstrumentedExecutors {
name: String,
nThreads: Int,
threadFactory: ThreadFactory,
executorServiceMetrics: ExecutorServiceMetrics,
errorReporter: Throwable => Unit = ExecutionContext.defaultReporter,
): QueueAwareExecutionContextExecutorService = {
val executorService = JavaExecutors.newFixedThreadPool(nThreads, threadFactory)
val executorServiceWithMetrics = executorServiceMetrics
.monitorExecutorService(name, executorService)
new QueueAwareExecutionContextExecutorService(
executorServiceWithMetrics,
executorService,
name,
errorReporter,
)
@ -64,14 +54,11 @@ object InstrumentedExecutors {
def newCachedThreadPoolWithFactory(
name: String,
threadFactory: ThreadFactory,
executorServiceMetrics: ExecutorServiceMetrics,
errorReporter: Throwable => Unit = ExecutionContext.defaultReporter,
): QueueAwareExecutionContextExecutorService = {
val executorService = JavaExecutors.newCachedThreadPool(threadFactory)
val executorServiceWithMetrics = executorServiceMetrics
.monitorExecutorService(name, executorService)
new QueueAwareExecutionContextExecutorService(
executorServiceWithMetrics,
executorService,
name,
errorReporter,
)