Refactoring store factories [DPP-709] (#11572)

* Pulling up the DbDispatcher factory from JdbcLedgerDao (the new wiring is sketched below)
* Adapting usage sites
* Moving the ReadStorageBackend factory onto the StorageBackendFactory trait
* Minor SQL fixes (removing parameters-table join remnants)

changelog_begin
changelog_end
Marton Nagy 2021-11-08 22:53:32 +01:00 committed by GitHub
parent 41fb2890d6
commit 5dee88b038
12 changed files with 358 additions and 294 deletions
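
For orientation, a minimal sketch of the read-path wiring after this refactoring, condensed from the diffs below. jdbcUrl, serverRole, metrics, executionContext, participantId and errorFactories are assumed to be in scope, the numeric values are placeholders, and imports are as in the touched files.

    // Callers now own the DbDispatcher resource and pass it to the plain
    // JdbcLedgerDao.read factory; previously JdbcLedgerDao.readOwner built the
    // dispatcher (and its data source) internally.
    val storageBackendFactory = StorageBackendFactory.of(DbType.jdbcType(jdbcUrl))
    val ledgerEndCache = MutableLedgerEndCache()
    val readDaoOwner: ResourceOwner[LedgerReadDao] =
      DbDispatcher
        .owner(
          dataSource =
            storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
          serverRole = serverRole,
          connectionPoolSize = 16, // placeholder
          connectionTimeout = 250.millis,
          metrics = metrics,
        )
        .map(dbDispatcher =>
          JdbcLedgerDao.read(
            dbDispatcher = dbDispatcher,
            eventsPageSize = 1000, // placeholder
            eventsProcessingParallelism = 8, // placeholder
            servicesExecutionContext = executionContext,
            metrics = metrics,
            lfValueTranslationCache = LfValueTranslationCache.Cache.none,
            enricher = None,
            participantId = participantId,
            storageBackendFactory = storageBackendFactory,
            ledgerEndCache = ledgerEndCache,
            errorFactories = errorFactories,
          )
        )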

View File

@@ -24,12 +24,14 @@ import com.daml.platform.common.{LedgerIdNotFoundException, MismatchException}
import com.daml.platform.configuration.ServerRole
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.appendonlydao.{
DbDispatcher,
JdbcLedgerDao,
LedgerDaoTransactionsReader,
LedgerReadDao,
}
import com.daml.platform.store.backend.StorageBackendFactory
import com.daml.platform.store.cache.{LedgerEndCache, MutableLedgerEndCache}
import com.daml.platform.store.{BaseLedger, LfValueTranslationCache}
import com.daml.platform.store.{BaseLedger, DbType, LfValueTranslationCache}
import com.daml.resources.ProgramResource.StartupException
import com.daml.timer.RetryStrategy
@@ -65,9 +67,25 @@ private[platform] object ReadOnlySqlLedger {
override def acquire()(implicit context: ResourceContext): Resource[ReadOnlySqlLedger] = {
val ledgerEndCache = MutableLedgerEndCache()
val storageBackendFactory = StorageBackendFactory.of(DbType.jdbcType(jdbcUrl))
for {
ledgerDao <- ledgerDaoOwner(servicesExecutionContext, errorFactories, ledgerEndCache)
dbDispatcher <- DbDispatcher
.owner(
dataSource =
storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
serverRole = serverRole,
connectionPoolSize = databaseConnectionPoolSize,
connectionTimeout = databaseConnectionTimeout,
metrics = metrics,
)
.acquire()
ledgerDao = ledgerDaoOwner(
servicesExecutionContext,
errorFactories,
ledgerEndCache,
dbDispatcher,
storageBackendFactory,
)
ledgerId <- Resource.fromFuture(verifyLedgerId(ledgerDao, initialLedgerId))
ledger <- ledgerOwner(ledgerDao, ledgerId, ledgerEndCache).acquire()
} yield ledger
@@ -144,21 +162,21 @@ private[platform] object ReadOnlySqlLedger {
servicesExecutionContext: ExecutionContext,
errorFactories: ErrorFactories,
ledgerEndCache: LedgerEndCache,
): ResourceOwner[LedgerReadDao] =
JdbcLedgerDao.readOwner(
serverRole,
jdbcUrl,
databaseConnectionPoolSize,
databaseConnectionTimeout,
eventsPageSize,
eventsProcessingParallelism,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
Some(enricher),
participantId,
errorFactories,
ledgerEndCache,
dbDispatcher: DbDispatcher,
storageBackendFactory: StorageBackendFactory,
): LedgerReadDao =
JdbcLedgerDao.read(
dbDispatcher = dbDispatcher,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
enricher = Some(enricher),
participantId = participantId,
storageBackendFactory = storageBackendFactory,
ledgerEndCache = ledgerEndCache,
errorFactories = errorFactories,
)
}

View File

@@ -13,7 +13,7 @@ private[platform] sealed abstract class DbType(
if (supportsParallelWrites) maxConnections else 1
}
private[platform] object DbType {
object DbType {
object Postgres
extends DbType(
"postgres",

View File

@@ -14,6 +14,8 @@ import com.daml.metrics.Metrics
import com.daml.platform.ApiOffset
import com.daml.platform.configuration.ServerRole
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.appendonlydao.{DbDispatcher, JdbcLedgerDao}
import com.daml.platform.store.backend.StorageBackendFactory
import com.daml.platform.store.cache.MutableLedgerEndCache
import scalaz.Tag
@@ -44,24 +46,33 @@ object IndexMetadata {
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
) =
com.daml.platform.store.appendonlydao.JdbcLedgerDao.readOwner(
serverRole = ServerRole.ReadIndexMetadata,
jdbcUrl = jdbcUrl,
connectionPoolSize = 1,
connectionTimeout = 250.millis,
eventsPageSize = 1000,
eventsProcessingParallelism = 8,
servicesExecutionContext = executionContext,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
enricher = None,
// No participant ID is available for the dump index meta path,
// and this property is not needed for the used ReadDao.
participantId = Ref.ParticipantId.assertFromString("1"),
errorFactories = errorFactories,
ledgerEndCache = MutableLedgerEndCache(), // not used
)
) = {
val storageBackendFactory = StorageBackendFactory.of(DbType.jdbcType(jdbcUrl))
val metrics = new Metrics(new MetricRegistry)
DbDispatcher
.owner(
dataSource = storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
serverRole = ServerRole.ReadIndexMetadata,
connectionPoolSize = 1,
connectionTimeout = 250.millis,
metrics = metrics,
)
.map(dbDispatcher =>
JdbcLedgerDao.read(
dbDispatcher = dbDispatcher,
eventsPageSize = 1000,
eventsProcessingParallelism = 8,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
enricher = None,
participantId = Ref.ParticipantId.assertFromString("1"),
storageBackendFactory = storageBackendFactory,
ledgerEndCache = MutableLedgerEndCache(), // not used
errorFactories = errorFactories,
)
)
}
private val Empty = "<empty>"

View File

@@ -88,7 +88,7 @@ private[platform] final class DbDispatcher private (
}
}
private[platform] object DbDispatcher {
object DbDispatcher {
private val logger = ContextualizedLogger.get(this.getClass)
def owner(

View File

@@ -20,7 +20,6 @@ import com.daml.ledger.participant.state.index.v2.{
PackageDetails,
}
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.ResourceOwner
import com.daml.lf.archive.ArchiveParser
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
@@ -30,7 +29,6 @@ import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.logging.entries.LoggingEntry
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.configuration.ServerRole
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.Conversions._
import com.daml.platform.store._
@@ -41,9 +39,8 @@ import com.daml.platform.store.backend.{
ReadStorageBackend,
ResetStorageBackend,
StorageBackendFactory,
UpdateToDbDto,
}
import com.daml.platform.store.cache.{LedgerEndCache, MutableLedgerEndCache}
import com.daml.platform.store.cache.LedgerEndCache
import com.daml.platform.store.entries.{
ConfigurationEntry,
LedgerEntry,
@@ -51,7 +48,6 @@ import com.daml.platform.store.entries.{
PartyLedgerEntry,
}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
@@ -759,11 +755,8 @@ private[platform] object JdbcLedgerDao {
"transactionId" -> id
}
def readOwner(
serverRole: ServerRole,
jdbcUrl: String,
connectionPoolSize: Int,
connectionTimeout: FiniteDuration,
def read(
dbDispatcher: DbDispatcher,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
servicesExecutionContext: ExecutionContext,
@@ -772,186 +765,103 @@
enricher: Option[ValueEnricher],
participantId: Ref.ParticipantId,
errorFactories: ErrorFactories,
storageBackendFactory: StorageBackendFactory,
ledgerEndCache: LedgerEndCache,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerReadDao] =
owner(
serverRole,
jdbcUrl,
connectionPoolSize,
connectionTimeout,
eventsPageSize,
eventsProcessingParallelism,
validate = false,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
enricher = enricher,
participantId = participantId,
sequentialWriteDao = NoopSequentialWriteDao,
errorFactories = errorFactories,
ledgerEndCache = ledgerEndCache,
).map(new MeteredLedgerReadDao(_, metrics))
def writeOwner(
serverRole: ServerRole,
jdbcUrl: String,
connectionPoolSize: Int,
connectionTimeout: FiniteDuration,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
enricher: Option[ValueEnricher],
participantId: Ref.ParticipantId,
ledgerEndCache: MutableLedgerEndCache,
errorFactories: ErrorFactories,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = {
val dbType = DbType.jdbcType(jdbcUrl)
owner(
serverRole,
jdbcUrl,
dbType.maxSupportedWriteConnections(connectionPoolSize),
connectionTimeout,
eventsPageSize,
eventsProcessingParallelism,
validate = false,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
enricher = enricher,
participantId = participantId,
sequentialWriteDao = sequentialWriteDao(
participantId,
lfValueTranslationCache,
): LedgerReadDao =
new MeteredLedgerReadDao(
new JdbcLedgerDao(
dbDispatcher,
servicesExecutionContext,
eventsPageSize,
eventsProcessingParallelism,
false,
metrics,
CompressionStrategy.none(metrics),
DbType.jdbcType(jdbcUrl),
ledgerEndCache,
),
errorFactories = errorFactories,
ledgerEndCache = ledgerEndCache,
).map(new MeteredLedgerDao(_, metrics))
}
def validatingWriteOwner(
serverRole: ServerRole,
jdbcUrl: String,
connectionPoolSize: Int,
connectionTimeout: FiniteDuration,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
validatePartyAllocation: Boolean = false,
enricher: Option[ValueEnricher],
participantId: Ref.ParticipantId,
compressionStrategy: CompressionStrategy,
ledgerEndCache: MutableLedgerEndCache,
errorFactories: ErrorFactories,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = {
val dbType = DbType.jdbcType(jdbcUrl)
owner(
serverRole,
jdbcUrl,
dbType.maxSupportedWriteConnections(connectionPoolSize),
connectionTimeout,
eventsPageSize,
eventsProcessingParallelism,
validate = true,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
validatePartyAllocation,
enricher = enricher,
participantId = participantId,
sequentialWriteDao = sequentialWriteDao(
participantId,
lfValueTranslationCache,
metrics,
compressionStrategy,
dbType,
ledgerEndCache,
false,
enricher,
SequentialWriteDao.noop,
participantId,
storageBackendFactory.readStorageBackend(ledgerEndCache),
storageBackendFactory.createParameterStorageBackend,
storageBackendFactory.createDeduplicationStorageBackend,
storageBackendFactory.createResetStorageBackend,
errorFactories,
),
errorFactories = errorFactories,
ledgerEndCache = ledgerEndCache,
).map(new MeteredLedgerDao(_, metrics))
}
private def sequentialWriteDao(
participantId: Ref.ParticipantId,
lfValueTranslationCache: LfValueTranslationCache.Cache,
metrics: Metrics,
compressionStrategy: CompressionStrategy,
dbType: DbType,
ledgerEndCache: MutableLedgerEndCache,
): SequentialWriteDao = {
val factory = StorageBackendFactory.of(dbType)
SequentialWriteDaoImpl(
ingestionStorageBackend = factory.createIngestionStorageBackend,
parameterStorageBackend = factory.createParameterStorageBackend,
updateToDbDtos = UpdateToDbDto(
participantId = participantId,
translation = new LfValueTranslation(
cache = lfValueTranslationCache,
metrics = metrics,
enricherO = None,
loadPackage = (_, _) => Future.successful(None),
),
compressionStrategy = compressionStrategy,
),
ledgerEndCache = ledgerEndCache,
metrics,
)
}
private def owner(
serverRole: ServerRole,
jdbcUrl: String,
connectionPoolSize: Int,
connectionTimeout: FiniteDuration,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
validate: Boolean,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
validatePartyAllocation: Boolean = false,
enricher: Option[ValueEnricher],
participantId: Ref.ParticipantId,
def write(
dbDispatcher: DbDispatcher,
sequentialWriteDao: SequentialWriteDao,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
enricher: Option[ValueEnricher],
participantId: Ref.ParticipantId,
errorFactories: ErrorFactories,
storageBackendFactory: StorageBackendFactory,
ledgerEndCache: LedgerEndCache,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = {
val dbType = DbType.jdbcType(jdbcUrl)
val factory = StorageBackendFactory.of(dbType)
for {
dbDispatcher <- DbDispatcher.owner(
factory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
serverRole,
connectionPoolSize,
connectionTimeout,
): LedgerDao =
new MeteredLedgerDao(
new JdbcLedgerDao(
dbDispatcher,
servicesExecutionContext,
eventsPageSize,
eventsProcessingParallelism,
false,
metrics,
)
} yield new JdbcLedgerDao(
dbDispatcher,
servicesExecutionContext,
eventsPageSize,
eventsProcessingParallelism,
validate,
lfValueTranslationCache,
false,
enricher,
sequentialWriteDao,
participantId,
storageBackendFactory.readStorageBackend(ledgerEndCache),
storageBackendFactory.createParameterStorageBackend,
storageBackendFactory.createDeduplicationStorageBackend,
storageBackendFactory.createResetStorageBackend,
errorFactories,
),
metrics,
)
def validatingWrite(
dbDispatcher: DbDispatcher,
sequentialWriteDao: SequentialWriteDao,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
validatePartyAllocation: Boolean = false,
enricher: Option[ValueEnricher],
participantId: Ref.ParticipantId,
errorFactories: ErrorFactories,
storageBackendFactory: StorageBackendFactory,
ledgerEndCache: LedgerEndCache,
): LedgerDao =
new MeteredLedgerDao(
new JdbcLedgerDao(
dbDispatcher,
servicesExecutionContext,
eventsPageSize,
eventsProcessingParallelism,
true,
metrics,
lfValueTranslationCache,
validatePartyAllocation,
enricher,
sequentialWriteDao,
participantId,
storageBackendFactory.readStorageBackend(ledgerEndCache),
storageBackendFactory.createParameterStorageBackend,
storageBackendFactory.createDeduplicationStorageBackend,
storageBackendFactory.createResetStorageBackend,
errorFactories,
),
metrics,
lfValueTranslationCache,
validatePartyAllocation,
enricher,
sequentialWriteDao,
participantId,
StorageBackendFactory.readStorageBackendFor(dbType, ledgerEndCache),
factory.createParameterStorageBackend,
factory.createDeduplicationStorageBackend,
factory.createResetStorageBackend,
errorFactories,
)
}
val acceptType = "accept"
val rejectType = "reject"
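
The write-side factories follow the same pattern: the caller builds both the DbDispatcher and the SequentialWriteDao and hands them to JdbcLedgerDao.write (or validatingWrite, which switches on post-commit validation and additionally takes a validatePartyAllocation flag). A condensed sketch mirroring the test setup further below; jdbcUrl, serverRole, metrics, executionContext, participantId, errorFactories and ledgerEndCache (a MutableLedgerEndCache) are assumed in scope, and the numeric values are placeholders.

    val dbType = DbType.jdbcType(jdbcUrl)
    val storageBackendFactory = StorageBackendFactory.of(dbType)
    val writeDaoOwner: ResourceOwner[LedgerDao] =
      DbDispatcher
        .owner(
          dataSource =
            storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
          serverRole = serverRole,
          // write paths cap the pool at what the database type supports
          connectionPoolSize = dbType.maxSupportedWriteConnections(16),
          connectionTimeout = 250.millis,
          metrics = metrics,
        )
        .map(dbDispatcher =>
          JdbcLedgerDao.write(
            dbDispatcher = dbDispatcher,
            // construction moved out of JdbcLedgerDao into the new
            // SequentialWriteDao.apply factory (see SequentialWriteDao.scala below)
            sequentialWriteDao = SequentialWriteDao(
              participantId = participantId,
              lfValueTranslationCache = LfValueTranslationCache.Cache.none,
              metrics = metrics,
              compressionStrategy = CompressionStrategy.none(metrics),
              ledgerEndCache = ledgerEndCache,
              ingestionStorageBackend = storageBackendFactory.createIngestionStorageBackend,
              parameterStorageBackend = storageBackendFactory.createParameterStorageBackend,
            ),
            eventsPageSize = 1000, // placeholder
            eventsProcessingParallelism = 8, // placeholder
            servicesExecutionContext = executionContext,
            metrics = metrics,
            lfValueTranslationCache = LfValueTranslationCache.Cache.none,
            enricher = None,
            participantId = participantId,
            storageBackendFactory = storageBackendFactory,
            ledgerEndCache = ledgerEndCache,
            errorFactories = errorFactories,
          )
        )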

View File

@@ -8,21 +8,60 @@ import java.sql.Connection
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v2.Update
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.platform.store.backend.{DbDto, IngestionStorageBackend, ParameterStorageBackend}
import com.daml.lf.data.Ref
import com.daml.metrics.Metrics
import com.daml.platform.store.LfValueTranslationCache
import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation}
import com.daml.platform.store.backend.{
DbDto,
IngestionStorageBackend,
ParameterStorageBackend,
UpdateToDbDto,
}
import com.daml.platform.store.cache.MutableLedgerEndCache
import scala.concurrent.Future
import scala.util.chaining.scalaUtilChainingOps
trait SequentialWriteDao {
def store(connection: Connection, offset: Offset, update: Option[state.Update]): Unit
}
object NoopSequentialWriteDao extends SequentialWriteDao {
object SequentialWriteDao {
def apply(
participantId: Ref.ParticipantId,
lfValueTranslationCache: LfValueTranslationCache.Cache,
metrics: Metrics,
compressionStrategy: CompressionStrategy,
ledgerEndCache: MutableLedgerEndCache,
ingestionStorageBackend: IngestionStorageBackend[_],
parameterStorageBackend: ParameterStorageBackend,
): SequentialWriteDao =
SequentialWriteDaoImpl(
ingestionStorageBackend = ingestionStorageBackend,
parameterStorageBackend = parameterStorageBackend,
updateToDbDtos = UpdateToDbDto(
participantId = participantId,
translation = new LfValueTranslation(
cache = lfValueTranslationCache,
metrics = metrics,
enricherO = None,
loadPackage = (_, _) => Future.successful(None),
),
compressionStrategy = compressionStrategy,
),
ledgerEndCache = ledgerEndCache,
)
val noop: SequentialWriteDao = NoopSequentialWriteDao
}
private[appendonlydao] object NoopSequentialWriteDao extends SequentialWriteDao {
override def store(connection: Connection, offset: Offset, update: Option[Update]): Unit =
throw new UnsupportedOperationException
}
case class SequentialWriteDaoImpl[DB_BATCH](
private[appendonlydao] case class SequentialWriteDaoImpl[DB_BATCH](
ingestionStorageBackend: IngestionStorageBackend[DB_BATCH],
parameterStorageBackend: ParameterStorageBackend,
updateToDbDtos: Offset => state.Update => Iterator[DbDto],

View File

@@ -24,6 +24,16 @@ trait StorageBackendFactory {
def createIntegrityStorageBackend: IntegrityStorageBackend
def createResetStorageBackend: ResetStorageBackend
def createStringInterningStorageBackend: StringInterningStorageBackend
final def readStorageBackend(ledgerEndCache: LedgerEndCache): ReadStorageBackend =
ReadStorageBackend(
configurationStorageBackend = createConfigurationStorageBackend(ledgerEndCache),
partyStorageBackend = createPartyStorageBackend(ledgerEndCache),
packageStorageBackend = createPackageStorageBackend(ledgerEndCache),
completionStorageBackend = createCompletionStorageBackend,
contractStorageBackend = createContractStorageBackend(ledgerEndCache),
eventStorageBackend = createEventStorageBackend(ledgerEndCache),
)
}
object StorageBackendFactory {
@@ -33,21 +43,6 @@
case DbType.Postgres => PostgresStorageBackendFactory
case DbType.Oracle => OracleStorageBackendFactory
}
def readStorageBackendFor(
dbType: DbType,
ledgerEndCache: LedgerEndCache,
): ReadStorageBackend = {
val factory = of(dbType)
ReadStorageBackend(
configurationStorageBackend = factory.createConfigurationStorageBackend(ledgerEndCache),
partyStorageBackend = factory.createPartyStorageBackend(ledgerEndCache),
packageStorageBackend = factory.createPackageStorageBackend(ledgerEndCache),
completionStorageBackend = factory.createCompletionStorageBackend,
contractStorageBackend = factory.createContractStorageBackend(ledgerEndCache),
eventStorageBackend = factory.createEventStorageBackend(ledgerEndCache),
)
}
}
case class ReadStorageBackend(
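
With this move, readStorageBackend is a final method on the StorageBackendFactory trait rather than the removed readStorageBackendFor(dbType, ledgerEndCache) helper on the companion object, so callers that already hold a factory no longer re-derive one from a DbType. A hypothetical call site, with ledgerEndCache assumed in scope:

    val factory: StorageBackendFactory = StorageBackendFactory.of(DbType.Postgres)
    val readBackend: ReadStorageBackend = factory.readStorageBackend(ledgerEndCache)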

View File

@@ -24,7 +24,7 @@ private[backend] class PackageStorageBackendTemplate(ledgerEndCache: LedgerEndCa
private val SQL_SELECT_PACKAGES =
SQL(
"""select packages.package_id, packages.source_description, packages.known_since, packages.package_size
|from packages, parameters
|from packages
|where packages.ledger_offset <= {ledgerEndOffset}
|""".stripMargin
)
@@ -59,7 +59,7 @@ private[backend] class PackageStorageBackendTemplate(ledgerEndCache: LedgerEndCa
private val SQL_SELECT_PACKAGE =
SQL("""select packages.package
|from packages, parameters
|from packages
|where package_id = {package_id}
|and packages.ledger_offset <= {ledgerEndOffset}
|""".stripMargin)

View File

@@ -14,7 +14,14 @@ import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.appendonlydao.LedgerDao
import com.daml.platform.store.appendonlydao.{
DbDispatcher,
JdbcLedgerDao,
LedgerDao,
SequentialWriteDao,
}
import com.daml.platform.store.appendonlydao.events.CompressionStrategy
import com.daml.platform.store.backend.StorageBackendFactory
import com.daml.platform.store.cache.MutableLedgerEndCache
import com.daml.platform.store.dao.JdbcLedgerDaoBackend.{TestLedgerId, TestParticipantId}
import com.daml.platform.store.{DbType, FlywayMigrations, LfValueTranslationCache}
@@ -51,23 +58,41 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
)(implicit
loggingContext: LoggingContext
): ResourceOwner[LedgerDao] = {
com.daml.platform.store.appendonlydao.JdbcLedgerDao.writeOwner(
serverRole = ServerRole.Testing(getClass),
jdbcUrl = jdbcUrl,
// this was the previous default.
// keeping it hardcoded here to keep tests working as before extracting the parameter
connectionPoolSize = 16,
connectionTimeout = 250.millis,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
servicesExecutionContext = executionContext,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
enricher = Some(new ValueEnricher(new Engine())),
participantId = JdbcLedgerDaoBackend.TestParticipantIdRef,
ledgerEndCache = ledgerEndCache,
errorFactories = errorFactories,
)
val metrics = new Metrics(new MetricRegistry)
val dbType = DbType.jdbcType(jdbcUrl)
val storageBackendFactory = StorageBackendFactory.of(dbType)
DbDispatcher
.owner(
dataSource = storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
serverRole = ServerRole.Testing(getClass),
connectionPoolSize = dbType.maxSupportedWriteConnections(16),
connectionTimeout = 250.millis,
metrics = metrics,
)
.map(dbDispatcher =>
JdbcLedgerDao.write(
dbDispatcher = dbDispatcher,
sequentialWriteDao = SequentialWriteDao(
participantId = JdbcLedgerDaoBackend.TestParticipantIdRef,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
metrics = metrics,
compressionStrategy = CompressionStrategy.none(metrics),
ledgerEndCache = ledgerEndCache,
ingestionStorageBackend = storageBackendFactory.createIngestionStorageBackend,
parameterStorageBackend = storageBackendFactory.createParameterStorageBackend,
),
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
enricher = Some(new ValueEnricher(new Engine())),
participantId = JdbcLedgerDaoBackend.TestParticipantIdRef,
storageBackendFactory = storageBackendFactory,
ledgerEndCache = ledgerEndCache,
errorFactories = errorFactories,
)
)
}
protected final var ledgerDao: LedgerDao = _

View File

@@ -14,9 +14,15 @@ import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.LfValueTranslationCache
import com.daml.platform.store.{DbType, LfValueTranslationCache}
import com.daml.platform.store.appendonlydao.events.CompressionStrategy
import com.daml.platform.store.appendonlydao.{JdbcLedgerDao, LedgerDao}
import com.daml.platform.store.appendonlydao.{
DbDispatcher,
JdbcLedgerDao,
LedgerDao,
SequentialWriteDao,
}
import com.daml.platform.store.backend.StorageBackendFactory
import org.scalatest.LoneElement
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
@@ -34,22 +40,40 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
loggingContext: LoggingContext
): ResourceOwner[LedgerDao] = {
val metrics = new Metrics(new MetricRegistry)
JdbcLedgerDao
.validatingWriteOwner(
val dbType = DbType.jdbcType(jdbcUrl)
val storageBackendFactory = StorageBackendFactory.of(dbType)
val participantId = Ref.ParticipantId.assertFromString("JdbcLedgerDaoPostCommitValidationSpec")
DbDispatcher
.owner(
dataSource = storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
serverRole = ServerRole.Testing(getClass),
jdbcUrl = jdbcUrl,
connectionPoolSize = 16,
connectionPoolSize = dbType.maxSupportedWriteConnections(16),
connectionTimeout = 250.millis,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
enricher = None,
participantId = Ref.ParticipantId.assertFromString("JdbcLedgerDaoPostCommitValidationSpec"),
compressionStrategy = CompressionStrategy.none(metrics),
ledgerEndCache = ledgerEndCache,
errorFactories = errorFactories,
)
.map(dbDispatcher =>
JdbcLedgerDao.validatingWrite(
dbDispatcher = dbDispatcher,
sequentialWriteDao = SequentialWriteDao(
participantId = participantId,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
metrics = metrics,
compressionStrategy = CompressionStrategy.none(metrics),
ledgerEndCache = ledgerEndCache,
ingestionStorageBackend = storageBackendFactory.createIngestionStorageBackend,
parameterStorageBackend = storageBackendFactory.createParameterStorageBackend,
),
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
enricher = None,
participantId = participantId,
storageBackendFactory = storageBackendFactory,
ledgerEndCache = ledgerEndCache,
errorFactories = errorFactories,
)
)
}

View File

@@ -28,8 +28,9 @@ import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.indexer.RecoveringIndexerIntegrationSpec._
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.appendonlydao.{JdbcLedgerDao, LedgerReadDao}
import com.daml.platform.store.LfValueTranslationCache
import com.daml.platform.store.appendonlydao.{DbDispatcher, JdbcLedgerDao, LedgerReadDao}
import com.daml.platform.store.{DbType, LfValueTranslationCache}
import com.daml.platform.store.backend.StorageBackendFactory
import com.daml.platform.store.cache.MutableLedgerEndCache
import com.daml.platform.testing.LogCollector
import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext}
@@ -232,23 +233,31 @@ class RecoveringIndexerIntegrationSpec
val jdbcUrl =
s"jdbc:h2:mem:${getClass.getSimpleName.toLowerCase}-$testId;db_close_delay=-1;db_close_on_exit=false"
val errorFactories: ErrorFactories = mock[ErrorFactories]
JdbcLedgerDao
.readOwner(
val storageBackendFactory = StorageBackendFactory.of(DbType.jdbcType(jdbcUrl))
val metrics = new Metrics(new MetricRegistry)
DbDispatcher
.owner(
dataSource = storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
serverRole = ServerRole.Testing(getClass),
jdbcUrl = jdbcUrl,
connectionPoolSize = 16,
connectionTimeout = 250.millis,
eventsPageSize = 100,
eventsProcessingParallelism = 8,
servicesExecutionContext = executionContext,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
enricher = None,
participantId = Ref.ParticipantId.assertFromString("RecoveringIndexerIntegrationSpec"),
errorFactories = errorFactories,
ledgerEndCache = mutableLedgerEndCache,
metrics = metrics,
)
.map(dbDispatcher =>
JdbcLedgerDao.read(
dbDispatcher = dbDispatcher,
eventsPageSize = 100,
eventsProcessingParallelism = 8,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
enricher = None,
participantId = Ref.ParticipantId.assertFromString("RecoveringIndexerIntegrationSpec"),
storageBackendFactory = storageBackendFactory,
ledgerEndCache = mutableLedgerEndCache,
errorFactories = errorFactories,
) -> mutableLedgerEndCache
)
.map(_ -> mutableLedgerEndCache)
}
}

View File

@@ -40,9 +40,15 @@ import com.daml.platform.sandbox.stores.ledger.{Ledger, Rejection, SandboxOffset
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.appendonlydao.events.CompressionStrategy
import com.daml.platform.store.cache.{MutableLedgerEndCache, TranslationCacheBackedContractStore}
import com.daml.platform.store.appendonlydao.{LedgerDao, LedgerWriteDao}
import com.daml.platform.store.appendonlydao.{
DbDispatcher,
LedgerDao,
LedgerWriteDao,
SequentialWriteDao,
}
import com.daml.platform.store.backend.StorageBackendFactory
import com.daml.platform.store.entries.{LedgerEntry, PackageLedgerEntry, PartyLedgerEntry}
import com.daml.platform.store.{BaseLedger, FlywayMigrations, LfValueTranslationCache}
import com.daml.platform.store.{BaseLedger, DbType, FlywayMigrations, LfValueTranslationCache}
import com.google.rpc.status.{Status => RpcStatus}
import io.grpc.Status
@@ -100,8 +106,26 @@ private[sandbox] object SqlLedger {
_ <- Resource.fromFuture(
new FlywayMigrations(jdbcUrl).migrate()
)
dbType = DbType.jdbcType(jdbcUrl)
storageBackendFactory = StorageBackendFactory.of(dbType)
ledgerEndCache = MutableLedgerEndCache()
dao <- ledgerDaoOwner(ledgerEndCache, servicesExecutionContext, errorFactories).acquire()
dbDispatcher <- DbDispatcher
.owner(
dataSource =
storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
serverRole = serverRole,
connectionPoolSize = dbType.maxSupportedWriteConnections(databaseConnectionPoolSize),
connectionTimeout = databaseConnectionTimeout,
metrics = metrics,
)
.acquire()
dao = ledgerDaoOwner(
dbDispatcher,
storageBackendFactory,
ledgerEndCache,
servicesExecutionContext,
errorFactories,
)
_ <- startMode match {
case SqlStartMode.ResetAndStart =>
Resource.fromFuture(dao.reset())
@@ -253,18 +277,27 @@ private[sandbox] object SqlLedger {
}
private def ledgerDaoOwner(
dbDispatcher: DbDispatcher,
storageBackendFactory: StorageBackendFactory,
ledgerEndCache: MutableLedgerEndCache,
servicesExecutionContext: ExecutionContext,
errorFactories: ErrorFactories,
): ResourceOwner[LedgerDao] = {
): LedgerDao = {
val compressionStrategy =
if (enableCompression) CompressionStrategy.allGZIP(metrics)
else CompressionStrategy.none(metrics)
com.daml.platform.store.appendonlydao.JdbcLedgerDao.validatingWriteOwner(
serverRole = serverRole,
jdbcUrl = jdbcUrl,
connectionPoolSize = databaseConnectionPoolSize,
connectionTimeout = databaseConnectionTimeout,
val refParticipantId = Ref.ParticipantId.assertFromString(participantId.toString)
com.daml.platform.store.appendonlydao.JdbcLedgerDao.validatingWrite(
dbDispatcher = dbDispatcher,
sequentialWriteDao = SequentialWriteDao(
participantId = refParticipantId,
lfValueTranslationCache = lfValueTranslationCache,
metrics = metrics,
compressionStrategy = compressionStrategy,
ledgerEndCache = ledgerEndCache,
ingestionStorageBackend = storageBackendFactory.createIngestionStorageBackend,
parameterStorageBackend = storageBackendFactory.createParameterStorageBackend,
),
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
servicesExecutionContext = servicesExecutionContext,
@@ -272,8 +305,8 @@ private[sandbox] object SqlLedger {
lfValueTranslationCache = lfValueTranslationCache,
validatePartyAllocation = validatePartyAllocation,
enricher = Some(new ValueEnricher(engine)),
participantId = Ref.ParticipantId.assertFromString(participantId.toString),
compressionStrategy = compressionStrategy,
participantId = refParticipantId,
storageBackendFactory = storageBackendFactory,
ledgerEndCache = ledgerEndCache,
errorFactories = errorFactories,
)