Enable asynchronous commit at the connection level (#8419)

* Enable asynchronous commit at the connection level

CHANGELOG_BEGIN
CHANGELOG_END

* Added a HikariConnectionSpec test asserting the asynchronous-commit setting on pooled connections
tudor-da 2021-01-13 08:28:53 +02:00 committed by GitHub
parent e6312fa52c
commit 13a5715213
11 changed files with 153 additions and 49 deletions
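
For context, and not part of the diff below: the change relies on HikariCP's connectionInitSql hook, which runs a statement on every physical connection the pool opens, to make PostgreSQL's synchronous_commit setting default to off for the whole pool. A minimal sketch of that mechanism, kept outside the ledger code and using a placeholder jdbcUrl:

    import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

    object AsyncCommitPoolSketch {
      // Builds a pool whose connections default to asynchronous commit,
      // mirroring what this change does inside HikariConnection.
      def asyncCommitPool(jdbcUrl: String): HikariDataSource = {
        val config = new HikariConfig
        config.setJdbcUrl(jdbcUrl)
        config.setDriverClassName("org.postgresql.Driver")
        // Executed once for every physical connection Hikari opens, so each
        // pooled session starts with asynchronous commit as its default.
        config.setConnectionInitSql("SET synchronous_commit=OFF")
        new HikariDataSource(config)
      }
    }

Writes that must stay durable can still opt back in for a single transaction with SET LOCAL synchronous_commit = 'on', which is what the enforceSynchronousCommit queries introduced further down do.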

View File

@@ -237,6 +237,7 @@ test_deps = [
"@maven//:org_scalatestplus_scalacheck_1_14_2_12",
"@maven//:org_scalaz_scalaz_core_2_12",
"@maven//:org_slf4j_slf4j_api",
"@maven//:com_zaxxer_HikariCP",
]
openssl_executable = "@openssl_dev_env//:bin/openssl" if not is_windows else "@openssl_dev_env//:usr/bin/openssl.exe"

View File

@@ -7,16 +7,29 @@ private[platform] sealed abstract class DbType(
val name: String,
val driver: String,
val supportsParallelWrites: Boolean,
val supportsAsynchronousCommits: Boolean,
)
private[platform] object DbType {
object Postgres extends DbType("postgres", "org.postgresql.Driver", true)
object Postgres
extends DbType(
"postgres",
"org.postgresql.Driver",
supportsParallelWrites = true,
supportsAsynchronousCommits = true,
)
// H2 does not support concurrent, conditional updates to the ledger_end at read committed isolation
// level: "It is possible that a transaction from one connection overtakes a transaction from a different
// connection. Depending on the operations, this might result in different results, for example when conditionally
// incrementing a value in a row." - from http://www.h2database.com/html/advanced.html
object H2Database extends DbType("h2database", "org.h2.Driver", false)
object H2Database
extends DbType(
"h2database",
"org.h2.Driver",
supportsParallelWrites = false,
supportsAsynchronousCommits = false,
)
def jdbcType(jdbcUrl: String): DbType = jdbcUrl match {
case h2 if h2.startsWith("jdbc:h2:") => H2Database

View File

@@ -65,6 +65,7 @@ private[platform] class FlywayMigrations(jdbcUrl: String)(implicit loggingContex
maxPoolSize = 2,
connectionTimeout = 5.seconds,
metrics = None,
connectionAsyncCommit = false,
)
}

View File

@@ -84,6 +84,7 @@ private[platform] object DbDispatcher {
jdbcUrl: String,
maxConnections: Int,
metrics: Metrics,
connectionAsyncCommit: Boolean,
)(implicit loggingContext: LoggingContext): ResourceOwner[DbDispatcher] =
for {
connectionProvider <- HikariJdbcConnectionProvider.owner(
@@ -91,6 +92,7 @@ private[platform] object DbDispatcher {
jdbcUrl,
maxConnections,
metrics.registry,
connectionAsyncCommit,
)
executor <- ResourceOwner.forExecutorService(() =>
Executors.newFixedThreadPool(

View File

@@ -31,6 +31,7 @@ private[platform] final class HikariConnection(
metrics: Option[MetricRegistry],
connectionPoolPrefix: String,
maxInitialConnectRetryAttempts: Int,
connectionAsyncCommit: Boolean,
)(implicit loggingContext: LoggingContext)
extends ResourceOwner[HikariDataSource] {
@@ -38,8 +39,10 @@ private[platform] final class HikariConnection(
override def acquire()(implicit context: ResourceContext): Resource[HikariDataSource] = {
val config = new HikariConfig
val dbType = DbType.jdbcType(jdbcUrl)
config.setJdbcUrl(jdbcUrl)
config.setDriverClassName(DbType.jdbcType(jdbcUrl).driver)
config.setDriverClassName(dbType.driver)
config.addDataSourceProperty("cachePrepStmts", "true")
config.addDataSourceProperty("prepStmtCacheSize", "128")
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048")
@@ -50,6 +53,8 @@ private[platform] final class HikariConnection(
config.setPoolName(s"$connectionPoolPrefix.${serverRole.threadPoolSuffix}")
metrics.foreach(config.setMetricRegistry)
configureAsyncCommit(config, dbType)
// Hikari dies if a database connection could not be opened almost immediately
// regardless of any connection timeout settings. We retry connections so that
// Postgres and Sandbox can be started in any order.
@@ -67,6 +72,17 @@ private[platform] final class HikariConnection(
}
)(conn => Future { conn.close() })
}
private def configureAsyncCommit(config: HikariConfig, dbType: DbType): Unit =
if (connectionAsyncCommit && dbType.supportsAsynchronousCommits) {
logger.info("Creating Hikari connections with asynchronous commit enabled")
config.setConnectionInitSql("SET synchronous_commit=OFF")
} else if (dbType.supportsAsynchronousCommits) {
logger.info("Creating Hikari connections with asynchronous commit disabled")
config.setConnectionInitSql("SET synchronous_commit=ON")
} else if (connectionAsyncCommit) {
logger.warn(s"Asynchronous commits are not compatible with ${dbType.name} database backend")
}
}
private[platform] object HikariConnection {
@@ -80,6 +96,7 @@ private[platform] object HikariConnection {
maxPoolSize: Int,
connectionTimeout: FiniteDuration,
metrics: Option[MetricRegistry],
connectionAsyncCommit: Boolean,
)(implicit
loggingContext: LoggingContext
): HikariConnection =
@@ -92,6 +109,7 @@ private[platform] object HikariConnection {
metrics,
ConnectionPoolPrefix,
MaxInitialConnectRetryAttempts,
connectionAsyncCommit,
)
}
@@ -157,6 +175,7 @@ private[platform] object HikariJdbcConnectionProvider {
jdbcUrl: String,
maxConnections: Int,
metrics: MetricRegistry,
connectionAsyncCommit: Boolean = false,
)(implicit loggingContext: LoggingContext): ResourceOwner[HikariJdbcConnectionProvider] =
for {
// these connections should never time out as we have the same number of threads as connections
@@ -167,6 +186,7 @@ private[platform] object HikariJdbcConnectionProvider {
maxConnections,
250.millis,
Some(metrics),
connectionAsyncCommit,
)
healthPoller <- ResourceOwner.forTimer(() =>
new Timer(s"${classOf[HikariJdbcConnectionProvider].getName}#healthPoller")

View File

@@ -85,7 +85,6 @@ private class JdbcLedgerDao(
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
validatePartyAllocation: Boolean,
enableAsyncCommits: Boolean = false,
) extends LedgerDao {
import JdbcLedgerDao._
@@ -97,13 +96,6 @@ private class JdbcLedgerDao(
private val logger = ContextualizedLogger.get(this.getClass)
LoggingContext.newLoggingContext { implicit loggingContext =>
if (enableAsyncCommits)
logger.info("Starting JdbcLedgerDao with async commit enabled")
else
logger.info("Starting JdbcLedgerDao with async commit disabled")
}
override def currentHealth(): HealthStatus = dbDispatcher.currentHealth()
override def lookupLedgerId()(implicit loggingContext: LoggingContext): Future[Option[LedgerId]] =
@@ -131,16 +123,19 @@ private class JdbcLedgerDao(
override def initializeLedger(ledgerId: LedgerId)(implicit
loggingContext: LoggingContext
): Future[Unit] =
dbDispatcher.executeSql(metrics.daml.index.db.initializeLedgerParameters)(
ParametersTable.setLedgerId(ledgerId.unwrap)
)
dbDispatcher.executeSql(metrics.daml.index.db.initializeLedgerParameters) {
implicit connection =>
queries.enforceSynchronousCommit
ParametersTable.setLedgerId(ledgerId.unwrap)(connection)
}
override def initializeParticipantId(participantId: ParticipantId)(implicit
loggingContext: LoggingContext
): Future[Unit] =
dbDispatcher.executeSql(metrics.daml.index.db.initializeParticipantId)(
ParametersTable.setParticipantId(participantId.unwrap)
)
dbDispatcher.executeSql(metrics.daml.index.db.initializeParticipantId) { implicit connection =>
queries.enforceSynchronousCommit
ParametersTable.setParticipantId(participantId.unwrap)(connection)
}
private val SQL_GET_CONFIGURATION_ENTRIES = SQL(
"select * from configuration_entries where ledger_offset > {startExclusive} and ledger_offset <= {endInclusive} order by ledger_offset asc limit {pageSize} offset {queryOffset}"
@@ -223,9 +218,6 @@ private class JdbcLedgerDao(
dbDispatcher.executeSql(
metrics.daml.index.db.storeConfigurationEntryDbMetrics
) { implicit conn =>
if (enableAsyncCommits) {
queries.enableAsyncCommit
}
val optCurrentConfig = ParametersTable.getLedgerEndAndConfiguration(conn)
val optExpectedGeneration: Option[Long] =
optCurrentConfig.map { case (_, c) => c.generation + 1 }
@@ -300,9 +292,6 @@ private class JdbcLedgerDao(
partyEntry: PartyLedgerEntry,
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] = {
dbDispatcher.executeSql(metrics.daml.index.db.storePartyEntryDbMetrics) { implicit conn =>
if (enableAsyncCommits) {
queries.enableAsyncCommit
}
ParametersTable.updateLedgerEnd(offsetStep)
val savepoint = conn.setSavepoint()
@@ -471,9 +460,6 @@ private class JdbcLedgerDao(
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
dbDispatcher
.executeSql(metrics.daml.index.db.storeTransactionDbMetrics) { implicit conn =>
if (enableAsyncCommits) {
queries.enableAsyncCommit
}
val error =
Timed.value(
metrics.daml.index.db.storeTransactionDbMetrics.commitValidation,
@@ -508,9 +494,6 @@ private class JdbcLedgerDao(
reason: RejectionReason,
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
dbDispatcher.executeSql(metrics.daml.index.db.storeRejectionDbMetrics) { implicit conn =>
if (enableAsyncCommits) {
queries.enableAsyncCommit
}
for (info <- submitterInfo) {
handleError(offsetStep.offset, info, recordTime, reason)
}
@@ -524,6 +507,7 @@ private class JdbcLedgerDao(
)(implicit loggingContext: LoggingContext): Future[Unit] =
dbDispatcher.executeSql(metrics.daml.index.db.storeInitialStateFromScenario) {
implicit connection =>
queries.enforceSynchronousCommit
ledgerEntries.foreach { case (offset, entry) =>
entry match {
case tx: LedgerEntry.Transaction =>
@@ -687,9 +671,6 @@ private class JdbcLedgerDao(
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
dbDispatcher.executeSql(metrics.daml.index.db.storePackageEntryDbMetrics) {
implicit connection =>
if (enableAsyncCommits) {
queries.enableAsyncCommit
}
ParametersTable.updateLedgerEnd(offsetStep)
if (packages.nonEmpty) {
@@ -1038,7 +1019,13 @@ private[platform] object JdbcLedgerDao {
jdbcAsyncCommits: Boolean,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] =
for {
dbDispatcher <- DbDispatcher.owner(serverRole, jdbcUrl, maxConnections, metrics)
dbDispatcher <- DbDispatcher.owner(
serverRole,
jdbcUrl,
maxConnections,
metrics,
jdbcAsyncCommits,
)
executor <- ResourceOwner.forExecutorService(() => Executors.newWorkStealingPool())
} yield new JdbcLedgerDao(
maxConnections,
@@ -1050,15 +1037,11 @@ private[platform] object JdbcLedgerDao {
metrics,
lfValueTranslationCache,
validatePartyAllocation,
jdbcAsyncCommits,
)
sealed trait Queries {
/** Performance optimization for transactions that don't
* require strong durability guarantees.
*/
protected[JdbcLedgerDao] def enableAsyncCommit(implicit conn: Connection): Unit
protected[JdbcLedgerDao] def enforceSynchronousCommit(implicit conn: Connection): Unit
protected[JdbcLedgerDao] def SQL_INSERT_PACKAGE: String
@@ -1101,8 +1084,10 @@ private[platform] object JdbcLedgerDao {
|truncate table party_entries cascade;
""".stripMargin
override protected[JdbcLedgerDao] def enableAsyncCommit(implicit conn: Connection): Unit = {
val statement = conn.prepareStatement("SET LOCAL synchronous_commit = 'off'")
override protected[JdbcLedgerDao] def enforceSynchronousCommit(implicit
conn: Connection
): Unit = {
val statement = conn.prepareStatement("SET LOCAL synchronous_commit = 'on'")
try {
statement.execute()
()
@@ -1146,7 +1131,9 @@ private[platform] object JdbcLedgerDao {
|set referential_integrity true;
""".stripMargin
/** Async commit not supported for H2 */
override protected[JdbcLedgerDao] def enableAsyncCommit(implicit conn: Connection): Unit = ()
/** H2 does not support asynchronous commits */
override protected[JdbcLedgerDao] def enforceSynchronousCommit(implicit
conn: Connection
): Unit = ()
}
}
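
A detail worth noting, again as an aside and not part of the diff: the Postgres enforceSynchronousCommit uses SET LOCAL, which lasts only until the surrounding transaction commits or rolls back, so ledger initialization and scenario loading remain durable while every other statement on the same connection keeps the asynchronous session default. A rough sketch of that per-transaction override over plain JDBC, where dataSource is an assumed pool configured as in HikariConnection:

    import java.sql.Connection
    import javax.sql.DataSource

    object SynchronousCommitSketch {
      // Runs `block` in a transaction that commits synchronously even though
      // the session default is asynchronous; SET LOCAL only takes effect
      // inside a transaction and reverts automatically at commit or rollback.
      def withSynchronousCommit[T](dataSource: DataSource)(block: Connection => T): T = {
        val connection = dataSource.getConnection
        try {
          connection.setAutoCommit(false)
          connection.createStatement().execute("SET LOCAL synchronous_commit = 'on'")
          val result = block(connection)
          connection.commit()
          result
        } finally connection.close() // PostgreSQL's JDBC driver rolls back uncommitted work on close
      }
    }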

View File

@@ -25,6 +25,14 @@
<appender-ref ref="MEM" />
</logger>
<appender name="JdbcIndexerSpecAppender" class="com.daml.platform.testing.LogCollector">
<test>com.daml.platform.indexer.JdbcIndexerSpec</test>
</appender>
<logger name="com.daml.platform.store.dao.HikariConnection" level="INFO">
<appender-ref ref="JdbcIndexerSpecAppender" />
</logger>
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="ASYNC"/>

View File

@@ -6,6 +6,7 @@ package com.daml.platform.indexer
import java.time.Duration
import akka.stream.scaladsl.Source
import ch.qos.logback.classic.Level
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.participant.state.v1
@@ -21,8 +22,10 @@ import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.common.MismatchException
import com.daml.platform.configuration.ServerRole
import com.daml.platform.indexer
import com.daml.platform.store.IndexMetadata
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.platform.testing.LogCollector
import com.daml.testing.postgresql.PostgresAroundEach
import org.mockito.MockitoSugar
import org.scalatest.flatspec.AsyncFlatSpec
@@ -74,11 +77,29 @@ final class JdbcIndexerSpec
}
}
it should "use asynchronous commits with PostgreSQL" in {
val participantId = "the-participant"
for {
indexer <- initializeIndexer(participantId)
_ = LogCollector.clear[this.type]
_ <- runAndShutdown(indexer)
} yield {
val hikariDataSourceLogs =
LogCollector.read[this.type]("com.daml.platform.store.dao.HikariConnection")
hikariDataSourceLogs should contain(
Level.INFO -> "Creating Hikari connections with asynchronous commit enabled"
)
}
}
private def runAndShutdown[A](owner: ResourceOwner[A]): Future[Unit] =
owner.use(_ => Future.unit)
private def runAndShutdownIndexer(participantId: String): Future[Unit] =
new JdbcIndexer.Factory(
initializeIndexer(participantId).flatMap(runAndShutdown)
private def initializeIndexer(participantId: String): Future[ResourceOwner[JdbcIndexer]] = {
new indexer.JdbcIndexer.Factory(
ServerRole.Indexer,
IndexerConfig(
participantId = v1.ParticipantId.assertFromString(participantId),
@@ -88,7 +109,8 @@ final class JdbcIndexerSpec
mockedReadService,
new Metrics(new MetricRegistry),
LfValueTranslation.Cache.none,
).migrateSchema(allowExistingSchema = true).flatMap(runAndShutdown)
).migrateSchema(allowExistingSchema = true)
}
private val mockedReadService: ReadService =
when(mock[ReadService].getLedgerInitialConditions())

View File

@@ -0,0 +1,50 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.dao
import com.daml.ledger.resources.TestResourceContext
import com.daml.logging.LoggingContext
import com.daml.platform.configuration.ServerRole
import com.daml.testing.postgresql.PostgresAroundAll
import org.mockito.MockitoSugar
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import scala.concurrent.duration._
class HikariConnectionSpec
extends AsyncFlatSpec
with Matchers
with MockitoSugar
with TestResourceContext
with PostgresAroundAll {
it should "use asynchronous commits if enabled" in {
assertAsyncCommit(asyncCommitEnabled = true, expectedQueryResponse = "off")
}
it should "not use asynchronous commits if disabled" in {
assertAsyncCommit(asyncCommitEnabled = false, expectedQueryResponse = "on")
}
private def assertAsyncCommit(asyncCommitEnabled: Boolean, expectedQueryResponse: String) =
newHikariConnection(asyncCommitEnabled)
.use { hikariDataSource =>
val rs =
hikariDataSource.getConnection.createStatement().executeQuery("SHOW synchronous_commit")
rs.next()
rs.getString(1) shouldBe expectedQueryResponse
}
private def newHikariConnection(asyncCommitEnabled: Boolean) =
HikariConnection.owner(
serverRole = ServerRole.Testing(this.getClass),
jdbcUrl = postgresDatabase.url,
minimumIdle = 2,
maxPoolSize = 2,
connectionTimeout = 5.seconds,
metrics = None,
connectionAsyncCommit = asyncCommitEnabled,
)(LoggingContext.ForTesting)
}

View File

@@ -13,7 +13,7 @@
<appender-ref ref="SqlLedgerSpecAppender" />
</logger>
<logger name="com.daml.platform.store.dao.JdbcLedgerDao" level="INFO">
<logger name="com.daml.platform.store.dao.HikariConnection" level="INFO">
<appender-ref ref="SqlLedgerSpecAppender" />
</logger>
</configuration>

View File

@@ -220,10 +220,10 @@ final class SqlLedgerSpec
for {
_ <- createSqlLedger(validatePartyAllocation = false)
} yield {
val jdbcLedgerDaoLogs =
LogCollector.read[this.type]("com.daml.platform.store.dao.JdbcLedgerDao")
jdbcLedgerDaoLogs should contain(
Level.INFO -> "Starting JdbcLedgerDao with async commit disabled"
val hikariDataSourceLogs =
LogCollector.read[this.type]("com.daml.platform.store.dao.HikariConnection")
hikariDataSourceLogs should contain(
Level.INFO -> "Creating Hikari connections with asynchronous commit disabled"
)
}
}