mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-19 08:48:21 +03:00
Instrument Ledger API and DbDispatcher threadpools (#8789)
CHANGELOG_BEGIN CHANGELOG_END
This commit is contained in:
parent
75fded8b75
commit
19bf4031f5
@ -253,6 +253,12 @@ final class Metrics(val registry: MetricRegistry) {
|
||||
private val Prefix: MetricName = daml.Prefix :+ "lapi"
|
||||
|
||||
def forMethod(name: String): Timer = registry.timer(Prefix :+ name)
|
||||
|
||||
object threadpool {
|
||||
private val Prefix: MetricName = lapi.Prefix :+ "threadpool"
|
||||
|
||||
val apiServices: MetricName = Prefix :+ "api-services"
|
||||
}
|
||||
}
|
||||
|
||||
object ledger {
|
||||
|
@ -6,7 +6,7 @@ package com.daml.platform.store.dao
|
||||
import java.sql.Connection
|
||||
import java.util.concurrent.{Executor, Executors, TimeUnit}
|
||||
|
||||
import com.codahale.metrics.Timer
|
||||
import com.codahale.metrics.{InstrumentedExecutorService, Timer}
|
||||
import com.daml.ledger.api.health.{HealthStatus, ReportsHealth}
|
||||
import com.daml.ledger.resources.ResourceOwner
|
||||
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
|
||||
@ -31,7 +31,8 @@ private[platform] final class DbDispatcher private (
|
||||
|
||||
private val executionContext = ExecutionContext.fromExecutor(executor)
|
||||
|
||||
override def currentHealth(): HealthStatus = connectionProvider.currentHealth()
|
||||
override def currentHealth(): HealthStatus =
|
||||
connectionProvider.currentHealth()
|
||||
|
||||
/** Runs an SQL statement in a dedicated Executor. The whole block will be run in a single database transaction.
|
||||
*
|
||||
@ -97,15 +98,20 @@ private[platform] object DbDispatcher {
|
||||
metrics.registry,
|
||||
connectionAsyncCommit,
|
||||
)
|
||||
threadPoolName = s"daml.index.db.threadpool.connection.${serverRole.threadPoolSuffix}"
|
||||
executor <- ResourceOwner.forExecutorService(() =>
|
||||
Executors.newFixedThreadPool(
|
||||
maxConnections,
|
||||
new ThreadFactoryBuilder()
|
||||
.setNameFormat(s"daml.index.db.connection.${serverRole.threadPoolSuffix}-%d")
|
||||
.setUncaughtExceptionHandler((_, e) =>
|
||||
logger.error("Uncaught exception in the SQL executor.", e)
|
||||
)
|
||||
.build(),
|
||||
new InstrumentedExecutorService(
|
||||
Executors.newFixedThreadPool(
|
||||
maxConnections,
|
||||
new ThreadFactoryBuilder()
|
||||
.setNameFormat(s"$threadPoolName-%d")
|
||||
.setUncaughtExceptionHandler((_, e) =>
|
||||
logger.error("Uncaught exception in the SQL executor.", e)
|
||||
)
|
||||
.build(),
|
||||
),
|
||||
metrics.registry,
|
||||
threadPoolName,
|
||||
)
|
||||
)
|
||||
} yield new DbDispatcher(
|
||||
|
@ -9,6 +9,7 @@ import java.util.concurrent.{Executors, TimeUnit}
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.Materializer
|
||||
import com.codahale.metrics.InstrumentedExecutorService
|
||||
import com.daml.daml_lf_dev.DamlLf.Archive
|
||||
import com.daml.ledger.api.health.HealthChecks
|
||||
import com.daml.ledger.participant.state.kvutils.app.Config.EngineMode
|
||||
@ -128,7 +129,13 @@ final class Runner[T <: ReadWriteService, Extra](
|
||||
)
|
||||
)
|
||||
servicesExecutionContext <- ResourceOwner
|
||||
.forExecutorService(() => Executors.newWorkStealingPool())
|
||||
.forExecutorService(() =>
|
||||
new InstrumentedExecutorService(
|
||||
Executors.newWorkStealingPool(),
|
||||
metrics.registry,
|
||||
metrics.daml.lapi.threadpool.apiServices.toString,
|
||||
)
|
||||
)
|
||||
.map(ExecutionContext.fromExecutorService)
|
||||
.acquire()
|
||||
_ <- participantConfig.mode match {
|
||||
|
Loading…
Reference in New Issue
Block a user