Make async commit configurable and add "LOCAL" async commit option (#9144)

In replicated Postgres setups, running with SYNCHRONOUS_COMMIT=LOCAL is a
suitable option as it is a trade off to only locally commit synchronously
but not to wait for a network ack.

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Oliver Seeliger 2021-03-16 08:01:01 +01:00 committed by GitHub
parent ab90c437a7
commit 6018697fb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 75 additions and 43 deletions

View File

@ -6,6 +6,7 @@ package com.daml.platform.indexer
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.platform.configuration.IndexConfiguration
import com.daml.platform.indexer.IndexerConfig._
import com.daml.platform.store.DbType
import scala.concurrent.duration.{DurationInt, FiniteDuration}
@ -18,6 +19,7 @@ case class IndexerConfig(
eventsPageSize: Int = IndexConfiguration.DefaultEventsPageSize,
updatePreparationParallelism: Int = DefaultUpdatePreparationParallelism,
allowExistingSchema: Boolean = false,
asyncCommitMode: DbType.AsyncCommitMode = DefaultAsyncCommitMode,
)
object IndexerConfig {
@ -26,5 +28,6 @@ object IndexerConfig {
val DefaultRestartDelay: FiniteDuration = 10.seconds
// Should be greater than or equal to the number of pipline stages
val DefaultDatabaseConnectionPoolSize: Int = 3
val DefaultAsyncCommitMode: DbType.AsyncCommitMode = DbType.AsynchronousCommit
}

View File

@ -59,7 +59,7 @@ object JdbcIndexer {
servicesExecutionContext,
metrics,
lfValueTranslationCache,
jdbcAsyncCommits = true,
jdbcAsyncCommitMode = config.asyncCommitMode,
enricher = None,
),
new FlywayMigrations(config.jdbcUrl),

View File

@ -40,4 +40,17 @@ private[platform] object DbType {
case _ =>
sys.error(s"JDBC URL doesn't match any supported databases (h2, pg)")
}
sealed trait AsyncCommitMode {
def setting: String
}
object SynchronousCommit extends AsyncCommitMode {
override val setting: String = "ON"
}
object AsynchronousCommit extends AsyncCommitMode {
override val setting: String = "OFF"
}
object LocalSynchronousCommit extends AsyncCommitMode {
override val setting: String = "LOCAL"
}
}

View File

@ -65,7 +65,7 @@ private[platform] class FlywayMigrations(jdbcUrl: String)(implicit loggingContex
maxPoolSize = 2,
connectionTimeout = 5.seconds,
metrics = None,
connectionAsyncCommit = false,
connectionAsyncCommitMode = DbType.SynchronousCommit,
)
}

View File

