mirror of
https://github.com/digital-asset/daml.git
synced 2024-11-10 10:46:11 +03:00
[DPP-1141] Run VACUUM ANALYZE from Benchtool at the end of submission step (PG only) (#15385)
changelog_begin changelog_end
This commit is contained in:
parent
f240917e94
commit
ba55d37467
@ -46,6 +46,9 @@ da_scala_library(
|
||||
"@maven//:org_typelevel_cats_core",
|
||||
],
|
||||
visibility = ["//visibility:public"],
|
||||
runtime_deps = [
|
||||
"@maven//:org_postgresql_postgresql",
|
||||
],
|
||||
deps = [
|
||||
"//daml-lf/archive:daml_lf_archive_reader",
|
||||
"//daml-lf/language",
|
||||
|
@ -140,7 +140,18 @@ class LedgerApiBenchTool(
|
||||
submissionConfig = submissionConfig,
|
||||
metricRegistry = metricRegistry,
|
||||
partyAllocating = partyAllocating,
|
||||
).map(_ -> BenchtoolTestsPackageInfo.StaticDefault)
|
||||
)
|
||||
.map(_ -> BenchtoolTestsPackageInfo.StaticDefault)
|
||||
.map { v =>
|
||||
// We manually execute a 'VACUUM ANALYZE' at the end of the submission step (if IndexDB is on Postgresql),
|
||||
// to make sure query planner statistics, visibility map, etc.. are all up-to-date.
|
||||
config.ledger.indexDbJdbcUrlO.foreach { indexDbJdbcUrl =>
|
||||
if (indexDbJdbcUrl.startsWith("jdbc:postgresql:")) {
|
||||
PostgresUtils.invokeVacuumAnalyze(indexDbJdbcUrl)
|
||||
}
|
||||
}
|
||||
v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,69 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.benchtool
|
||||
|
||||
import java.sql.{Connection, DriverManager, Statement}
|
||||
|
||||
import com.daml.ledger.api.benchtool.LedgerApiBenchTool.logger
|
||||
|
||||
object PostgresUtils {
|
||||
|
||||
def invokeVacuumAnalyze(indexDbJdbcUrl: String): Unit = {
|
||||
val connection = DriverManager.getConnection(indexDbJdbcUrl)
|
||||
try {
|
||||
val stmt = connection.createStatement()
|
||||
val vacuumQuery = "VACUUM ANALYZE"
|
||||
try {
|
||||
logger.info(
|
||||
s"Executing '$vacuumQuery' on the IndexDB identified by JDBC URL: '${indexDbJdbcUrl}' ..."
|
||||
)
|
||||
stmt.executeUpdate(vacuumQuery)
|
||||
logger.info(s"Executed '$vacuumQuery'")
|
||||
} finally {
|
||||
stmt.close()
|
||||
inspectVacuumAndAnalyzeState(connection)
|
||||
}
|
||||
} finally {
|
||||
connection.close()
|
||||
}
|
||||
}
|
||||
|
||||
private def inspectVacuumAndAnalyzeState(connection: Connection): Unit = {
|
||||
val stmt = connection.createStatement()
|
||||
val query =
|
||||
"SELECT relname, last_vacuum, last_autovacuum, last_analyze, last_autoanalyze FROM pg_stat_user_tables ORDER BY relname"
|
||||
try {
|
||||
logger.info(
|
||||
"Executing SQL query: " + query
|
||||
)
|
||||
stmt.execute(
|
||||
query
|
||||
)
|
||||
printQueryResult(stmt)
|
||||
} finally {
|
||||
stmt.close()
|
||||
}
|
||||
}
|
||||
|
||||
private def printQueryResult(s: Statement): Unit = {
|
||||
val rs = s.getResultSet
|
||||
val meta = rs.getMetaData
|
||||
val colCount = meta.getColumnCount
|
||||
val buffer = new StringBuffer()
|
||||
try {
|
||||
while (rs.next()) {
|
||||
val text = 1
|
||||
.to(colCount)
|
||||
.map(colNumber =>
|
||||
f"${meta.getColumnName(colNumber)} ${rs.getString(colNumber) + ","}%-45s"
|
||||
)
|
||||
.mkString(" ")
|
||||
buffer.append(text).append("\n")
|
||||
}
|
||||
} finally {
|
||||
logger.info(buffer.toString)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -28,6 +28,11 @@ object Cli {
|
||||
config.copy(ledger = config.ledger.copy(hostname = hostname, port = port))
|
||||
}
|
||||
|
||||
opt[String]("indexdb-jdbc-url")
|
||||
.text("JDBC url to an IndexDB instance")
|
||||
.optional()
|
||||
.action { case (url, config) => config.withLedgerConfig(_.copy(indexDbJdbcUrlO = Some(url))) }
|
||||
|
||||
opt[WorkflowConfig.StreamConfig]("consume-stream")
|
||||
.abbr("s")
|
||||
.optional()
|
||||
|
@ -21,12 +21,15 @@ case class Config(
|
||||
authorizationTokenSecret: Option[String],
|
||||
latencyTest: Boolean,
|
||||
maxLatencyObjectiveMillis: Long,
|
||||
)
|
||||
) {
|
||||
def withLedgerConfig(f: Config.Ledger => Config.Ledger): Config = copy(ledger = f(ledger))
|
||||
}
|
||||
|
||||
object Config {
|
||||
case class Ledger(
|
||||
hostname: String,
|
||||
port: Int,
|
||||
indexDbJdbcUrlO: Option[String] = None,
|
||||
)
|
||||
|
||||
case class Concurrency(
|
||||
|
Loading…
Reference in New Issue
Block a user