mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-19 16:57:40 +03:00
Centralize index DB metrics naming (#5927)
* Centralize and review index DB metrics naming Closes #5922 changelog_begin changelog_end * Remove unnecessary factory * Fix compilation issue
This commit is contained in:
parent
1615a79aea
commit
dd6af4e276
@ -0,0 +1,27 @@
|
||||
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.metrics
|
||||
|
||||
import com.codahale.metrics.{MetricRegistry, Timer}
|
||||
|
||||
final class DatabaseMetrics private[metrics] (
|
||||
registry: MetricRegistry,
|
||||
prefix: MetricName,
|
||||
val name: String,
|
||||
) {
|
||||
|
||||
val waitTimer: Timer = registry.timer(prefix :+ name :+ "wait")
|
||||
val executionTimer: Timer = registry.timer(prefix :+ name :+ "exec")
|
||||
val deserializationTimer: Timer = registry.timer(prefix :+ name :+ "deserialization")
|
||||
|
||||
}
|
||||
|
||||
object DatabaseMetrics {
|
||||
|
||||
private[metrics] def apply(registry: MetricRegistry, prefix: MetricName)(
|
||||
name: String,
|
||||
): DatabaseMetrics =
|
||||
new DatabaseMetrics(registry, prefix, name)
|
||||
|
||||
}
|
@ -244,6 +244,7 @@ class Metrics(val registry: MetricRegistry) {
|
||||
val uploadPackages: Timer = registry.timer(prefix :+ "upload_packages")
|
||||
val publishConfiguration: Timer = registry.timer(prefix :+ "publish_configuration")
|
||||
|
||||
// FIXME Name mushing and inconsistencies here, tracked by https://github.com/digital-asset/daml/issues/5926
|
||||
object db {
|
||||
|
||||
val prefix: MetricName = index.prefix :+ "db"
|
||||
@ -273,12 +274,59 @@ class Metrics(val registry: MetricRegistry) {
|
||||
val stopDeduplicatingCommand: Timer =
|
||||
registry.timer(prefix :+ "stop_deduplicating_command")
|
||||
|
||||
def deserialization(description: String): Timer =
|
||||
registry.timer(prefix :+ description :+ "deserialization")
|
||||
def wait(description: String): Timer = registry.timer(prefix :+ description :+ "wait")
|
||||
def exec(description: String): Timer = registry.timer(prefix :+ description :+ "exec")
|
||||
val waitAll: Timer = wait("all")
|
||||
val execAll: Timer = exec("all")
|
||||
private val createDatabaseMetrics: String => DatabaseMetrics =
|
||||
DatabaseMetrics(registry, prefix)(_)
|
||||
|
||||
private val overall = createDatabaseMetrics("all")
|
||||
val waitAll: Timer = overall.waitTimer
|
||||
val execAll: Timer = overall.executionTimer
|
||||
|
||||
val getCompletions: DatabaseMetrics = createDatabaseMetrics("get_completions")
|
||||
val getLedgerId: DatabaseMetrics = createDatabaseMetrics("get_ledger_id")
|
||||
val getLedgerEnd: DatabaseMetrics = createDatabaseMetrics("get_ledger_end")
|
||||
val getInitialLedgerEnd: DatabaseMetrics = createDatabaseMetrics("get_initial_ledger_end")
|
||||
val initializeLedgerParameters: DatabaseMetrics = createDatabaseMetrics(
|
||||
"initialize_ledger_parameters")
|
||||
val lookupConfiguration: DatabaseMetrics = createDatabaseMetrics("lookup_configuration")
|
||||
val loadConfigurationEntries: DatabaseMetrics = createDatabaseMetrics(
|
||||
"load_configuration_entries")
|
||||
val storeConfigurationEntryDao: DatabaseMetrics = createDatabaseMetrics(
|
||||
"store_configuration_entry") // FIXME Base name conflicts with storeConfigurationEntry
|
||||
val storePartyEntryDao
|
||||
: DatabaseMetrics = createDatabaseMetrics("store_party_entry") // FIXME Base name conflicts with storePartyEntry
|
||||
val loadPartyEntries: DatabaseMetrics = createDatabaseMetrics("load_party_entries")
|
||||
val storeTransactionDao
|
||||
: DatabaseMetrics = createDatabaseMetrics("store_ledger_entry") // FIXME Base name conflicts with storeTransaction
|
||||
val storeRejectionDao
|
||||
: DatabaseMetrics = createDatabaseMetrics("store_rejection") // FIXME Base name conflicts with storeRejection
|
||||
val storeInitialStateFromScenario: DatabaseMetrics = createDatabaseMetrics(
|
||||
"store_initial_state_from_scenario")
|
||||
val loadParties: DatabaseMetrics = createDatabaseMetrics("load_parties")
|
||||
val loadAllParties: DatabaseMetrics = createDatabaseMetrics("load_all_parties")
|
||||
val loadPackages: DatabaseMetrics = createDatabaseMetrics("load_packages")
|
||||
val loadArchive: DatabaseMetrics = createDatabaseMetrics("load_archive")
|
||||
val storePackageEntryDao
|
||||
: DatabaseMetrics = createDatabaseMetrics("store_package_entry") // FIXME Base name conflicts with storePackageEntry
|
||||
val loadPackageEntries: DatabaseMetrics = createDatabaseMetrics("load_package_entries")
|
||||
val deduplicateCommandDao
|
||||
: DatabaseMetrics = createDatabaseMetrics("deduplicate_command") // FIXME Base name conflicts with deduplicateCommand
|
||||
val removeExpiredDeduplicationDataDao: DatabaseMetrics = createDatabaseMetrics(
|
||||
"remove_expired_deduplication_data") // FIXME Base name conflicts with removeExpiredDeduplicationData
|
||||
val stopDeduplicatingCommandDao: DatabaseMetrics = createDatabaseMetrics(
|
||||
"stop_deduplicating_command") // FIXME Base name conflicts with stopDeduplicatingCommand
|
||||
val truncateAllTables: DatabaseMetrics = createDatabaseMetrics("truncate_all_tables")
|
||||
val lookupActiveContractDao: DatabaseMetrics = createDatabaseMetrics(
|
||||
"lookup_active_contract") // FIXME Base name conflicts with lookupActiveContract
|
||||
val lookupContractByKey: DatabaseMetrics = createDatabaseMetrics("lookup_contract_by_key")
|
||||
val lookupMaximumLedgerTimeDao: DatabaseMetrics = createDatabaseMetrics(
|
||||
"lookup_maximum_ledger_time") // FIXME Base name conflicts with lookupActiveContract
|
||||
val getFlatTransactions: DatabaseMetrics = createDatabaseMetrics("get_flat_transactions")
|
||||
val lookupFlatTransactionById: DatabaseMetrics = createDatabaseMetrics(
|
||||
"lookup_flat_transaction_by_id")
|
||||
val getTransactionTrees: DatabaseMetrics = createDatabaseMetrics("get_transaction_trees")
|
||||
val lookupTransactionTreeById: DatabaseMetrics = createDatabaseMetrics(
|
||||
"lookup_transaction_tree_by_id")
|
||||
val getActiveContracts: DatabaseMetrics = createDatabaseMetrics("get_active_contracts")
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -9,9 +9,10 @@ import com.daml.ledger.participant.state.v1.Offset
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.ledger.ApplicationId
|
||||
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.platform.ApiOffset
|
||||
|
||||
private[dao] final class CommandCompletionsReader(dispatcher: DbDispatcher) {
|
||||
private[dao] final class CommandCompletionsReader(dispatcher: DbDispatcher, metrics: Metrics) {
|
||||
|
||||
private def offsetFor(response: CompletionStreamResponse): Offset =
|
||||
ApiOffset.assertFromString(response.checkpoint.get.offset.get.getAbsolute)
|
||||
@ -28,7 +29,7 @@ private[dao] final class CommandCompletionsReader(dispatcher: DbDispatcher) {
|
||||
parties = parties,
|
||||
)
|
||||
Source
|
||||
.future(dispatcher.executeSql("get_completions") { implicit connection =>
|
||||
.future(dispatcher.executeSql(metrics.daml.index.db.getCompletions) { implicit connection =>
|
||||
query.as(CommandCompletionsTable.parser.*)
|
||||
})
|
||||
.mapConcat(_.map(response => offsetFor(response) -> response))
|
||||
|
@ -6,9 +6,10 @@ package com.daml.platform.store.dao
|
||||
import java.sql.Connection
|
||||
import java.util.concurrent.{Executor, Executors, TimeUnit}
|
||||
|
||||
import com.codahale.metrics.Timer
|
||||
import com.daml.ledger.api.health.{HealthStatus, ReportsHealth}
|
||||
import com.daml.logging.{ContextualizedLogger, LoggingContext}
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.metrics.{DatabaseMetrics, Metrics}
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import com.daml.resources.ResourceOwner
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder
|
||||
@ -20,7 +21,8 @@ final class DbDispatcher private (
|
||||
val maxConnections: Int,
|
||||
connectionProvider: HikariJdbcConnectionProvider,
|
||||
executor: Executor,
|
||||
metrics: Metrics,
|
||||
overallWaitTimer: Timer,
|
||||
overallExecutionTimer: Timer,
|
||||
)(implicit logCtx: LoggingContext)
|
||||
extends ReportsHealth {
|
||||
|
||||
@ -35,19 +37,17 @@ final class DbDispatcher private (
|
||||
* The isolation level by default is the one defined in the JDBC driver, it can be however overridden per query on
|
||||
* the Connection. See further details at: https://docs.oracle.com/cd/E19830-01/819-4721/beamv/index.html
|
||||
*/
|
||||
def executeSql[T](description: String, extraLog: => Option[String] = None)(
|
||||
def executeSql[T](databaseMetrics: DatabaseMetrics, extraLog: => Option[String] = None)(
|
||||
sql: Connection => T
|
||||
): Future[T] = {
|
||||
lazy val extraLogMemoized = extraLog
|
||||
val waitTimer = metrics.daml.index.db.wait(description)
|
||||
val execTimer = metrics.daml.index.db.exec(description)
|
||||
val startWait = System.nanoTime()
|
||||
Future {
|
||||
val waitNanos = System.nanoTime() - startWait
|
||||
extraLogMemoized.foreach(log =>
|
||||
logger.trace(s"$description: $log wait ${(waitNanos / 1E6).toLong} ms"))
|
||||
waitTimer.update(waitNanos, TimeUnit.NANOSECONDS)
|
||||
metrics.daml.index.db.waitAll.update(waitNanos, TimeUnit.NANOSECONDS)
|
||||
logger.trace(s"${databaseMetrics.name}: $log wait ${(waitNanos / 1E6).toLong} ms"))
|
||||
databaseMetrics.waitTimer.update(waitNanos, TimeUnit.NANOSECONDS)
|
||||
overallWaitTimer.update(waitNanos, TimeUnit.NANOSECONDS)
|
||||
val startExec = System.nanoTime()
|
||||
try {
|
||||
// Actual execution
|
||||
@ -56,25 +56,27 @@ final class DbDispatcher private (
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
logger.error(
|
||||
s"$description: Got an exception while executing a SQL query. Rolled back the transaction.",
|
||||
s"${databaseMetrics.name}: Got an exception while executing a SQL query. Rolled back the transaction.",
|
||||
e)
|
||||
throw e
|
||||
// fatal errors don't make it for some reason to the setUncaughtExceptionHandler
|
||||
case t: Throwable =>
|
||||
logger.error(s"$description: got a fatal error!", t)
|
||||
logger.error(s"${databaseMetrics.name}: got a fatal error!", t)
|
||||
throw t
|
||||
} finally {
|
||||
// decouple metrics updating from sql execution above
|
||||
try {
|
||||
val execNanos = System.nanoTime() - startExec
|
||||
extraLogMemoized.foreach(log =>
|
||||
logger.trace(s"$description: $log exec ${(execNanos / 1E6).toLong} ms"))
|
||||
execTimer.update(execNanos, TimeUnit.NANOSECONDS)
|
||||
metrics.daml.index.db.execAll.update(execNanos, TimeUnit.NANOSECONDS)
|
||||
logger.trace(s"${databaseMetrics.name}: $log exec ${(execNanos / 1E6).toLong} ms"))
|
||||
databaseMetrics.executionTimer.update(execNanos, TimeUnit.NANOSECONDS)
|
||||
overallExecutionTimer.update(execNanos, TimeUnit.NANOSECONDS)
|
||||
} catch {
|
||||
case t: Throwable =>
|
||||
case NonFatal(e) =>
|
||||
logger
|
||||
.error(s"$description: Got an exception while updating timer metrics. Ignoring.", t)
|
||||
.error(
|
||||
s"${databaseMetrics.name}: Got an exception while updating timer metrics. Ignoring.",
|
||||
e)
|
||||
}
|
||||
}
|
||||
}(executionContext)
|
||||
@ -106,5 +108,12 @@ object DbDispatcher {
|
||||
logger.error("Uncaught exception in the SQL executor.", e))
|
||||
.build()
|
||||
))
|
||||
} yield new DbDispatcher(maxConnections, connectionProvider, executor, metrics)
|
||||
} yield
|
||||
new DbDispatcher(
|
||||
maxConnections = maxConnections,
|
||||
connectionProvider = connectionProvider,
|
||||
executor = executor,
|
||||
overallWaitTimer = metrics.daml.index.db.waitAll,
|
||||
overallExecutionTimer = metrics.daml.index.db.execAll,
|
||||
)
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ private class JdbcLedgerDao(
|
||||
|
||||
override def lookupLedgerId(): Future[Option[LedgerId]] =
|
||||
dbDispatcher
|
||||
.executeSql("get_ledger_id") { implicit conn =>
|
||||
.executeSql(metrics.daml.index.db.getLedgerId) { implicit conn =>
|
||||
SQL_SELECT_LEDGER_ID
|
||||
.as(ledgerString("ledger_id").map(id => LedgerId(id.toString)).singleOpt)
|
||||
}
|
||||
@ -109,7 +109,7 @@ private class JdbcLedgerDao(
|
||||
private val SQL_SELECT_LEDGER_END = SQL("select ledger_end from parameters")
|
||||
|
||||
override def lookupLedgerEnd(): Future[Offset] =
|
||||
dbDispatcher.executeSql("get_ledger_end") { implicit conn =>
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.getLedgerEnd) { implicit conn =>
|
||||
SQL_SELECT_LEDGER_END
|
||||
.as(offset("ledger_end").single)
|
||||
}
|
||||
@ -117,7 +117,7 @@ private class JdbcLedgerDao(
|
||||
private val SQL_SELECT_INITIAL_LEDGER_END = SQL("select ledger_end from parameters")
|
||||
|
||||
override def lookupInitialLedgerEnd(): Future[Option[Offset]] =
|
||||
dbDispatcher.executeSql("get_initial_ledger_end") { implicit conn =>
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.getInitialLedgerEnd) { implicit conn =>
|
||||
SQL_SELECT_INITIAL_LEDGER_END
|
||||
.as(offset("ledger_end").?.single)
|
||||
}
|
||||
@ -126,7 +126,7 @@ private class JdbcLedgerDao(
|
||||
"insert into parameters(ledger_id, ledger_end) VALUES({LedgerId}, {LedgerEnd})")
|
||||
|
||||
override def initializeLedger(ledgerId: LedgerId, ledgerEnd: Offset): Future[Unit] =
|
||||
dbDispatcher.executeSql("initialize_ledger_parameters") { implicit conn =>
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.initializeLedgerParameters) { implicit conn =>
|
||||
val _ = SQL_INITIALIZE
|
||||
.on("LedgerId" -> ledgerId.unwrap, "LedgerEnd" -> ledgerEnd)
|
||||
.execute()
|
||||
@ -179,7 +179,8 @@ private class JdbcLedgerDao(
|
||||
SQL_SELECT_CURRENT_CONFIGURATION.as(currentConfigurationParser)
|
||||
|
||||
override def lookupLedgerConfiguration(): Future[Option[(Offset, Configuration)]] =
|
||||
dbDispatcher.executeSql("lookup_configuration")(implicit conn => selectLedgerConfiguration)
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.lookupConfiguration)(implicit conn =>
|
||||
selectLedgerConfiguration)
|
||||
|
||||
private val acceptType = "accept"
|
||||
private val rejectType = "reject"
|
||||
@ -229,7 +230,7 @@ private class JdbcLedgerDao(
|
||||
endInclusive: Offset): Source[(Offset, ConfigurationEntry), NotUsed] =
|
||||
PaginatingAsyncStream(PageSize) { queryOffset =>
|
||||
dbDispatcher.executeSql(
|
||||
"load_configuration_entries",
|
||||
metrics.daml.index.db.loadConfigurationEntries,
|
||||
Some(
|
||||
s"bounds: ]${startExclusive.toApiString}, ${endInclusive.toApiString}] queryOffset $queryOffset")) {
|
||||
implicit conn =>
|
||||
@ -257,60 +258,62 @@ private class JdbcLedgerDao(
|
||||
configuration: Configuration,
|
||||
rejectionReason: Option[String]
|
||||
): Future[PersistenceResponse] = {
|
||||
dbDispatcher.executeSql("store_configuration_entry", Some(s"submissionId=$submissionId")) {
|
||||
implicit conn =>
|
||||
val optCurrentConfig = selectLedgerConfiguration
|
||||
val optExpectedGeneration: Option[Long] =
|
||||
optCurrentConfig.map { case (_, c) => c.generation + 1 }
|
||||
val finalRejectionReason: Option[String] =
|
||||
optExpectedGeneration match {
|
||||
case Some(expGeneration)
|
||||
if rejectionReason.isEmpty && expGeneration != configuration.generation =>
|
||||
// If we're not storing a rejection and the new generation is not succ of current configuration, then
|
||||
// we store a rejection. This code path is only expected to be taken in sandbox. This follows the same
|
||||
// pattern as with transactions.
|
||||
Some(
|
||||
s"Generation mismatch: expected=$expGeneration, actual=${configuration.generation}")
|
||||
dbDispatcher.executeSql(
|
||||
metrics.daml.index.db.storeConfigurationEntryDao,
|
||||
Some(s"submissionId=$submissionId"),
|
||||
) { implicit conn =>
|
||||
val optCurrentConfig = selectLedgerConfiguration
|
||||
val optExpectedGeneration: Option[Long] =
|
||||
optCurrentConfig.map { case (_, c) => c.generation + 1 }
|
||||
val finalRejectionReason: Option[String] =
|
||||
optExpectedGeneration match {
|
||||
case Some(expGeneration)
|
||||
if rejectionReason.isEmpty && expGeneration != configuration.generation =>
|
||||
// If we're not storing a rejection and the new generation is not succ of current configuration, then
|
||||
// we store a rejection. This code path is only expected to be taken in sandbox. This follows the same
|
||||
// pattern as with transactions.
|
||||
Some(
|
||||
s"Generation mismatch: expected=$expGeneration, actual=${configuration.generation}")
|
||||
|
||||
case _ =>
|
||||
// Rejection reason was set, or we have no previous configuration generation, in which case we accept any
|
||||
// generation.
|
||||
rejectionReason
|
||||
}
|
||||
|
||||
updateLedgerEnd(offset)
|
||||
val configurationBytes = Configuration.encode(configuration).toByteArray
|
||||
val typ = if (finalRejectionReason.isEmpty) {
|
||||
acceptType
|
||||
} else {
|
||||
rejectType
|
||||
case _ =>
|
||||
// Rejection reason was set, or we have no previous configuration generation, in which case we accept any
|
||||
// generation.
|
||||
rejectionReason
|
||||
}
|
||||
|
||||
Try({
|
||||
SQL_INSERT_CONFIGURATION_ENTRY
|
||||
.on(
|
||||
"ledger_offset" -> offset,
|
||||
"recorded_at" -> recordedAt,
|
||||
"submission_id" -> submissionId,
|
||||
"participant_id" -> participantId,
|
||||
"typ" -> typ,
|
||||
"rejection_reason" -> finalRejectionReason.orNull,
|
||||
"configuration" -> configurationBytes
|
||||
)
|
||||
.execute()
|
||||
updateLedgerEnd(offset)
|
||||
val configurationBytes = Configuration.encode(configuration).toByteArray
|
||||
val typ = if (finalRejectionReason.isEmpty) {
|
||||
acceptType
|
||||
} else {
|
||||
rejectType
|
||||
}
|
||||
|
||||
if (typ == acceptType) {
|
||||
updateCurrentConfiguration(configurationBytes)
|
||||
}
|
||||
Try({
|
||||
SQL_INSERT_CONFIGURATION_ENTRY
|
||||
.on(
|
||||
"ledger_offset" -> offset,
|
||||
"recorded_at" -> recordedAt,
|
||||
"submission_id" -> submissionId,
|
||||
"participant_id" -> participantId,
|
||||
"typ" -> typ,
|
||||
"rejection_reason" -> finalRejectionReason.orNull,
|
||||
"configuration" -> configurationBytes
|
||||
)
|
||||
.execute()
|
||||
|
||||
PersistenceResponse.Ok
|
||||
}).recover {
|
||||
case NonFatal(e) if e.getMessage.contains(queries.DUPLICATE_KEY_ERROR) =>
|
||||
logger.warn(
|
||||
s"Ignoring duplicate configuration submission, submissionId=$submissionId, participantId=$participantId")
|
||||
conn.rollback()
|
||||
PersistenceResponse.Duplicate
|
||||
}.get
|
||||
if (typ == acceptType) {
|
||||
updateCurrentConfiguration(configurationBytes)
|
||||
}
|
||||
|
||||
PersistenceResponse.Ok
|
||||
}).recover {
|
||||
case NonFatal(e) if e.getMessage.contains(queries.DUPLICATE_KEY_ERROR) =>
|
||||
logger.warn(
|
||||
s"Ignoring duplicate configuration submission, submissionId=$submissionId, participantId=$participantId")
|
||||
conn.rollback()
|
||||
PersistenceResponse.Duplicate
|
||||
}.get
|
||||
|
||||
}
|
||||
}
|
||||
@ -331,7 +334,7 @@ private class JdbcLedgerDao(
|
||||
offset: Offset,
|
||||
partyEntry: PartyLedgerEntry,
|
||||
): Future[PersistenceResponse] = {
|
||||
dbDispatcher.executeSql("store_party_entry") { implicit conn =>
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.storePartyEntryDao) { implicit conn =>
|
||||
updateLedgerEnd(offset)
|
||||
|
||||
partyEntry match {
|
||||
@ -438,7 +441,7 @@ private class JdbcLedgerDao(
|
||||
endInclusive: Offset): Source[(Offset, PartyLedgerEntry), NotUsed] = {
|
||||
PaginatingAsyncStream(PageSize) { queryOffset =>
|
||||
dbDispatcher.executeSql(
|
||||
"load_party_entries",
|
||||
metrics.daml.index.db.loadPartyEntries,
|
||||
Some(
|
||||
s"bounds: ]${startExclusive.toApiString}, ${endInclusive.toApiString}] queryOffset $queryOffset")) {
|
||||
implicit conn =>
|
||||
@ -470,7 +473,7 @@ private class JdbcLedgerDao(
|
||||
divulged: Iterable[DivulgedContract],
|
||||
): Future[PersistenceResponse] =
|
||||
dbDispatcher
|
||||
.executeSql("store_ledger_entry") { implicit conn =>
|
||||
.executeSql(metrics.daml.index.db.storeTransactionDao) { implicit conn =>
|
||||
val error =
|
||||
postCommitValidation.validate(
|
||||
transaction = transaction,
|
||||
@ -506,7 +509,7 @@ private class JdbcLedgerDao(
|
||||
offset: Offset,
|
||||
reason: RejectionReason,
|
||||
): Future[PersistenceResponse] =
|
||||
dbDispatcher.executeSql("store_rejection") { implicit conn =>
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.storeRejectionDao) { implicit conn =>
|
||||
for (info @ SubmitterInfo(submitter, _, commandId, _) <- submitterInfo) {
|
||||
stopDeduplicatingCommandSync(domain.CommandId(commandId), submitter)
|
||||
prepareRejectionInsert(info, offset, recordTime, reason).execute()
|
||||
@ -521,7 +524,7 @@ private class JdbcLedgerDao(
|
||||
): Future[Unit] = {
|
||||
dbDispatcher
|
||||
.executeSql(
|
||||
"store_initial_state_from_scenario",
|
||||
metrics.daml.index.db.storeInitialStateFromScenario,
|
||||
Some(s"saving ${ledgerEntries.size} ledger entries")) { implicit conn =>
|
||||
ledgerEntries.foreach {
|
||||
case (offset, entry) =>
|
||||
@ -605,7 +608,7 @@ private class JdbcLedgerDao(
|
||||
Future.successful(List.empty)
|
||||
else
|
||||
dbDispatcher
|
||||
.executeSql("load_parties") { implicit conn =>
|
||||
.executeSql(metrics.daml.index.db.loadParties) { implicit conn =>
|
||||
SQL_SELECT_MULTIPLE_PARTIES
|
||||
.on("parties" -> parties)
|
||||
.as(PartyDataParser.*)
|
||||
@ -614,7 +617,7 @@ private class JdbcLedgerDao(
|
||||
|
||||
override def listKnownParties(): Future[List[PartyDetails]] =
|
||||
dbDispatcher
|
||||
.executeSql("load_all_parties") { implicit conn =>
|
||||
.executeSql(metrics.daml.index.db.loadAllParties) { implicit conn =>
|
||||
SQL_SELECT_ALL_PARTIES
|
||||
.as(PartyDataParser.*)
|
||||
}
|
||||
@ -651,7 +654,7 @@ private class JdbcLedgerDao(
|
||||
|
||||
override def listLfPackages: Future[Map[PackageId, PackageDetails]] =
|
||||
dbDispatcher
|
||||
.executeSql("load_packages") { implicit conn =>
|
||||
.executeSql(metrics.daml.index.db.loadPackages) { implicit conn =>
|
||||
SQL_SELECT_PACKAGES
|
||||
.as(PackageDataParser.*)
|
||||
}
|
||||
@ -665,7 +668,7 @@ private class JdbcLedgerDao(
|
||||
|
||||
override def getLfArchive(packageId: PackageId): Future[Option[Archive]] =
|
||||
dbDispatcher
|
||||
.executeSql("load_archive", Some(s"pkg id: $packageId")) { implicit conn =>
|
||||
.executeSql(metrics.daml.index.db.loadArchive, Some(s"pkg id: $packageId")) { implicit conn =>
|
||||
SQL_SELECT_PACKAGE
|
||||
.on(
|
||||
"package_id" -> packageId
|
||||
@ -692,7 +695,7 @@ private class JdbcLedgerDao(
|
||||
optEntry: Option[PackageLedgerEntry]
|
||||
): Future[PersistenceResponse] = {
|
||||
dbDispatcher.executeSql(
|
||||
"store_package_entry",
|
||||
metrics.daml.index.db.storePackageEntryDao,
|
||||
Some(s"packages: ${packages.map(_._1.getHash).mkString(", ")}")) { implicit conn =>
|
||||
updateLedgerEnd(offset)
|
||||
|
||||
@ -767,7 +770,7 @@ private class JdbcLedgerDao(
|
||||
endInclusive: Offset): Source[(Offset, PackageLedgerEntry), NotUsed] = {
|
||||
PaginatingAsyncStream(PageSize) { queryOffset =>
|
||||
dbDispatcher.executeSql(
|
||||
"load_package_entries",
|
||||
metrics.daml.index.db.loadPackageEntries,
|
||||
Some(
|
||||
s"bounds: ]${startExclusive.toApiString}, ${endInclusive.toApiString}] queryOffset $queryOffset")) {
|
||||
implicit conn =>
|
||||
@ -803,7 +806,7 @@ private class JdbcLedgerDao(
|
||||
submitter: Ref.Party,
|
||||
submittedAt: Instant,
|
||||
deduplicateUntil: Instant): Future[CommandDeduplicationResult] =
|
||||
dbDispatcher.executeSql("deduplicate_command") { implicit conn =>
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.deduplicateCommandDao) { implicit conn =>
|
||||
val key = deduplicationKey(commandId, submitter)
|
||||
// Insert a new deduplication entry, or update an expired entry
|
||||
val updated = SQL(queries.SQL_INSERT_COMMAND)
|
||||
@ -832,11 +835,12 @@ private class JdbcLedgerDao(
|
||||
""".stripMargin)
|
||||
|
||||
override def removeExpiredDeduplicationData(currentTime: Instant): Future[Unit] =
|
||||
dbDispatcher.executeSql("remove_expired_deduplication_data") { implicit conn =>
|
||||
SQL_DELETE_EXPIRED_COMMANDS
|
||||
.on("currentTime" -> currentTime)
|
||||
.execute()
|
||||
()
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.removeExpiredDeduplicationDataDao) {
|
||||
implicit conn =>
|
||||
SQL_DELETE_EXPIRED_COMMANDS
|
||||
.on("currentTime" -> currentTime)
|
||||
.execute()
|
||||
()
|
||||
}
|
||||
|
||||
private val SQL_DELETE_COMMAND = SQL("""
|
||||
@ -856,12 +860,12 @@ private class JdbcLedgerDao(
|
||||
override def stopDeduplicatingCommand(
|
||||
commandId: domain.CommandId,
|
||||
submitter: Party): Future[Unit] =
|
||||
dbDispatcher.executeSql("stop_deduplicating_command") { implicit conn =>
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.stopDeduplicatingCommandDao) { implicit conn =>
|
||||
stopDeduplicatingCommandSync(commandId, submitter)
|
||||
}
|
||||
|
||||
override def reset(): Future[Unit] =
|
||||
dbDispatcher.executeSql("truncate_all_tables") { implicit conn =>
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.truncateAllTables) { implicit conn =>
|
||||
val _ = SQL(queries.SQL_TRUNCATE_TABLES).execute()
|
||||
}
|
||||
|
||||
@ -875,7 +879,7 @@ private class JdbcLedgerDao(
|
||||
ContractsReader(dbDispatcher, executionContext, dbType, metrics)
|
||||
|
||||
override val completions: CommandCompletionsReader =
|
||||
new CommandCompletionsReader(dbDispatcher)
|
||||
new CommandCompletionsReader(dbDispatcher, metrics)
|
||||
|
||||
private val postCommitValidation =
|
||||
if (performPostCommitValidation)
|
||||
|
@ -30,11 +30,6 @@ private[dao] sealed abstract class ContractsReader(
|
||||
|
||||
import ContractsReader._
|
||||
|
||||
private val lookupActiveContract = "lookup_active_contract"
|
||||
|
||||
private val lookupActiveContractDeserializationTimer =
|
||||
metrics.daml.index.db.deserialization(lookupActiveContract)
|
||||
|
||||
private val contractRowParser: RowParser[(String, InputStream)] =
|
||||
str("template_id") ~ binaryStream("create_argument") map SqlParser.flatten
|
||||
|
||||
@ -45,7 +40,7 @@ private[dao] sealed abstract class ContractsReader(
|
||||
contractId: ContractId,
|
||||
): Future[Option[Contract]] =
|
||||
dispatcher
|
||||
.executeSql(lookupActiveContract) { implicit connection =>
|
||||
.executeSql(metrics.daml.index.db.lookupActiveContractDao) { implicit connection =>
|
||||
SQL"select participant_contracts.contract_id, template_id, create_argument from #$contractsTable where contract_witness = $submitter and participant_contracts.contract_id = $contractId"
|
||||
.as(contractRowParser.singleOpt)
|
||||
}
|
||||
@ -55,7 +50,8 @@ private[dao] sealed abstract class ContractsReader(
|
||||
contractId = contractId,
|
||||
templateId = templateId,
|
||||
createArgument = createArgument,
|
||||
deserializationTimer = lookupActiveContractDeserializationTimer,
|
||||
deserializationTimer =
|
||||
metrics.daml.index.db.lookupActiveContractDao.deserializationTimer,
|
||||
)
|
||||
})(executionContext)
|
||||
|
||||
@ -63,13 +59,13 @@ private[dao] sealed abstract class ContractsReader(
|
||||
submitter: Party,
|
||||
key: Key,
|
||||
): Future[Option[ContractId]] =
|
||||
dispatcher.executeSql("lookup_contract_by_key") { implicit connection =>
|
||||
dispatcher.executeSql(metrics.daml.index.db.lookupContractByKey) { implicit connection =>
|
||||
lookupContractKeyQuery(submitter, key).as(contractId("contract_id").singleOpt)
|
||||
}
|
||||
|
||||
override def lookupMaximumLedgerTime(ids: Set[ContractId]): Future[Option[Instant]] =
|
||||
dispatcher
|
||||
.executeSql("lookup_maximum_ledger_time") { implicit connection =>
|
||||
.executeSql(metrics.daml.index.db.lookupMaximumLedgerTimeDao) { implicit connection =>
|
||||
committedContracts.lookupMaximumLedgerTime(ids)
|
||||
}
|
||||
.map(_.get)(executionContext)
|
||||
|
@ -33,24 +33,7 @@ private[dao] final class TransactionsReader(
|
||||
metrics: Metrics,
|
||||
) {
|
||||
|
||||
// Metrics names
|
||||
private val getFlatTransactions: String = "get_flat_transactions"
|
||||
private val lookupFlatTransactionById: String = "lookup_flat_transaction_by_id"
|
||||
private val getTransactionTrees: String = "get_transaction_trees"
|
||||
private val lookupTransactionTreeById: String = "lookup_transaction_tree_by_id"
|
||||
private val getActiveContracts: String = "get_active_contracts"
|
||||
|
||||
// Timers for deserialization on per-query basis
|
||||
private val getFlatTransactionsDeserializationTimer =
|
||||
metrics.daml.index.db.deserialization(getFlatTransactions)
|
||||
private val lookupFlatTransactionByIdDeserializationTimer =
|
||||
metrics.daml.index.db.deserialization(lookupFlatTransactionById)
|
||||
private val getTransactionTreesDeserializationTimer =
|
||||
metrics.daml.index.db.deserialization(getTransactionTrees)
|
||||
private val lookupTransactionTreeByIdDeserializationTimer =
|
||||
metrics.daml.index.db.deserialization(lookupTransactionTreeById)
|
||||
private val getActiveContractsDeserializationTimer =
|
||||
metrics.daml.index.db.deserialization(getActiveContracts)
|
||||
private val dbMetrics = metrics.daml.index.db
|
||||
|
||||
private def offsetFor(response: GetTransactionsResponse): Offset =
|
||||
ApiOffset.assertFromString(response.transactions.head.offset)
|
||||
@ -76,7 +59,7 @@ private[dao] final class TransactionsReader(
|
||||
rowOffset = offset,
|
||||
)
|
||||
.withFetchSize(Some(pageSize))
|
||||
dispatcher.executeSql(getFlatTransactions) { implicit connection =>
|
||||
dispatcher.executeSql(dbMetrics.getFlatTransactions) { implicit connection =>
|
||||
query.asVectorOf(EventsTable.rawFlatEventParser)
|
||||
}
|
||||
}
|
||||
@ -85,7 +68,7 @@ private[dao] final class TransactionsReader(
|
||||
.flatMapConcat { events =>
|
||||
val response = EventsTable.Entry.toGetTransactionsResponse(
|
||||
verbose = verbose,
|
||||
deserializationTimer = getFlatTransactionsDeserializationTimer,
|
||||
deserializationTimer = dbMetrics.getFlatTransactions.deserializationTimer,
|
||||
)(events)
|
||||
Source(response.map(r => offsetFor(r) -> r))
|
||||
}
|
||||
@ -98,7 +81,7 @@ private[dao] final class TransactionsReader(
|
||||
val query = EventsTable.prepareLookupFlatTransactionById(transactionId, requestingParties)
|
||||
dispatcher
|
||||
.executeSql(
|
||||
description = lookupFlatTransactionById,
|
||||
databaseMetrics = dbMetrics.lookupFlatTransactionById,
|
||||
extraLog = Some(s"tx: $transactionId, parties = ${requestingParties.mkString(", ")}"),
|
||||
) { implicit connection =>
|
||||
query.asVectorOf(EventsTable.rawFlatEventParser)
|
||||
@ -106,7 +89,7 @@ private[dao] final class TransactionsReader(
|
||||
.map(
|
||||
EventsTable.Entry.toGetFlatTransactionResponse(
|
||||
verbose = true,
|
||||
deserializationTimer = lookupFlatTransactionByIdDeserializationTimer,
|
||||
deserializationTimer = dbMetrics.lookupFlatTransactionById.deserializationTimer,
|
||||
)
|
||||
)(executionContext)
|
||||
}
|
||||
@ -129,7 +112,7 @@ private[dao] final class TransactionsReader(
|
||||
rowOffset = offset,
|
||||
)
|
||||
.withFetchSize(Some(pageSize))
|
||||
dispatcher.executeSql(getTransactionTrees) { implicit connection =>
|
||||
dispatcher.executeSql(dbMetrics.getTransactionTrees) { implicit connection =>
|
||||
query.asVectorOf(EventsTable.rawTreeEventParser)
|
||||
}
|
||||
}
|
||||
@ -138,7 +121,7 @@ private[dao] final class TransactionsReader(
|
||||
.flatMapConcat { events =>
|
||||
val response = EventsTable.Entry.toGetTransactionTreesResponse(
|
||||
verbose = verbose,
|
||||
deserializationTimer = getTransactionTreesDeserializationTimer,
|
||||
deserializationTimer = dbMetrics.getTransactionTrees.deserializationTimer,
|
||||
)(events)
|
||||
Source(response.map(r => offsetFor(r) -> r))
|
||||
}
|
||||
@ -151,7 +134,7 @@ private[dao] final class TransactionsReader(
|
||||
val query = EventsTable.prepareLookupTransactionTreeById(transactionId, requestingParties)
|
||||
dispatcher
|
||||
.executeSql(
|
||||
description = lookupTransactionTreeById,
|
||||
databaseMetrics = dbMetrics.lookupTransactionTreeById,
|
||||
extraLog = Some(s"tx: $transactionId, parties = ${requestingParties.mkString(", ")}"),
|
||||
) { implicit connection =>
|
||||
query.asVectorOf(EventsTable.rawTreeEventParser)
|
||||
@ -159,7 +142,7 @@ private[dao] final class TransactionsReader(
|
||||
.map(
|
||||
EventsTable.Entry.toGetTransactionResponse(
|
||||
verbose = true,
|
||||
deserializationTimer = lookupTransactionTreeByIdDeserializationTimer,
|
||||
deserializationTimer = dbMetrics.lookupTransactionTreeById.deserializationTimer,
|
||||
))(executionContext)
|
||||
}
|
||||
|
||||
@ -179,7 +162,7 @@ private[dao] final class TransactionsReader(
|
||||
rowOffset = offset,
|
||||
)
|
||||
.withFetchSize(Some(pageSize))
|
||||
dispatcher.executeSql(getActiveContracts) { implicit connection =>
|
||||
dispatcher.executeSql(dbMetrics.getActiveContracts) { implicit connection =>
|
||||
query.asVectorOf(EventsTable.rawFlatEventParser)
|
||||
}
|
||||
}
|
||||
@ -189,7 +172,7 @@ private[dao] final class TransactionsReader(
|
||||
Source(
|
||||
EventsTable.Entry.toGetActiveContractsResponse(
|
||||
verbose = verbose,
|
||||
deserializationTimer = getActiveContractsDeserializationTimer,
|
||||
deserializationTimer = dbMetrics.getActiveContracts.deserializationTimer,
|
||||
)(events)
|
||||
)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user