Sandbox: Make JdbcLedgerDao create its own execution context. (#5091)

* sandbox: Don't let just anyone construct a DbDispatcher.

It's the job of the JdbcLedgerDao.

* sandbox: Clean up the DbDispatcher a little.

* sandbox: JdbcLedgerDao now creates its own execution context.

CHANGELOG_BEGIN
CHANGELOG_END

* sandbox: Make `defaultNumberOfShortLivedConnections` private.
Samir Talwar 2020-03-20 16:30:35 +01:00 committed by GitHub
parent 18033271de
commit 1ffd23a6a2
9 changed files with 79 additions and 89 deletions
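In short: the old `JdbcLedgerDao.owner(jdbcUrl, metrics, executionContext)` is replaced by two entry points that take no execution context, because the DAO now creates and owns its own. The new signatures, as they appear in the `JdbcLedgerDao` companion object further down:

    def readOwner(
        jdbcUrl: String,
        metrics: MetricRegistry,
    )(implicit logCtx: LoggingContext): ResourceOwner[LedgerReadDao]

    def writeOwner(
        jdbcUrl: String,
        metrics: MetricRegistry,
    )(implicit logCtx: LoggingContext): ResourceOwner[LedgerDao]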


@@ -32,7 +32,7 @@ import org.scalatest.{AsyncWordSpec, BeforeAndAfterEach, Matchers}
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.{Await, Future}
import scala.util.Try
class RecoveringIndexerIntegrationSpec extends AsyncWordSpec with Matchers with BeforeAndAfterEach {
@@ -221,7 +221,7 @@ class RecoveringIndexerIntegrationSpec extends AsyncWordSpec with Matchers with
private def index(implicit logCtx: LoggingContext): ResourceOwner[LedgerDao] = {
val jdbcUrl =
s"jdbc:h2:mem:${getClass.getSimpleName.toLowerCase()}-$testId;db_close_delay=-1;db_close_on_exit=false"
JdbcLedgerDao.owner(jdbcUrl, new MetricRegistry, ExecutionContext.global)
JdbcLedgerDao.writeOwner(jdbcUrl, new MetricRegistry)
}
}
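Call-site migration is mechanical, as the hunk above shows: drop the `ExecutionContext` argument and choose the read or write owner explicitly.

    // before
    JdbcLedgerDao.owner(jdbcUrl, new MetricRegistry, ExecutionContext.global)
    // after
    JdbcLedgerDao.writeOwner(jdbcUrl, new MetricRegistry)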


@@ -16,13 +16,8 @@ import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.platform.common.LedgerIdMismatchException
import com.digitalasset.platform.store.dao.{
DbDispatcher,
JdbcLedgerDao,
LedgerReadDao,
MeteredLedgerReadDao
}
import com.digitalasset.platform.store.{BaseLedger, DbType, ReadOnlyLedger}
import com.digitalasset.platform.store.dao.{JdbcLedgerDao, LedgerReadDao}
import com.digitalasset.platform.store.{BaseLedger, ReadOnlyLedger}
import com.digitalasset.resources.ProgramResource.StartupException
import com.digitalasset.resources.ResourceOwner
@@ -31,25 +26,17 @@ import scala.concurrent.{Await, ExecutionContext, Future}
object ReadOnlySqlLedger {
val maxConnections = 16
//jdbcUrl must have the user/password encoded in form of: "jdbc:postgresql://localhost/test?user=fred&password=secret"
def owner(
jdbcUrl: String,
ledgerId: LedgerId,
metrics: MetricRegistry,
)(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[ReadOnlyLedger] = {
val dbType = DbType.jdbcType(jdbcUrl)
)(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[ReadOnlyLedger] =
for {
dbDispatcher <- DbDispatcher.owner(jdbcUrl, maxConnections, metrics)
ledgerReadDao = new MeteredLedgerReadDao(
JdbcLedgerDao(dbDispatcher, dbType, mat.executionContext),
metrics,
)
ledgerReadDao <- JdbcLedgerDao.readOwner(jdbcUrl, metrics)
factory = new Factory(ledgerReadDao)
ledger <- ResourceOwner.forFutureCloseable(() => factory.createReadOnlySqlLedger(ledgerId))
} yield ledger
}
private class Factory(ledgerDao: LedgerReadDao)(implicit logCtx: LoggingContext) {
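Note that the `MeteredLedgerReadDao` wrapper disappears from this call site: `readOwner` now applies the metering decorator internally before handing the DAO out, so every consumer gets metrics without wrapping. The relevant line from the companion object (shown in full below):

    owner(jdbcUrl, maxConnections, metrics).map(new MeteredLedgerReadDao(_, metrics))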


@@ -61,13 +61,10 @@ final class JdbcIndexerFactory(
): ResourceOwner[JdbcIndexer] =
for {
materializer <- AkkaResourceOwner.forMaterializer(() => Materializer(actorSystem))
ledgerDao <- JdbcLedgerDao.owner(jdbcUrl, metrics, actorSystem.dispatcher)
ledgerDao <- JdbcLedgerDao.writeOwner(jdbcUrl, metrics)
initialLedgerEnd <- ResourceOwner.forFuture(() =>
initializeLedger(ledgerDao)(materializer, executionContext))
} yield {
implicit val mat: Materializer = materializer
new JdbcIndexer(initialLedgerEnd, participantId, ledgerDao, metrics)
}
} yield new JdbcIndexer(initialLedgerEnd, participantId, ledgerDao, metrics)(materializer)
private def initializeLedger(dao: LedgerDao)(
implicit materializer: Materializer,
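The `yield` block above collapses because the `Materializer` is now applied explicitly to the constructor's implicit parameter list rather than introduced into implicit scope. The two forms are equivalent Scala; a sketch, assuming `JdbcIndexer` declares `(implicit materializer: Materializer)` as its last parameter list:

    // 1. satisfy the implicit from scope:
    {
      implicit val mat: Materializer = materializer
      new JdbcIndexer(initialLedgerEnd, participantId, ledgerDao, metrics)
    }

    // 2. pass it explicitly:
    new JdbcIndexer(initialLedgerEnd, participantId, ledgerDao, metrics)(materializer)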


@@ -28,15 +28,9 @@ import com.digitalasset.platform.sandbox.LedgerIdGenerator
import com.digitalasset.platform.sandbox.stores.InMemoryActiveLedgerState
import com.digitalasset.platform.sandbox.stores.ledger.{Ledger, SandboxOffset}
import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryOrBump
import com.digitalasset.platform.store.dao.JdbcLedgerDao.defaultNumberOfShortLivedConnections
import com.digitalasset.platform.store.dao.{
DbDispatcher,
JdbcLedgerDao,
LedgerDao,
MeteredLedgerDao
}
import com.digitalasset.platform.store.dao.{JdbcLedgerDao, LedgerDao}
import com.digitalasset.platform.store.entries.{LedgerEntry, PackageLedgerEntry, PartyLedgerEntry}
import com.digitalasset.platform.store.{BaseLedger, DbType, FlywayMigrations, PersistenceEntry}
import com.digitalasset.platform.store.{BaseLedger, FlywayMigrations, PersistenceEntry}
import com.digitalasset.resources.ProgramResource.StartupException
import com.digitalasset.resources.ResourceOwner
@@ -63,19 +57,10 @@ object SqlLedger {
queueDepth: Int,
startMode: SqlStartMode = SqlStartMode.ContinueIfExists,
metrics: MetricRegistry,
)(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[Ledger] = {
implicit val ec: ExecutionContext = DEC
val dbType = DbType.jdbcType(jdbcUrl)
val maxConnections =
if (dbType.supportsParallelWrites) defaultNumberOfShortLivedConnections else 1
)(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[Ledger] =
for {
_ <- ResourceOwner.forFuture(() => new FlywayMigrations(jdbcUrl).migrate())
dbDispatcher <- DbDispatcher.owner(jdbcUrl, maxConnections, metrics)
ledgerDao = new MeteredLedgerDao(
JdbcLedgerDao(dbDispatcher, dbType, mat.executionContext),
metrics,
)
_ <- ResourceOwner.forFuture(() => new FlywayMigrations(jdbcUrl).migrate()(DEC))
ledgerDao <- JdbcLedgerDao.writeOwner(jdbcUrl, metrics)
ledger <- ResourceOwner.forFutureCloseable(
() =>
new SqlLedgerFactory(ledgerDao).createSqlLedger(
@@ -87,13 +72,12 @@ object SqlLedger {
packages,
initialLedgerEntries,
queueDepth,
// we use `maxConnections` for the maximum batch size, since it doesn't make sense to
// try to persist more ledger entries concurrently than we have SQL executor threads and
// SQL connections available.
maxConnections,
// we use `maxConcurrentConnections` for the maximum batch size, since it doesn't make
// sense to try to persist more ledger entries concurrently than we have SQL executor
// threads and SQL connections available.
ledgerDao.maxConcurrentConnections,
))
} yield ledger
}
}
private final class SqlLedger(
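The batch-size reasoning is unchanged, but the number now travels with the DAO: `ledgerDao.maxConcurrentConnections` is whatever `writeOwner` chose (the full pool when the database supports parallel writes, a single connection otherwise), so the persistence queue can never batch more entries than the DAO can write concurrently. The relationship, sketched (`maxBatchSize` is an illustrative name, not one from this diff):

    // chosen inside JdbcLedgerDao.writeOwner (see the full diff below):
    val maxConnections =
      if (dbType.supportsParallelWrites) DefaultNumberOfShortLivedConnections else 1
    // and read back by SqlLedger as its maximum batch size:
    val maxBatchSize = ledgerDao.maxConcurrentConnections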


@@ -4,7 +4,7 @@
package com.digitalasset.platform.store.dao
import java.sql.Connection
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.concurrent.{Executor, Executors, TimeUnit}
import com.codahale.metrics.{MetricRegistry, Timer}
import com.digitalasset.ledger.api.health.{HealthStatus, ReportsHealth}
@@ -15,17 +15,17 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
final class DbDispatcher(
final class DbDispatcher private (
val maxConnections: Int,
connectionProvider: HikariJdbcConnectionProvider,
sqlExecutor: ExecutorService,
executor: Executor,
metrics: MetricRegistry,
)(implicit logCtx: LoggingContext)
extends ReportsHealth {
private val logger = ContextualizedLogger.get(this.getClass)
private val sqlExecution = ExecutionContext.fromExecutorService(sqlExecutor)
private val executionContext = ExecutionContext.fromExecutor(executor)
object Metrics {
val waitAllTimer: Timer = metrics.timer("daml.index.db.all.wait")
@@ -81,30 +81,29 @@ final class DbDispatcher(
.error(s"$description: Got an exception while updating timer metrics. Ignoring.", t)
}
}
}(sqlExecution)
}(executionContext)
}
}
object DbDispatcher {
private val logger = ContextualizedLogger.get(this.getClass)
def owner(
jdbcUrl: String,
maxConnections: Int,
metrics: MetricRegistry,
)(implicit logCtx: LoggingContext): ResourceOwner[DbDispatcher] = {
)(implicit logCtx: LoggingContext): ResourceOwner[DbDispatcher] =
for {
connectionProvider <- HikariJdbcConnectionProvider.owner(jdbcUrl, maxConnections, metrics)
sqlExecutor <- ResourceOwner.forExecutorService(
executor <- ResourceOwner.forExecutorService(
() =>
Executors.newFixedThreadPool(
maxConnections,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("sql-executor-%d")
.setUncaughtExceptionHandler((_, e) =>
logger.error("Got an uncaught exception in SQL executor!", e))
logger.error("Uncaught exception in the SQL executor.", e))
.build()
))
} yield new DbDispatcher(maxConnections, connectionProvider, sqlExecutor, metrics)
}
} yield new DbDispatcher(maxConnections, connectionProvider, executor, metrics)
}
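The pattern in miniature: a private constructor forces all construction through the companion's `owner`, which ties the executor's lifecycle to the dispatcher's. A stripped-down sketch with simplified types, not the production class:

    import java.util.concurrent.{Executor, Executors}
    import com.digitalasset.resources.ResourceOwner

    final class Dispatcher private (executor: Executor)

    object Dispatcher {
      def owner(threads: Int): ResourceOwner[Dispatcher] =
        ResourceOwner
          .forExecutorService(() => Executors.newFixedThreadPool(threads))
          .map(new Dispatcher(_))
    }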


@@ -5,6 +5,7 @@ package com.digitalasset.platform.store.dao
import java.io.InputStream
import java.sql.Connection
import java.time.Instant
import java.util.concurrent.Executors
import java.util.{Date, UUID}
import akka.NotUsed
@@ -46,9 +47,11 @@ import com.digitalasset.ledger.api.domain.{
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.ledger.{ApplicationId, CommandId, EventId, WorkflowId}
import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.platform.ApiOffset.ApiOffsetConverter
import com.digitalasset.platform.events.EventIdFormatter.split
import com.digitalasset.platform.store.Contract.{ActiveContract, DivulgedContract}
import com.digitalasset.platform.store.Conversions._
import com.digitalasset.platform.store._
import com.digitalasset.platform.store.dao.JdbcLedgerDao.{H2DatabaseQueries, PostgresQueries}
import com.digitalasset.platform.store.dao.events.{TransactionsReader, TransactionsWriter}
import com.digitalasset.platform.store.entries.LedgerEntry.Transaction
@@ -64,8 +67,8 @@ import com.digitalasset.platform.store.serialization.{
TransactionSerializer,
ValueSerializer
}
import com.digitalasset.platform.store._
import com.digitalasset.resources.ResourceOwner
import com.google.common.util.concurrent.ThreadFactoryBuilder
import scalaz.syntax.tag._
import scala.collection.immutable
@@ -73,8 +76,6 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.control.NonFatal
import com.digitalasset.platform.ApiOffset.ApiOffsetConverter
private final case class ParsedEntry(
typ: String,
transactionId: Option[TransactionId],
@@ -105,6 +106,7 @@ private final case class ParsedPackageData(
private final case class ParsedCommandData(deduplicateUntil: Instant)
private class JdbcLedgerDao(
override val maxConcurrentConnections: Int,
dbDispatcher: DbDispatcher,
contractSerializer: ContractSerializer,
transactionSerializer: TransactionSerializer,
@@ -1753,34 +1755,49 @@ private class JdbcLedgerDao(
object JdbcLedgerDao {
val defaultNumberOfShortLivedConnections = 16
private val DefaultNumberOfShortLivedConnections = 16
def owner(
private val ThreadFactory = new ThreadFactoryBuilder().setNameFormat("dao-executor-%d").build()
def readOwner(
jdbcUrl: String,
metrics: MetricRegistry,
)(implicit logCtx: LoggingContext): ResourceOwner[LedgerReadDao] = {
val maxConnections = DefaultNumberOfShortLivedConnections
owner(jdbcUrl, maxConnections, metrics)
.map(new MeteredLedgerReadDao(_, metrics))
}
def writeOwner(
jdbcUrl: String,
metrics: MetricRegistry,
executionContext: ExecutionContext,
)(implicit logCtx: LoggingContext): ResourceOwner[LedgerDao] = {
val dbType = DbType.jdbcType(jdbcUrl)
val maxConnections =
if (dbType.supportsParallelWrites) defaultNumberOfShortLivedConnections else 1
for {
dbDispatcher <- DbDispatcher.owner(jdbcUrl, maxConnections, metrics)
} yield new MeteredLedgerDao(JdbcLedgerDao(dbDispatcher, dbType, executionContext), metrics)
if (dbType.supportsParallelWrites) DefaultNumberOfShortLivedConnections else 1
owner(jdbcUrl, maxConnections, metrics)
.map(new MeteredLedgerDao(_, metrics))
}
def apply(
dbDispatcher: DbDispatcher,
dbType: DbType,
executionContext: ExecutionContext,
)(implicit logCtx: LoggingContext): LedgerDao =
new JdbcLedgerDao(
dbDispatcher,
ContractSerializer,
TransactionSerializer,
KeyHasher,
dbType,
executionContext,
)
private def owner(
jdbcUrl: String,
maxConnections: Int,
metrics: MetricRegistry,
)(implicit logCtx: LoggingContext): ResourceOwner[LedgerDao] =
for {
dbDispatcher <- DbDispatcher.owner(jdbcUrl, maxConnections, metrics)
executor <- ResourceOwner.forExecutorService(() =>
Executors.newCachedThreadPool(ThreadFactory))
} yield
new JdbcLedgerDao(
maxConnections,
dbDispatcher,
ContractSerializer,
TransactionSerializer,
KeyHasher,
DbType.jdbcType(jdbcUrl),
ExecutionContext.fromExecutor(executor),
)
private val PARTY_SEPARATOR = '%'
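The heart of the change, isolated: the DAO builds its execution context from a named, cached thread pool, and the pool is managed as a `ResourceOwner` so it is shut down when the DAO is released. The construction on its own:

    import java.util.concurrent.Executors
    import scala.concurrent.ExecutionContext
    import com.google.common.util.concurrent.ThreadFactoryBuilder

    val threadFactory =
      new ThreadFactoryBuilder().setNameFormat("dao-executor-%d").build()
    val daoExecutionContext: ExecutionContext =
      ExecutionContext.fromExecutor(Executors.newCachedThreadPool(threadFactory))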
@@ -1788,8 +1805,11 @@ object JdbcLedgerDao {
// SQL statements using the proprietary Postgres on conflict .. do nothing clause
protected[JdbcLedgerDao] def SQL_INSERT_CONTRACT_DATA: String
protected[JdbcLedgerDao] def SQL_INSERT_PACKAGE: String
protected[JdbcLedgerDao] def SQL_IMPLICITLY_INSERT_PARTIES: String
protected[JdbcLedgerDao] def SQL_INSERT_COMMAND: String
protected[JdbcLedgerDao] def SQL_SELECT_ACTIVE_CONTRACTS: String


@@ -32,13 +32,15 @@ import scala.concurrent.Future
trait LedgerReadDao extends ReportsHealth {
def maxConcurrentConnections: Int
/** Looks up the ledger id */
def lookupLedgerId(): Future[Option[LedgerId]]
/** Looks up the current ledger end */
def lookupLedgerEnd(): Future[Offset]
/** Looks up the current external ledger end offset*/
/** Looks up the current external ledger end offset */
def lookupInitialLedgerEnd(): Future[Option[Offset]]
/** Looks up an active or divulged contract if it is visible for the given party. Archived contracts must not be returned by this method */
@@ -167,10 +169,12 @@ trait LedgerReadDao extends ReportsHealth {
trait LedgerWriteDao extends ReportsHealth {
def maxConcurrentConnections: Int
/**
* Initializes the ledger. Must be called only once.
*
* @param ledgerId the ledger id to be stored
* @param ledgerId the ledger id to be stored
*/
def initializeLedger(ledgerId: LedgerId, ledgerEnd: Offset): Future[Unit]


@@ -52,6 +52,8 @@ class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: MetricRegistry)
metrics.timer("daml.index.db.remove_expired_deduplication_data")
}
override def maxConcurrentConnections: Int = ledgerDao.maxConcurrentConnections
override def currentHealth(): HealthStatus = ledgerDao.currentHealth()
override def lookupLedgerId(): Future[Option[LedgerId]] =


@@ -31,10 +31,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll { this: Su
resource = newLoggingContext { implicit logCtx =>
for {
_ <- Resource.fromFuture(new FlywayMigrations(jdbcUrl).migrate())
dbDispatcher <- DbDispatcher
.owner(jdbcUrl, 4, new MetricRegistry)
.acquire()
dao = JdbcLedgerDao(dbDispatcher, dbType, executionContext)
dao <- JdbcLedgerDao.writeOwner(jdbcUrl, new MetricRegistry).acquire()
_ <- Resource.fromFuture(dao.initializeLedger(LedgerId("test-ledger"), Offset.begin))
} yield dao
}
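With the `DbDispatcher` step gone, the test fixture acquires exactly one resource, and teardown goes through the same handle. A sketch, assuming the resources library exposes `release()` on the acquired `Resource` (the `afterEach` body is not part of this diff):

    // hypothetical teardown, e.g. in afterEach:
    Await.result(resource.release(), 10.seconds)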