@ -13,6 +13,7 @@ import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.{DatabaseMetrics, Metrics}
import com.daml.platform.configuration.ServerRole
import com.daml.platform.store.DbType
import com.google.common.util.concurrent.ThreadFactoryBuilder
import scala.concurrent.{ExecutionContext, Future}
@ -87,7 +88,7 @@ private[platform] object DbDispatcher {
connectionPoolSize: Int,
connectionTimeout: FiniteDuration,
metrics: Metrics,
connectionAsyncCommit: Boolean,
connectionAsyncCommitMode: DbType.AsyncCommitMode,
)(implicit loggingContext: LoggingContext): ResourceOwner[DbDispatcher] =
for {
connectionProvider <- HikariJdbcConnectionProvider.owner(
@ -96,7 +97,7 @@ private[platform] object DbDispatcher {
connectionPoolSize,
connectionTimeout,
metrics.registry,
connectionAsyncCommit,
connectionAsyncCommitMode,
)
threadPoolName = s"daml.index.db.threadpool.connection.${serverRole.threadPoolSuffix}"
executor <- ResourceOwner.forExecutorService(() =>

View File

@ -31,7 +31,7 @@ private[platform] final class HikariConnection(
metrics: Option[MetricRegistry],
connectionPoolPrefix: String,
maxInitialConnectRetryAttempts: Int,
connectionAsyncCommit: Boolean,
connectionAsyncCommitMode: DbType.AsyncCommitMode,
)(implicit loggingContext: LoggingContext)
extends ResourceOwner[HikariDataSource] {
@ -74,14 +74,15 @@ private[platform] final class HikariConnection(
}
private def configureAsyncCommit(config: HikariConfig, dbType: DbType): Unit =
if (connectionAsyncCommit && dbType.supportsAsynchronousCommits) {
logger.info("Creating Hikari connections with asynchronous commit enabled")
config.setConnectionInitSql("SET synchronous_commit=OFF")
} else if (dbType.supportsAsynchronousCommits) {
logger.info("Creating Hikari connections with asynchronous commit disabled")
config.setConnectionInitSql("SET synchronous_commit=ON")
} else if (connectionAsyncCommit) {
logger.warn(s"Asynchronous commits are not compatible with ${dbType.name} database backend")
if (dbType.supportsAsynchronousCommits) {
logger.info(
s"Creating Hikari connections with synchronous commit ${connectionAsyncCommitMode.setting}"
)
config.setConnectionInitSql(s"SET synchronous_commit=${connectionAsyncCommitMode.setting}")
} else if (connectionAsyncCommitMode != DbType.SynchronousCommit) {
logger.warn(
s"Asynchronous commit setting ${connectionAsyncCommitMode.setting} is not compatible with ${dbType.name} database backend"
)
}
}
@ -96,10 +97,8 @@ private[platform] object HikariConnection {
maxPoolSize: Int,
connectionTimeout: FiniteDuration,
metrics: Option[MetricRegistry],
connectionAsyncCommit: Boolean,
)(implicit
loggingContext: LoggingContext
): HikariConnection =
connectionAsyncCommitMode: DbType.AsyncCommitMode,
)(implicit loggingContext: LoggingContext): HikariConnection =
new HikariConnection(
serverRole,
jdbcUrl,
@ -109,7 +108,7 @@ private[platform] object HikariConnection {
metrics,
ConnectionPoolPrefix,
MaxInitialConnectRetryAttempts,
connectionAsyncCommit,
connectionAsyncCommitMode,
)
}
@ -176,7 +175,7 @@ private[platform] object HikariJdbcConnectionProvider {
maxConnections: Int,
connectionTimeout: FiniteDuration,
metrics: MetricRegistry,
connectionAsyncCommit: Boolean = false,
connectionAsyncCommitMode: DbType.AsyncCommitMode = DbType.SynchronousCommit,
)(implicit loggingContext: LoggingContext): ResourceOwner[HikariJdbcConnectionProvider] =
for {
// these connections should never time out as we have the same number of threads as connections
@ -187,7 +186,7 @@ private[platform] object HikariJdbcConnectionProvider {
maxConnections,
connectionTimeout,
Some(metrics),
connectionAsyncCommit,
connectionAsyncCommitMode,
)
healthPoller <- ResourceOwner.forTimer(() =>
new Timer(s"${classOf[HikariJdbcConnectionProvider].getName}#healthPoller")

View File

@ -1027,7 +1027,6 @@ private[platform] object JdbcLedgerDao {
servicesExecutionContext,
metrics,
lfValueTranslationCache,
jdbcAsyncCommits = false,
idempotentEventInserts = false,
enricher = enricher,
).map(new MeteredLedgerReadDao(_, metrics))
@ -1041,7 +1040,7 @@ private[platform] object JdbcLedgerDao {
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
jdbcAsyncCommits: Boolean,
jdbcAsyncCommitMode: DbType.AsyncCommitMode,
enricher: Option[ValueEnricher],
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = {
val dbType = DbType.jdbcType(jdbcUrl)
@ -1054,7 +1053,8 @@ private[platform] object JdbcLedgerDao {
servicesExecutionContext,
metrics,
lfValueTranslationCache,
jdbcAsyncCommits = jdbcAsyncCommits && dbType.supportsAsynchronousCommits,
jdbcAsyncCommitMode =
if (dbType.supportsAsynchronousCommits) jdbcAsyncCommitMode else DbType.SynchronousCommit,
idempotentEventInserts = dbType == DbType.Postgres,
enricher = enricher,
).map(new MeteredLedgerDao(_, metrics))
@ -1082,7 +1082,6 @@ private[platform] object JdbcLedgerDao {
metrics,
lfValueTranslationCache,
validatePartyAllocation,
jdbcAsyncCommits = false,
idempotentEventInserts = false,
enricher = enricher,
).map(new MeteredLedgerDao(_, metrics))
@ -1122,7 +1121,7 @@ private[platform] object JdbcLedgerDao {
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
validatePartyAllocation: Boolean = false,
jdbcAsyncCommits: Boolean,
jdbcAsyncCommitMode: DbType.AsyncCommitMode = DbType.SynchronousCommit,
idempotentEventInserts: Boolean,
enricher: Option[ValueEnricher],
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] =
@ -1133,7 +1132,7 @@ private[platform] object JdbcLedgerDao {
connectionPoolSize,
250.millis,
metrics,
jdbcAsyncCommits,
jdbcAsyncCommitMode,
)
} yield new JdbcLedgerDao(
connectionPoolSize,

View File

@ -51,7 +51,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
servicesExecutionContext = executionContext,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslation.Cache.none,
jdbcAsyncCommits = true,
jdbcAsyncCommitMode = DbType.AsynchronousCommit,
enricher = Some(new ValueEnricher(new Engine())),
)

View File

@ -33,6 +33,7 @@ import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.platform.testing.LogCollector
import com.daml.testing.postgresql.PostgresAroundEach
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.Assertion
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
@ -110,18 +111,15 @@ final class JdbcIndexerSpec
}
it should "use asynchronous commits with PostgreSQL" in {
val participantId = "the-participant"
for {
indexer <- initializeIndexer(participantId)
_ = LogCollector.clear[this.type]
_ <- runAndShutdown(indexer)
} yield {
val hikariDataSourceLogs =
LogCollector.read[this.type]("com.daml.platform.store.dao.HikariConnection")
hikariDataSourceLogs should contain(
Level.INFO -> "Creating Hikari connections with asynchronous commit enabled"
)
}
asyncCommitTest(DbType.AsynchronousCommit)
}
it should "use local synchronous commits with PostgreSQL when configured to do so" in {
asyncCommitTest(DbType.LocalSynchronousCommit)
}
it should "use synchronous commits with PostgreSQL when configured to do so" in {
asyncCommitTest(DbType.SynchronousCommit)
}
/** This test is agnostic of the PostgreSQL LedgerDao and can be factored out */
@ -155,6 +153,21 @@ final class JdbcIndexerSpec
.map(_ => captureBuffer should contain theSameElementsInOrderAs expected)
}
private def asyncCommitTest(mode: DbType.AsyncCommitMode): Future[Assertion] = {
val participantId = "the-participant"
for {
indexer <- initializeIndexer(participantId, jdbcAsyncCommitMode = mode)
_ = LogCollector.clear[this.type]
_ <- runAndShutdown(indexer)
} yield {
val hikariDataSourceLogs =
LogCollector.read[this.type]("com.daml.platform.store.dao.HikariConnection")
hikariDataSourceLogs should contain(
Level.INFO -> s"Creating Hikari connections with synchronous commit ${mode.setting}"
)
}
}
private def runAndShutdown[A](owner: ResourceOwner[A]): Future[Unit] =
owner.use(_ => Future.unit)
@ -164,6 +177,7 @@ final class JdbcIndexerSpec
private def initializeIndexer(
participantId: String,
mockFlow: Flow[OffsetUpdate, Unit, NotUsed] = noOpFlow,
jdbcAsyncCommitMode: DbType.AsyncCommitMode = DbType.AsynchronousCommit,
): Future[ResourceOwner[JdbcIndexer]] = {
val config = IndexerConfig(
participantId = v1.ParticipantId.assertFromString(participantId),
@ -179,7 +193,7 @@ final class JdbcIndexerSpec
materializer.executionContext,
metrics,
LfValueTranslation.Cache.none,
jdbcAsyncCommits = true,
jdbcAsyncCommitMode = jdbcAsyncCommitMode,
enricher = None,
)
new indexer.JdbcIndexer.Factory(

View File

@ -6,6 +6,7 @@ package com.daml.platform.store.dao
import com.daml.ledger.resources.TestResourceContext
import com.daml.logging.LoggingContext
import com.daml.platform.configuration.ServerRole
import com.daml.platform.store.DbType
import com.daml.testing.postgresql.PostgresAroundAll
import org.mockito.MockitoSugar
import org.scalatest.flatspec.AsyncFlatSpec
@ -45,6 +46,7 @@ class HikariConnectionSpec
maxPoolSize = 2,
connectionTimeout = 5.seconds,
metrics = None,
connectionAsyncCommit = asyncCommitEnabled,
connectionAsyncCommitMode =
if (asyncCommitEnabled) DbType.AsynchronousCommit else DbType.SynchronousCommit,
)(LoggingContext.ForTesting)
}

View File

@ -26,6 +26,7 @@ import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.indexer.RecoveringIndexerIntegrationSpec._
import com.daml.platform.store.DbType
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerDao}
import com.daml.platform.testing.LogCollector
@ -227,7 +228,7 @@ class RecoveringIndexerIntegrationSpec
servicesExecutionContext = executionContext,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslation.Cache.none,
jdbcAsyncCommits = true,
jdbcAsyncCommitMode = DbType.AsynchronousCommit,
enricher = None,
)
}

View File

@ -224,7 +224,7 @@ final class SqlLedgerSpec
val hikariDataSourceLogs =
LogCollector.read[this.type]("com.daml.platform.store.dao.HikariConnection")
hikariDataSourceLogs should contain(
Level.INFO -> "Creating Hikari connections with asynchronous commit disabled"
Level.INFO -> "Creating Hikari connections with synchronous commit ON"
)
}
}