mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 09:17:43 +03:00
Refactoring store factories [DPP-709] (#11572)
* Pulling up DbDispatcher factory form JdbcLedgerDao * Adapt usage * Moving ReadStorageBackend factory * Minor SQL fixing (parameter join remnants) changelog_begin changelog_end
This commit is contained in:
parent
41fb2890d6
commit
5dee88b038
@ -24,12 +24,14 @@ import com.daml.platform.common.{LedgerIdNotFoundException, MismatchException}
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import com.daml.platform.server.api.validation.ErrorFactories
|
||||
import com.daml.platform.store.appendonlydao.{
|
||||
DbDispatcher,
|
||||
JdbcLedgerDao,
|
||||
LedgerDaoTransactionsReader,
|
||||
LedgerReadDao,
|
||||
}
|
||||
import com.daml.platform.store.backend.StorageBackendFactory
|
||||
import com.daml.platform.store.cache.{LedgerEndCache, MutableLedgerEndCache}
|
||||
import com.daml.platform.store.{BaseLedger, LfValueTranslationCache}
|
||||
import com.daml.platform.store.{BaseLedger, DbType, LfValueTranslationCache}
|
||||
import com.daml.resources.ProgramResource.StartupException
|
||||
import com.daml.timer.RetryStrategy
|
||||
|
||||
@ -65,9 +67,25 @@ private[platform] object ReadOnlySqlLedger {
|
||||
|
||||
override def acquire()(implicit context: ResourceContext): Resource[ReadOnlySqlLedger] = {
|
||||
val ledgerEndCache = MutableLedgerEndCache()
|
||||
val storageBackendFactory = StorageBackendFactory.of(DbType.jdbcType(jdbcUrl))
|
||||
for {
|
||||
ledgerDao <- ledgerDaoOwner(servicesExecutionContext, errorFactories, ledgerEndCache)
|
||||
dbDispatcher <- DbDispatcher
|
||||
.owner(
|
||||
dataSource =
|
||||
storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
|
||||
serverRole = serverRole,
|
||||
connectionPoolSize = databaseConnectionPoolSize,
|
||||
connectionTimeout = databaseConnectionTimeout,
|
||||
metrics = metrics,
|
||||
)
|
||||
.acquire()
|
||||
ledgerDao = ledgerDaoOwner(
|
||||
servicesExecutionContext,
|
||||
errorFactories,
|
||||
ledgerEndCache,
|
||||
dbDispatcher,
|
||||
storageBackendFactory,
|
||||
)
|
||||
ledgerId <- Resource.fromFuture(verifyLedgerId(ledgerDao, initialLedgerId))
|
||||
ledger <- ledgerOwner(ledgerDao, ledgerId, ledgerEndCache).acquire()
|
||||
} yield ledger
|
||||
@ -144,21 +162,21 @@ private[platform] object ReadOnlySqlLedger {
|
||||
servicesExecutionContext: ExecutionContext,
|
||||
errorFactories: ErrorFactories,
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
): ResourceOwner[LedgerReadDao] =
|
||||
JdbcLedgerDao.readOwner(
|
||||
serverRole,
|
||||
jdbcUrl,
|
||||
databaseConnectionPoolSize,
|
||||
databaseConnectionTimeout,
|
||||
eventsPageSize,
|
||||
eventsProcessingParallelism,
|
||||
servicesExecutionContext,
|
||||
metrics,
|
||||
lfValueTranslationCache,
|
||||
Some(enricher),
|
||||
participantId,
|
||||
errorFactories,
|
||||
ledgerEndCache,
|
||||
dbDispatcher: DbDispatcher,
|
||||
storageBackendFactory: StorageBackendFactory,
|
||||
): LedgerReadDao =
|
||||
JdbcLedgerDao.read(
|
||||
dbDispatcher = dbDispatcher,
|
||||
eventsPageSize = eventsPageSize,
|
||||
eventsProcessingParallelism = eventsProcessingParallelism,
|
||||
servicesExecutionContext = servicesExecutionContext,
|
||||
metrics = metrics,
|
||||
lfValueTranslationCache = lfValueTranslationCache,
|
||||
enricher = Some(enricher),
|
||||
participantId = participantId,
|
||||
storageBackendFactory = storageBackendFactory,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
errorFactories = errorFactories,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -13,7 +13,7 @@ private[platform] sealed abstract class DbType(
|
||||
if (supportsParallelWrites) maxConnections else 1
|
||||
}
|
||||
|
||||
private[platform] object DbType {
|
||||
object DbType {
|
||||
object Postgres
|
||||
extends DbType(
|
||||
"postgres",
|
||||
|
@ -14,6 +14,8 @@ import com.daml.metrics.Metrics
|
||||
import com.daml.platform.ApiOffset
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import com.daml.platform.server.api.validation.ErrorFactories
|
||||
import com.daml.platform.store.appendonlydao.{DbDispatcher, JdbcLedgerDao}
|
||||
import com.daml.platform.store.backend.StorageBackendFactory
|
||||
import com.daml.platform.store.cache.MutableLedgerEndCache
|
||||
import scalaz.Tag
|
||||
|
||||
@ -44,24 +46,33 @@ object IndexMetadata {
|
||||
)(implicit
|
||||
executionContext: ExecutionContext,
|
||||
loggingContext: LoggingContext,
|
||||
) =
|
||||
com.daml.platform.store.appendonlydao.JdbcLedgerDao.readOwner(
|
||||
serverRole = ServerRole.ReadIndexMetadata,
|
||||
jdbcUrl = jdbcUrl,
|
||||
connectionPoolSize = 1,
|
||||
connectionTimeout = 250.millis,
|
||||
eventsPageSize = 1000,
|
||||
eventsProcessingParallelism = 8,
|
||||
servicesExecutionContext = executionContext,
|
||||
metrics = new Metrics(new MetricRegistry),
|
||||
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
|
||||
enricher = None,
|
||||
// No participant ID is available for the dump index meta path,
|
||||
// and this property is not needed for the used ReadDao.
|
||||
participantId = Ref.ParticipantId.assertFromString("1"),
|
||||
errorFactories = errorFactories,
|
||||
ledgerEndCache = MutableLedgerEndCache(), // not used
|
||||
)
|
||||
) = {
|
||||
val storageBackendFactory = StorageBackendFactory.of(DbType.jdbcType(jdbcUrl))
|
||||
val metrics = new Metrics(new MetricRegistry)
|
||||
DbDispatcher
|
||||
.owner(
|
||||
dataSource = storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
|
||||
serverRole = ServerRole.ReadIndexMetadata,
|
||||
connectionPoolSize = 1,
|
||||
connectionTimeout = 250.millis,
|
||||
metrics = metrics,
|
||||
)
|
||||
.map(dbDispatcher =>
|
||||
JdbcLedgerDao.read(
|
||||
dbDispatcher = dbDispatcher,
|
||||
eventsPageSize = 1000,
|
||||
eventsProcessingParallelism = 8,
|
||||
servicesExecutionContext = executionContext,
|
||||
metrics = metrics,
|
||||
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
|
||||
enricher = None,
|
||||
participantId = Ref.ParticipantId.assertFromString("1"),
|
||||
storageBackendFactory = storageBackendFactory,
|
||||
ledgerEndCache = MutableLedgerEndCache(), // not used
|
||||
errorFactories = errorFactories,
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
private val Empty = "<empty>"
|
||||
|
||||
|
@ -88,7 +88,7 @@ private[platform] final class DbDispatcher private (
|
||||
}
|
||||
}
|
||||
|
||||
private[platform] object DbDispatcher {
|
||||
object DbDispatcher {
|
||||
private val logger = ContextualizedLogger.get(this.getClass)
|
||||
|
||||
def owner(
|
||||
|
@ -20,7 +20,6 @@ import com.daml.ledger.participant.state.index.v2.{
|
||||
PackageDetails,
|
||||
}
|
||||
import com.daml.ledger.participant.state.{v2 => state}
|
||||
import com.daml.ledger.resources.ResourceOwner
|
||||
import com.daml.lf.archive.ArchiveParser
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.lf.data.Time.Timestamp
|
||||
@ -30,7 +29,6 @@ import com.daml.logging.LoggingContext.withEnrichedLoggingContext
|
||||
import com.daml.logging.entries.LoggingEntry
|
||||
import com.daml.logging.{ContextualizedLogger, LoggingContext}
|
||||
import com.daml.metrics.{Metrics, Timed}
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import com.daml.platform.server.api.validation.ErrorFactories
|
||||
import com.daml.platform.store.Conversions._
|
||||
import com.daml.platform.store._
|
||||
@ -41,9 +39,8 @@ import com.daml.platform.store.backend.{
|
||||
ReadStorageBackend,
|
||||
ResetStorageBackend,
|
||||
StorageBackendFactory,
|
||||
UpdateToDbDto,
|
||||
}
|
||||
import com.daml.platform.store.cache.{LedgerEndCache, MutableLedgerEndCache}
|
||||
import com.daml.platform.store.cache.LedgerEndCache
|
||||
import com.daml.platform.store.entries.{
|
||||
ConfigurationEntry,
|
||||
LedgerEntry,
|
||||
@ -51,7 +48,6 @@ import com.daml.platform.store.entries.{
|
||||
PartyLedgerEntry,
|
||||
}
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.{Failure, Success}
|
||||
|
||||
@ -759,11 +755,8 @@ private[platform] object JdbcLedgerDao {
|
||||
"transactionId" -> id
|
||||
}
|
||||
|
||||
def readOwner(
|
||||
serverRole: ServerRole,
|
||||
jdbcUrl: String,
|
||||
connectionPoolSize: Int,
|
||||
connectionTimeout: FiniteDuration,
|
||||
def read(
|
||||
dbDispatcher: DbDispatcher,
|
||||
eventsPageSize: Int,
|
||||
eventsProcessingParallelism: Int,
|
||||
servicesExecutionContext: ExecutionContext,
|
||||
@ -772,186 +765,103 @@ private[platform] object JdbcLedgerDao {
|
||||
enricher: Option[ValueEnricher],
|
||||
participantId: Ref.ParticipantId,
|
||||
errorFactories: ErrorFactories,
|
||||
storageBackendFactory: StorageBackendFactory,
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerReadDao] =
|
||||
owner(
|
||||
serverRole,
|
||||
jdbcUrl,
|
||||
connectionPoolSize,
|
||||
connectionTimeout,
|
||||
eventsPageSize,
|
||||
eventsProcessingParallelism,
|
||||
validate = false,
|
||||
servicesExecutionContext,
|
||||
metrics,
|
||||
lfValueTranslationCache,
|
||||
enricher = enricher,
|
||||
participantId = participantId,
|
||||
sequentialWriteDao = NoopSequentialWriteDao,
|
||||
errorFactories = errorFactories,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
).map(new MeteredLedgerReadDao(_, metrics))
|
||||
|
||||
def writeOwner(
|
||||
serverRole: ServerRole,
|
||||
jdbcUrl: String,
|
||||
connectionPoolSize: Int,
|
||||
connectionTimeout: FiniteDuration,
|
||||
eventsPageSize: Int,
|
||||
eventsProcessingParallelism: Int,
|
||||
servicesExecutionContext: ExecutionContext,
|
||||
metrics: Metrics,
|
||||
lfValueTranslationCache: LfValueTranslationCache.Cache,
|
||||
enricher: Option[ValueEnricher],
|
||||
participantId: Ref.ParticipantId,
|
||||
ledgerEndCache: MutableLedgerEndCache,
|
||||
errorFactories: ErrorFactories,
|
||||
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = {
|
||||
val dbType = DbType.jdbcType(jdbcUrl)
|
||||
owner(
|
||||
serverRole,
|
||||
jdbcUrl,
|
||||
dbType.maxSupportedWriteConnections(connectionPoolSize),
|
||||
connectionTimeout,
|
||||
eventsPageSize,
|
||||
eventsProcessingParallelism,
|
||||
validate = false,
|
||||
servicesExecutionContext,
|
||||
metrics,
|
||||
lfValueTranslationCache,
|
||||
enricher = enricher,
|
||||
participantId = participantId,
|
||||
sequentialWriteDao = sequentialWriteDao(
|
||||
participantId,
|
||||
lfValueTranslationCache,
|
||||
): LedgerReadDao =
|
||||
new MeteredLedgerReadDao(
|
||||
new JdbcLedgerDao(
|
||||
dbDispatcher,
|
||||
servicesExecutionContext,
|
||||
eventsPageSize,
|
||||
eventsProcessingParallelism,
|
||||
false,
|
||||
metrics,
|
||||
CompressionStrategy.none(metrics),
|
||||
DbType.jdbcType(jdbcUrl),
|
||||
ledgerEndCache,
|
||||
),
|
||||
errorFactories = errorFactories,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
).map(new MeteredLedgerDao(_, metrics))
|
||||
}
|
||||
|
||||
def validatingWriteOwner(
|
||||
serverRole: ServerRole,
|
||||
jdbcUrl: String,
|
||||
connectionPoolSize: Int,
|
||||
connectionTimeout: FiniteDuration,
|
||||
eventsPageSize: Int,
|
||||
eventsProcessingParallelism: Int,
|
||||
servicesExecutionContext: ExecutionContext,
|
||||
metrics: Metrics,
|
||||
lfValueTranslationCache: LfValueTranslationCache.Cache,
|
||||
validatePartyAllocation: Boolean = false,
|
||||
enricher: Option[ValueEnricher],
|
||||
participantId: Ref.ParticipantId,
|
||||
compressionStrategy: CompressionStrategy,
|
||||
ledgerEndCache: MutableLedgerEndCache,
|
||||
errorFactories: ErrorFactories,
|
||||
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = {
|
||||
val dbType = DbType.jdbcType(jdbcUrl)
|
||||
owner(
|
||||
serverRole,
|
||||
jdbcUrl,
|
||||
dbType.maxSupportedWriteConnections(connectionPoolSize),
|
||||
connectionTimeout,
|
||||
eventsPageSize,
|
||||
eventsProcessingParallelism,
|
||||
validate = true,
|
||||
servicesExecutionContext,
|
||||
metrics,
|
||||
lfValueTranslationCache,
|
||||
validatePartyAllocation,
|
||||
enricher = enricher,
|
||||
participantId = participantId,
|
||||
sequentialWriteDao = sequentialWriteDao(
|
||||
participantId,
|
||||
lfValueTranslationCache,
|
||||
metrics,
|
||||
compressionStrategy,
|
||||
dbType,
|
||||
ledgerEndCache,
|
||||
false,
|
||||
enricher,
|
||||
SequentialWriteDao.noop,
|
||||
participantId,
|
||||
storageBackendFactory.readStorageBackend(ledgerEndCache),
|
||||
storageBackendFactory.createParameterStorageBackend,
|
||||
storageBackendFactory.createDeduplicationStorageBackend,
|
||||
storageBackendFactory.createResetStorageBackend,
|
||||
errorFactories,
|
||||
),
|
||||
errorFactories = errorFactories,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
).map(new MeteredLedgerDao(_, metrics))
|
||||
}
|
||||
|
||||
private def sequentialWriteDao(
|
||||
participantId: Ref.ParticipantId,
|
||||
lfValueTranslationCache: LfValueTranslationCache.Cache,
|
||||
metrics: Metrics,
|
||||
compressionStrategy: CompressionStrategy,
|
||||
dbType: DbType,
|
||||
ledgerEndCache: MutableLedgerEndCache,
|
||||
): SequentialWriteDao = {
|
||||
val factory = StorageBackendFactory.of(dbType)
|
||||
SequentialWriteDaoImpl(
|
||||
ingestionStorageBackend = factory.createIngestionStorageBackend,
|
||||
parameterStorageBackend = factory.createParameterStorageBackend,
|
||||
updateToDbDtos = UpdateToDbDto(
|
||||
participantId = participantId,
|
||||
translation = new LfValueTranslation(
|
||||
cache = lfValueTranslationCache,
|
||||
metrics = metrics,
|
||||
enricherO = None,
|
||||
loadPackage = (_, _) => Future.successful(None),
|
||||
),
|
||||
compressionStrategy = compressionStrategy,
|
||||
),
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
metrics,
|
||||
)
|
||||
}
|
||||
|
||||
private def owner(
|
||||
serverRole: ServerRole,
|
||||
jdbcUrl: String,
|
||||
connectionPoolSize: Int,
|
||||
connectionTimeout: FiniteDuration,
|
||||
eventsPageSize: Int,
|
||||
eventsProcessingParallelism: Int,
|
||||
validate: Boolean,
|
||||
servicesExecutionContext: ExecutionContext,
|
||||
metrics: Metrics,
|
||||
lfValueTranslationCache: LfValueTranslationCache.Cache,
|
||||
validatePartyAllocation: Boolean = false,
|
||||
enricher: Option[ValueEnricher],
|
||||
participantId: Ref.ParticipantId,
|
||||
def write(
|
||||
dbDispatcher: DbDispatcher,
|
||||
sequentialWriteDao: SequentialWriteDao,
|
||||
eventsPageSize: Int,
|
||||
eventsProcessingParallelism: Int,
|
||||
servicesExecutionContext: ExecutionContext,
|
||||
metrics: Metrics,
|
||||
lfValueTranslationCache: LfValueTranslationCache.Cache,
|
||||
enricher: Option[ValueEnricher],
|
||||
participantId: Ref.ParticipantId,
|
||||
errorFactories: ErrorFactories,
|
||||
storageBackendFactory: StorageBackendFactory,
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = {
|
||||
val dbType = DbType.jdbcType(jdbcUrl)
|
||||
val factory = StorageBackendFactory.of(dbType)
|
||||
for {
|
||||
dbDispatcher <- DbDispatcher.owner(
|
||||
factory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
|
||||
serverRole,
|
||||
connectionPoolSize,
|
||||
connectionTimeout,
|
||||
): LedgerDao =
|
||||
new MeteredLedgerDao(
|
||||
new JdbcLedgerDao(
|
||||
dbDispatcher,
|
||||
servicesExecutionContext,
|
||||
eventsPageSize,
|
||||
eventsProcessingParallelism,
|
||||
false,
|
||||
metrics,
|
||||
)
|
||||
} yield new JdbcLedgerDao(
|
||||
dbDispatcher,
|
||||
servicesExecutionContext,
|
||||
eventsPageSize,
|
||||
eventsProcessingParallelism,
|
||||
validate,
|
||||
lfValueTranslationCache,
|
||||
false,
|
||||
enricher,
|
||||
sequentialWriteDao,
|
||||
participantId,
|
||||
storageBackendFactory.readStorageBackend(ledgerEndCache),
|
||||
storageBackendFactory.createParameterStorageBackend,
|
||||
storageBackendFactory.createDeduplicationStorageBackend,
|
||||
storageBackendFactory.createResetStorageBackend,
|
||||
errorFactories,
|
||||
),
|
||||
metrics,
|
||||
)
|
||||
|
||||
def validatingWrite(
|
||||
dbDispatcher: DbDispatcher,
|
||||
sequentialWriteDao: SequentialWriteDao,
|
||||
eventsPageSize: Int,
|
||||
eventsProcessingParallelism: Int,
|
||||
servicesExecutionContext: ExecutionContext,
|
||||
metrics: Metrics,
|
||||
lfValueTranslationCache: LfValueTranslationCache.Cache,
|
||||
validatePartyAllocation: Boolean = false,
|
||||
enricher: Option[ValueEnricher],
|
||||
participantId: Ref.ParticipantId,
|
||||
errorFactories: ErrorFactories,
|
||||
storageBackendFactory: StorageBackendFactory,
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
): LedgerDao =
|
||||
new MeteredLedgerDao(
|
||||
new JdbcLedgerDao(
|
||||
dbDispatcher,
|
||||
servicesExecutionContext,
|
||||
eventsPageSize,
|
||||
eventsProcessingParallelism,
|
||||
true,
|
||||
metrics,
|
||||
lfValueTranslationCache,
|
||||
validatePartyAllocation,
|
||||
enricher,
|
||||
sequentialWriteDao,
|
||||
participantId,
|
||||
storageBackendFactory.readStorageBackend(ledgerEndCache),
|
||||
storageBackendFactory.createParameterStorageBackend,
|
||||
storageBackendFactory.createDeduplicationStorageBackend,
|
||||
storageBackendFactory.createResetStorageBackend,
|
||||
errorFactories,
|
||||
),
|
||||
metrics,
|
||||
lfValueTranslationCache,
|
||||
validatePartyAllocation,
|
||||
enricher,
|
||||
sequentialWriteDao,
|
||||
participantId,
|
||||
StorageBackendFactory.readStorageBackendFor(dbType, ledgerEndCache),
|
||||
factory.createParameterStorageBackend,
|
||||
factory.createDeduplicationStorageBackend,
|
||||
factory.createResetStorageBackend,
|
||||
errorFactories,
|
||||
)
|
||||
}
|
||||
|
||||
val acceptType = "accept"
|
||||
val rejectType = "reject"
|
||||
|
@ -8,21 +8,60 @@ import java.sql.Connection
|
||||
import com.daml.ledger.offset.Offset
|
||||
import com.daml.ledger.participant.state.v2.Update
|
||||
import com.daml.ledger.participant.state.{v2 => state}
|
||||
import com.daml.platform.store.backend.{DbDto, IngestionStorageBackend, ParameterStorageBackend}
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.platform.store.LfValueTranslationCache
|
||||
import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation}
|
||||
import com.daml.platform.store.backend.{
|
||||
DbDto,
|
||||
IngestionStorageBackend,
|
||||
ParameterStorageBackend,
|
||||
UpdateToDbDto,
|
||||
}
|
||||
import com.daml.platform.store.cache.MutableLedgerEndCache
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.util.chaining.scalaUtilChainingOps
|
||||
|
||||
trait SequentialWriteDao {
|
||||
def store(connection: Connection, offset: Offset, update: Option[state.Update]): Unit
|
||||
}
|
||||
|
||||
object NoopSequentialWriteDao extends SequentialWriteDao {
|
||||
object SequentialWriteDao {
|
||||
def apply(
|
||||
participantId: Ref.ParticipantId,
|
||||
lfValueTranslationCache: LfValueTranslationCache.Cache,
|
||||
metrics: Metrics,
|
||||
compressionStrategy: CompressionStrategy,
|
||||
ledgerEndCache: MutableLedgerEndCache,
|
||||
ingestionStorageBackend: IngestionStorageBackend[_],
|
||||
parameterStorageBackend: ParameterStorageBackend,
|
||||
): SequentialWriteDao =
|
||||
SequentialWriteDaoImpl(
|
||||
ingestionStorageBackend = ingestionStorageBackend,
|
||||
parameterStorageBackend = parameterStorageBackend,
|
||||
updateToDbDtos = UpdateToDbDto(
|
||||
participantId = participantId,
|
||||
translation = new LfValueTranslation(
|
||||
cache = lfValueTranslationCache,
|
||||
metrics = metrics,
|
||||
enricherO = None,
|
||||
loadPackage = (_, _) => Future.successful(None),
|
||||
),
|
||||
compressionStrategy = compressionStrategy,
|
||||
),
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
)
|
||||
|
||||
val noop: SequentialWriteDao = NoopSequentialWriteDao
|
||||
}
|
||||
|
||||
private[appendonlydao] object NoopSequentialWriteDao extends SequentialWriteDao {
|
||||
override def store(connection: Connection, offset: Offset, update: Option[Update]): Unit =
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
case class SequentialWriteDaoImpl[DB_BATCH](
|
||||
private[appendonlydao] case class SequentialWriteDaoImpl[DB_BATCH](
|
||||
ingestionStorageBackend: IngestionStorageBackend[DB_BATCH],
|
||||
parameterStorageBackend: ParameterStorageBackend,
|
||||
updateToDbDtos: Offset => state.Update => Iterator[DbDto],
|
||||
|
@ -24,6 +24,16 @@ trait StorageBackendFactory {
|
||||
def createIntegrityStorageBackend: IntegrityStorageBackend
|
||||
def createResetStorageBackend: ResetStorageBackend
|
||||
def createStringInterningStorageBackend: StringInterningStorageBackend
|
||||
|
||||
final def readStorageBackend(ledgerEndCache: LedgerEndCache): ReadStorageBackend =
|
||||
ReadStorageBackend(
|
||||
configurationStorageBackend = createConfigurationStorageBackend(ledgerEndCache),
|
||||
partyStorageBackend = createPartyStorageBackend(ledgerEndCache),
|
||||
packageStorageBackend = createPackageStorageBackend(ledgerEndCache),
|
||||
completionStorageBackend = createCompletionStorageBackend,
|
||||
contractStorageBackend = createContractStorageBackend(ledgerEndCache),
|
||||
eventStorageBackend = createEventStorageBackend(ledgerEndCache),
|
||||
)
|
||||
}
|
||||
|
||||
object StorageBackendFactory {
|
||||
@ -33,21 +43,6 @@ object StorageBackendFactory {
|
||||
case DbType.Postgres => PostgresStorageBackendFactory
|
||||
case DbType.Oracle => OracleStorageBackendFactory
|
||||
}
|
||||
|
||||
def readStorageBackendFor(
|
||||
dbType: DbType,
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
): ReadStorageBackend = {
|
||||
val factory = of(dbType)
|
||||
ReadStorageBackend(
|
||||
configurationStorageBackend = factory.createConfigurationStorageBackend(ledgerEndCache),
|
||||
partyStorageBackend = factory.createPartyStorageBackend(ledgerEndCache),
|
||||
packageStorageBackend = factory.createPackageStorageBackend(ledgerEndCache),
|
||||
completionStorageBackend = factory.createCompletionStorageBackend,
|
||||
contractStorageBackend = factory.createContractStorageBackend(ledgerEndCache),
|
||||
eventStorageBackend = factory.createEventStorageBackend(ledgerEndCache),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
case class ReadStorageBackend(
|
||||
|
@ -24,7 +24,7 @@ private[backend] class PackageStorageBackendTemplate(ledgerEndCache: LedgerEndCa
|
||||
private val SQL_SELECT_PACKAGES =
|
||||
SQL(
|
||||
"""select packages.package_id, packages.source_description, packages.known_since, packages.package_size
|
||||
|from packages, parameters
|
||||
|from packages
|
||||
|where packages.ledger_offset <= {ledgerEndOffset}
|
||||
|""".stripMargin
|
||||
)
|
||||
@ -59,7 +59,7 @@ private[backend] class PackageStorageBackendTemplate(ledgerEndCache: LedgerEndCa
|
||||
|
||||
private val SQL_SELECT_PACKAGE =
|
||||
SQL("""select packages.package
|
||||
|from packages, parameters
|
||||
|from packages
|
||||
|where package_id = {package_id}
|
||||
|and packages.ledger_offset <= {ledgerEndOffset}
|
||||
|""".stripMargin)
|
||||
|
@ -14,7 +14,14 @@ import com.daml.logging.LoggingContext.newLoggingContext
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import com.daml.platform.server.api.validation.ErrorFactories
|
||||
import com.daml.platform.store.appendonlydao.LedgerDao
|
||||
import com.daml.platform.store.appendonlydao.{
|
||||
DbDispatcher,
|
||||
JdbcLedgerDao,
|
||||
LedgerDao,
|
||||
SequentialWriteDao,
|
||||
}
|
||||
import com.daml.platform.store.appendonlydao.events.CompressionStrategy
|
||||
import com.daml.platform.store.backend.StorageBackendFactory
|
||||
import com.daml.platform.store.cache.MutableLedgerEndCache
|
||||
import com.daml.platform.store.dao.JdbcLedgerDaoBackend.{TestLedgerId, TestParticipantId}
|
||||
import com.daml.platform.store.{DbType, FlywayMigrations, LfValueTranslationCache}
|
||||
@ -51,23 +58,41 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
|
||||
)(implicit
|
||||
loggingContext: LoggingContext
|
||||
): ResourceOwner[LedgerDao] = {
|
||||
com.daml.platform.store.appendonlydao.JdbcLedgerDao.writeOwner(
|
||||
serverRole = ServerRole.Testing(getClass),
|
||||
jdbcUrl = jdbcUrl,
|
||||
// this was the previous default.
|
||||
// keeping it hardcoded here to keep tests working as before extracting the parameter
|
||||
connectionPoolSize = 16,
|
||||
connectionTimeout = 250.millis,
|
||||
eventsPageSize = eventsPageSize,
|
||||
eventsProcessingParallelism = eventsProcessingParallelism,
|
||||
servicesExecutionContext = executionContext,
|
||||
metrics = new Metrics(new MetricRegistry),
|
||||
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
|
||||
enricher = Some(new ValueEnricher(new Engine())),
|
||||
participantId = JdbcLedgerDaoBackend.TestParticipantIdRef,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
errorFactories = errorFactories,
|
||||
)
|
||||
val metrics = new Metrics(new MetricRegistry)
|
||||
val dbType = DbType.jdbcType(jdbcUrl)
|
||||
val storageBackendFactory = StorageBackendFactory.of(dbType)
|
||||
DbDispatcher
|
||||
.owner(
|
||||
dataSource = storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
|
||||
serverRole = ServerRole.Testing(getClass),
|
||||
connectionPoolSize = dbType.maxSupportedWriteConnections(16),
|
||||
connectionTimeout = 250.millis,
|
||||
metrics = metrics,
|
||||
)
|
||||
.map(dbDispatcher =>
|
||||
JdbcLedgerDao.write(
|
||||
dbDispatcher = dbDispatcher,
|
||||
sequentialWriteDao = SequentialWriteDao(
|
||||
participantId = JdbcLedgerDaoBackend.TestParticipantIdRef,
|
||||
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
|
||||
metrics = metrics,
|
||||
compressionStrategy = CompressionStrategy.none(metrics),
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
ingestionStorageBackend = storageBackendFactory.createIngestionStorageBackend,
|
||||
parameterStorageBackend = storageBackendFactory.createParameterStorageBackend,
|
||||
),
|
||||
eventsPageSize = eventsPageSize,
|
||||
eventsProcessingParallelism = eventsProcessingParallelism,
|
||||
servicesExecutionContext = executionContext,
|
||||
metrics = metrics,
|
||||
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
|
||||
enricher = Some(new ValueEnricher(new Engine())),
|
||||
participantId = JdbcLedgerDaoBackend.TestParticipantIdRef,
|
||||
storageBackendFactory = storageBackendFactory,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
errorFactories = errorFactories,
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
protected final var ledgerDao: LedgerDao = _
|
||||
|
@ -14,9 +14,15 @@ import com.daml.logging.LoggingContext
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import com.daml.platform.server.api.validation.ErrorFactories
|
||||
import com.daml.platform.store.LfValueTranslationCache
|
||||
import com.daml.platform.store.{DbType, LfValueTranslationCache}
|
||||
import com.daml.platform.store.appendonlydao.events.CompressionStrategy
|
||||
import com.daml.platform.store.appendonlydao.{JdbcLedgerDao, LedgerDao}
|
||||
import com.daml.platform.store.appendonlydao.{
|
||||
DbDispatcher,
|
||||
JdbcLedgerDao,
|
||||
LedgerDao,
|
||||
SequentialWriteDao,
|
||||
}
|
||||
import com.daml.platform.store.backend.StorageBackendFactory
|
||||
import org.scalatest.LoneElement
|
||||
import org.scalatest.flatspec.AsyncFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
@ -34,22 +40,40 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
|
||||
loggingContext: LoggingContext
|
||||
): ResourceOwner[LedgerDao] = {
|
||||
val metrics = new Metrics(new MetricRegistry)
|
||||
JdbcLedgerDao
|
||||
.validatingWriteOwner(
|
||||
val dbType = DbType.jdbcType(jdbcUrl)
|
||||
val storageBackendFactory = StorageBackendFactory.of(dbType)
|
||||
val participantId = Ref.ParticipantId.assertFromString("JdbcLedgerDaoPostCommitValidationSpec")
|
||||
DbDispatcher
|
||||
.owner(
|
||||
dataSource = storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
|
||||
serverRole = ServerRole.Testing(getClass),
|
||||
jdbcUrl = jdbcUrl,
|
||||
connectionPoolSize = 16,
|
||||
connectionPoolSize = dbType.maxSupportedWriteConnections(16),
|
||||
connectionTimeout = 250.millis,
|
||||
eventsPageSize = eventsPageSize,
|
||||
eventsProcessingParallelism = eventsProcessingParallelism,
|
||||
servicesExecutionContext = executionContext,
|
||||
metrics = metrics,
|
||||
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
|
||||
enricher = None,
|
||||
participantId = Ref.ParticipantId.assertFromString("JdbcLedgerDaoPostCommitValidationSpec"),
|
||||
compressionStrategy = CompressionStrategy.none(metrics),
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
errorFactories = errorFactories,
|
||||
)
|
||||
.map(dbDispatcher =>
|
||||
JdbcLedgerDao.validatingWrite(
|
||||
dbDispatcher = dbDispatcher,
|
||||
sequentialWriteDao = SequentialWriteDao(
|
||||
participantId = participantId,
|
||||
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
|
||||
metrics = metrics,
|
||||
compressionStrategy = CompressionStrategy.none(metrics),
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
ingestionStorageBackend = storageBackendFactory.createIngestionStorageBackend,
|
||||
parameterStorageBackend = storageBackendFactory.createParameterStorageBackend,
|
||||
),
|
||||
eventsPageSize = eventsPageSize,
|
||||
eventsProcessingParallelism = eventsProcessingParallelism,
|
||||
servicesExecutionContext = executionContext,
|
||||
metrics = metrics,
|
||||
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
|
||||
enricher = None,
|
||||
participantId = participantId,
|
||||
storageBackendFactory = storageBackendFactory,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
errorFactories = errorFactories,
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -28,8 +28,9 @@ import com.daml.metrics.Metrics
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import com.daml.platform.indexer.RecoveringIndexerIntegrationSpec._
|
||||
import com.daml.platform.server.api.validation.ErrorFactories
|
||||
import com.daml.platform.store.appendonlydao.{JdbcLedgerDao, LedgerReadDao}
|
||||
import com.daml.platform.store.LfValueTranslationCache
|
||||
import com.daml.platform.store.appendonlydao.{DbDispatcher, JdbcLedgerDao, LedgerReadDao}
|
||||
import com.daml.platform.store.{DbType, LfValueTranslationCache}
|
||||
import com.daml.platform.store.backend.StorageBackendFactory
|
||||
import com.daml.platform.store.cache.MutableLedgerEndCache
|
||||
import com.daml.platform.testing.LogCollector
|
||||
import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext}
|
||||
@ -232,23 +233,31 @@ class RecoveringIndexerIntegrationSpec
|
||||
val jdbcUrl =
|
||||
s"jdbc:h2:mem:${getClass.getSimpleName.toLowerCase}-$testId;db_close_delay=-1;db_close_on_exit=false"
|
||||
val errorFactories: ErrorFactories = mock[ErrorFactories]
|
||||
JdbcLedgerDao
|
||||
.readOwner(
|
||||
val storageBackendFactory = StorageBackendFactory.of(DbType.jdbcType(jdbcUrl))
|
||||
val metrics = new Metrics(new MetricRegistry)
|
||||
DbDispatcher
|
||||
.owner(
|
||||
dataSource = storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
|
||||
serverRole = ServerRole.Testing(getClass),
|
||||
jdbcUrl = jdbcUrl,
|
||||
connectionPoolSize = 16,
|
||||
connectionTimeout = 250.millis,
|
||||
eventsPageSize = 100,
|
||||
eventsProcessingParallelism = 8,
|
||||
servicesExecutionContext = executionContext,
|
||||
metrics = new Metrics(new MetricRegistry),
|
||||
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
|
||||
enricher = None,
|
||||
participantId = Ref.ParticipantId.assertFromString("RecoveringIndexerIntegrationSpec"),
|
||||
errorFactories = errorFactories,
|
||||
ledgerEndCache = mutableLedgerEndCache,
|
||||
metrics = metrics,
|
||||
)
|
||||
.map(dbDispatcher =>
|
||||
JdbcLedgerDao.read(
|
||||
dbDispatcher = dbDispatcher,
|
||||
eventsPageSize = 100,
|
||||
eventsProcessingParallelism = 8,
|
||||
servicesExecutionContext = executionContext,
|
||||
metrics = metrics,
|
||||
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
|
||||
enricher = None,
|
||||
participantId = Ref.ParticipantId.assertFromString("RecoveringIndexerIntegrationSpec"),
|
||||
storageBackendFactory = storageBackendFactory,
|
||||
ledgerEndCache = mutableLedgerEndCache,
|
||||
errorFactories = errorFactories,
|
||||
) -> mutableLedgerEndCache
|
||||
)
|
||||
.map(_ -> mutableLedgerEndCache)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -40,9 +40,15 @@ import com.daml.platform.sandbox.stores.ledger.{Ledger, Rejection, SandboxOffset
|
||||
import com.daml.platform.server.api.validation.ErrorFactories
|
||||
import com.daml.platform.store.appendonlydao.events.CompressionStrategy
|
||||
import com.daml.platform.store.cache.{MutableLedgerEndCache, TranslationCacheBackedContractStore}
|
||||
import com.daml.platform.store.appendonlydao.{LedgerDao, LedgerWriteDao}
|
||||
import com.daml.platform.store.appendonlydao.{
|
||||
DbDispatcher,
|
||||
LedgerDao,
|
||||
LedgerWriteDao,
|
||||
SequentialWriteDao,
|
||||
}
|
||||
import com.daml.platform.store.backend.StorageBackendFactory
|
||||
import com.daml.platform.store.entries.{LedgerEntry, PackageLedgerEntry, PartyLedgerEntry}
|
||||
import com.daml.platform.store.{BaseLedger, FlywayMigrations, LfValueTranslationCache}
|
||||
import com.daml.platform.store.{BaseLedger, DbType, FlywayMigrations, LfValueTranslationCache}
|
||||
import com.google.rpc.status.{Status => RpcStatus}
|
||||
import io.grpc.Status
|
||||
|
||||
@ -100,8 +106,26 @@ private[sandbox] object SqlLedger {
|
||||
_ <- Resource.fromFuture(
|
||||
new FlywayMigrations(jdbcUrl).migrate()
|
||||
)
|
||||
dbType = DbType.jdbcType(jdbcUrl)
|
||||
storageBackendFactory = StorageBackendFactory.of(dbType)
|
||||
ledgerEndCache = MutableLedgerEndCache()
|
||||
dao <- ledgerDaoOwner(ledgerEndCache, servicesExecutionContext, errorFactories).acquire()
|
||||
dbDispatcher <- DbDispatcher
|
||||
.owner(
|
||||
dataSource =
|
||||
storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
|
||||
serverRole = serverRole,
|
||||
connectionPoolSize = dbType.maxSupportedWriteConnections(databaseConnectionPoolSize),
|
||||
connectionTimeout = databaseConnectionTimeout,
|
||||
metrics = metrics,
|
||||
)
|
||||
.acquire()
|
||||
dao = ledgerDaoOwner(
|
||||
dbDispatcher,
|
||||
storageBackendFactory,
|
||||
ledgerEndCache,
|
||||
servicesExecutionContext,
|
||||
errorFactories,
|
||||
)
|
||||
_ <- startMode match {
|
||||
case SqlStartMode.ResetAndStart =>
|
||||
Resource.fromFuture(dao.reset())
|
||||
@ -253,18 +277,27 @@ private[sandbox] object SqlLedger {
|
||||
}
|
||||
|
||||
private def ledgerDaoOwner(
|
||||
dbDispatcher: DbDispatcher,
|
||||
storageBackendFactory: StorageBackendFactory,
|
||||
ledgerEndCache: MutableLedgerEndCache,
|
||||
servicesExecutionContext: ExecutionContext,
|
||||
errorFactories: ErrorFactories,
|
||||
): ResourceOwner[LedgerDao] = {
|
||||
): LedgerDao = {
|
||||
val compressionStrategy =
|
||||
if (enableCompression) CompressionStrategy.allGZIP(metrics)
|
||||
else CompressionStrategy.none(metrics)
|
||||
com.daml.platform.store.appendonlydao.JdbcLedgerDao.validatingWriteOwner(
|
||||
serverRole = serverRole,
|
||||
jdbcUrl = jdbcUrl,
|
||||
connectionPoolSize = databaseConnectionPoolSize,
|
||||
connectionTimeout = databaseConnectionTimeout,
|
||||
val refParticipantId = Ref.ParticipantId.assertFromString(participantId.toString)
|
||||
com.daml.platform.store.appendonlydao.JdbcLedgerDao.validatingWrite(
|
||||
dbDispatcher = dbDispatcher,
|
||||
sequentialWriteDao = SequentialWriteDao(
|
||||
participantId = refParticipantId,
|
||||
lfValueTranslationCache = lfValueTranslationCache,
|
||||
metrics = metrics,
|
||||
compressionStrategy = compressionStrategy,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
ingestionStorageBackend = storageBackendFactory.createIngestionStorageBackend,
|
||||
parameterStorageBackend = storageBackendFactory.createParameterStorageBackend,
|
||||
),
|
||||
eventsPageSize = eventsPageSize,
|
||||
eventsProcessingParallelism = eventsProcessingParallelism,
|
||||
servicesExecutionContext = servicesExecutionContext,
|
||||
@ -272,8 +305,8 @@ private[sandbox] object SqlLedger {
|
||||
lfValueTranslationCache = lfValueTranslationCache,
|
||||
validatePartyAllocation = validatePartyAllocation,
|
||||
enricher = Some(new ValueEnricher(engine)),
|
||||
participantId = Ref.ParticipantId.assertFromString(participantId.toString),
|
||||
compressionStrategy = compressionStrategy,
|
||||
participantId = refParticipantId,
|
||||
storageBackendFactory = storageBackendFactory,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
errorFactories = errorFactories,
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user