mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
DPP-316: Enable the use of the append only index database (#9368)
* Enable append-only schema in index changelog_begin changelog_end * Enable append-only schema in indexer * Add CLI flags to configure append-only indexer * Fix CLI flag name * Remove unused parameter * Change CLI flag description ... it's independent of Postgres * Refactor how indexer config is specified in CLI * Upper case constants
This commit is contained in:
parent
9ac74e697d
commit
301dcd9e40
@ -10,7 +10,11 @@ import com.daml.bazeltools.BazelRunfiles._
|
||||
import com.daml.ledger.api.testing.utils.{AkkaBeforeAndAfterAll, OwnedResource, SuiteResource}
|
||||
import com.daml.ledger.api.tls.TlsConfiguration
|
||||
import com.daml.ledger.on.memory.Owner
|
||||
import com.daml.ledger.participant.state.kvutils.app.{ParticipantConfig, ParticipantRunMode}
|
||||
import com.daml.ledger.participant.state.kvutils.app.{
|
||||
ParticipantConfig,
|
||||
ParticipantIndexerConfig,
|
||||
ParticipantRunMode,
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.{app => kvutils}
|
||||
import com.daml.ledger.participant.state.v1
|
||||
import com.daml.ledger.resources.ResourceContext
|
||||
@ -51,7 +55,9 @@ trait MultiParticipantFixture
|
||||
port = Port.Dynamic,
|
||||
portFile = Some(participant1Portfile),
|
||||
serverJdbcUrl = ParticipantConfig.defaultIndexJdbcUrl(participantId1),
|
||||
allowExistingSchemaForIndex = false,
|
||||
indexerConfig = ParticipantIndexerConfig(
|
||||
allowExistingSchema = false
|
||||
),
|
||||
maxCommandsInFlight = None,
|
||||
)
|
||||
private val participantId2 = v1.ParticipantId.assertFromString("participant2")
|
||||
@ -63,7 +69,9 @@ trait MultiParticipantFixture
|
||||
port = Port.Dynamic,
|
||||
portFile = Some(participant2Portfile),
|
||||
serverJdbcUrl = ParticipantConfig.defaultIndexJdbcUrl(participantId2),
|
||||
allowExistingSchemaForIndex = false,
|
||||
indexerConfig = ParticipantIndexerConfig(
|
||||
allowExistingSchema = false
|
||||
),
|
||||
maxCommandsInFlight = None,
|
||||
)
|
||||
override protected lazy val suiteResource = {
|
||||
|
@ -40,7 +40,13 @@ object SqlLedgerFactory extends LedgerFactory[ReadWriteService, ExtraConfig] {
|
||||
}
|
||||
|
||||
override def manipulateConfig(config: Config[ExtraConfig]): Config[ExtraConfig] =
|
||||
config.copy(participants = config.participants.map(_.copy(allowExistingSchemaForIndex = true)))
|
||||
config.copy(participants =
|
||||
config.participants.map(participantConfig =>
|
||||
participantConfig.copy(indexerConfig =
|
||||
participantConfig.indexerConfig.copy(allowExistingSchema = true)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
override def readWriteServiceOwner(
|
||||
config: Config[ExtraConfig],
|
||||
|
@ -26,4 +26,6 @@ case class ApiServerConfig(
|
||||
portFile: Option[Path],
|
||||
seeding: Seeding,
|
||||
managementServiceTimeout: Duration,
|
||||
// TODO append-only: remove after removing support for the current (mutating) schema
|
||||
enableAppendOnlySchema: Boolean,
|
||||
)
|
||||
|
@ -80,6 +80,7 @@ final class StandaloneApiServer(
|
||||
metrics = metrics,
|
||||
lfValueTranslationCache = lfValueTranslationCache,
|
||||
enricher = valueEnricher,
|
||||
enableAppendOnlySchema = config.enableAppendOnlySchema,
|
||||
)
|
||||
.map(index => new SpannedIndexService(new TimedIndexService(index, metrics)))
|
||||
authorizer = new Authorizer(Clock.systemUTC.instant _, ledgerId, participantId)
|
||||
|
@ -28,6 +28,7 @@ private[platform] object JdbcIndex {
|
||||
metrics: Metrics,
|
||||
lfValueTranslationCache: LfValueTranslationCache.Cache,
|
||||
enricher: ValueEnricher,
|
||||
enableAppendOnlySchema: Boolean,
|
||||
)(implicit mat: Materializer, loggingContext: LoggingContext): ResourceOwner[IndexService] =
|
||||
new ReadOnlySqlLedger.Owner(
|
||||
serverRole = serverRole,
|
||||
@ -39,6 +40,7 @@ private[platform] object JdbcIndex {
|
||||
metrics = metrics,
|
||||
lfValueTranslationCache = lfValueTranslationCache,
|
||||
enricher = enricher,
|
||||
enableAppendOnlySchema = enableAppendOnlySchema,
|
||||
).map { ledger =>
|
||||
new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger, metrics), participantId)
|
||||
}
|
||||
|
@ -46,6 +46,8 @@ private[platform] object ReadOnlySqlLedger {
|
||||
metrics: Metrics,
|
||||
lfValueTranslationCache: LfValueTranslationCache.Cache,
|
||||
enricher: ValueEnricher,
|
||||
// TODO append-only: remove after removing support for the current (mutating) schema
|
||||
enableAppendOnlySchema: Boolean,
|
||||
)(implicit mat: Materializer, loggingContext: LoggingContext)
|
||||
extends ResourceOwner[ReadOnlyLedger] {
|
||||
|
||||
@ -106,16 +108,29 @@ private[platform] object ReadOnlySqlLedger {
|
||||
private def ledgerDaoOwner(
|
||||
servicesExecutionContext: ExecutionContext
|
||||
): ResourceOwner[LedgerReadDao] =
|
||||
JdbcLedgerDao.readOwner(
|
||||
serverRole,
|
||||
jdbcUrl,
|
||||
databaseConnectionPoolSize,
|
||||
eventsPageSize,
|
||||
servicesExecutionContext,
|
||||
metrics,
|
||||
lfValueTranslationCache,
|
||||
Some(enricher),
|
||||
)
|
||||
if (enableAppendOnlySchema) {
|
||||
com.daml.platform.store.appendonlydao.JdbcLedgerDao.readOwner(
|
||||
serverRole,
|
||||
jdbcUrl,
|
||||
databaseConnectionPoolSize,
|
||||
eventsPageSize,
|
||||
servicesExecutionContext,
|
||||
metrics,
|
||||
lfValueTranslationCache,
|
||||
Some(enricher),
|
||||
)
|
||||
} else {
|
||||
JdbcLedgerDao.readOwner(
|
||||
serverRole,
|
||||
jdbcUrl,
|
||||
databaseConnectionPoolSize,
|
||||
eventsPageSize,
|
||||
servicesExecutionContext,
|
||||
metrics,
|
||||
lfValueTranslationCache,
|
||||
Some(enricher),
|
||||
)
|
||||
}
|
||||
|
||||
def contractsStoreOwner(
|
||||
lfValueTranslationCache: LfValueTranslationCache.Cache,
|
||||
|
@ -22,6 +22,12 @@ case class IndexerConfig(
|
||||
// TODO append-only: remove after removing support for the current (mutating) schema
|
||||
enableAppendOnlySchema: Boolean = false,
|
||||
asyncCommitMode: DbType.AsyncCommitMode = DefaultAsyncCommitMode,
|
||||
inputMappingParallelism: Int = DefaultInputMappingParallelism,
|
||||
ingestionParallelism: Int = DefaultIngestionParallelism,
|
||||
submissionBatchSize: Long = DefaultSubmissionBatchSize,
|
||||
tailingRateLimitPerSecond: Int = DefaultTailingRateLimitPerSecond,
|
||||
batchWithinMillis: Long = DefaultBatchWithinMillis,
|
||||
enableCompression: Boolean = DefaultEnableCompression,
|
||||
)
|
||||
|
||||
object IndexerConfig {
|
||||
@ -32,4 +38,10 @@ object IndexerConfig {
|
||||
val DefaultDatabaseConnectionPoolSize: Int = 3
|
||||
val DefaultAsyncCommitMode: DbType.AsyncCommitMode = DbType.AsynchronousCommit
|
||||
|
||||
val DefaultInputMappingParallelism: Int = 16
|
||||
val DefaultIngestionParallelism: Int = 16
|
||||
val DefaultSubmissionBatchSize: Long = 50L
|
||||
val DefaultTailingRateLimitPerSecond: Int = 20
|
||||
val DefaultBatchWithinMillis: Long = 50L
|
||||
val DefaultEnableCompression: Boolean = false
|
||||
}
|
||||
|
@ -19,6 +19,8 @@ import com.daml.platform.ApiOffset.ApiOffsetConverter
|
||||
import com.daml.platform.common
|
||||
import com.daml.platform.common.MismatchException
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import com.daml.platform.indexer.parallel.ParallelIndexerFactory
|
||||
import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation}
|
||||
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerDao}
|
||||
import com.daml.platform.store.{DbType, FlywayMigrations, LfValueTranslationCache}
|
||||
|
||||
@ -26,6 +28,9 @@ import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
object JdbcIndexer {
|
||||
// TODO append-only: currently StandaloneIndexerServer (among others) has a hard dependency on this concrete class.
|
||||
// Clean this up, for example by extracting the public interface of this class into a trait so that it's easier
|
||||
// to write applications that do not depend on a particular indexer implementation.
|
||||
private[daml] final class Factory private[indexer] (
|
||||
config: IndexerConfig,
|
||||
readService: ReadService,
|
||||
@ -34,6 +39,7 @@ object JdbcIndexer {
|
||||
updateFlowOwnerBuilder: ExecuteUpdate.FlowOwnerBuilder,
|
||||
ledgerDaoOwner: ResourceOwner[LedgerDao],
|
||||
flywayMigrations: FlywayMigrations,
|
||||
lfValueTranslationCache: LfValueTranslationCache.Cache,
|
||||
)(implicit materializer: Materializer, loggingContext: LoggingContext) {
|
||||
|
||||
private[daml] def this(
|
||||
@ -62,28 +68,32 @@ object JdbcIndexer {
|
||||
enricher = None,
|
||||
),
|
||||
new FlywayMigrations(config.jdbcUrl),
|
||||
lfValueTranslationCache,
|
||||
)
|
||||
|
||||
private val logger = ContextualizedLogger.get(this.getClass)
|
||||
|
||||
def validateSchema()(implicit
|
||||
resourceContext: ResourceContext
|
||||
): Future[ResourceOwner[JdbcIndexer]] =
|
||||
): Future[ResourceOwner[Indexer]] =
|
||||
flywayMigrations
|
||||
.validate()
|
||||
.flatMap(_ => initialized(resetSchema = false))(resourceContext.executionContext)
|
||||
|
||||
def migrateSchema(
|
||||
allowExistingSchema: Boolean,
|
||||
enableAppendOnlySchema: Boolean, // TODO append-only: remove after removing support for the current (mutating) schema
|
||||
)(implicit resourceContext: ResourceContext): Future[ResourceOwner[JdbcIndexer]] =
|
||||
allowExistingSchema: Boolean
|
||||
)(implicit resourceContext: ResourceContext): Future[ResourceOwner[Indexer]] =
|
||||
flywayMigrations
|
||||
.migrate(allowExistingSchema, enableAppendOnlySchema)
|
||||
.migrate(allowExistingSchema, config.enableAppendOnlySchema)
|
||||
.flatMap(_ => initialized(resetSchema = false))(resourceContext.executionContext)
|
||||
|
||||
def resetSchema(): Future[ResourceOwner[JdbcIndexer]] = initialized(resetSchema = true)
|
||||
def resetSchema()(implicit
|
||||
resourceContext: ResourceContext
|
||||
): Future[ResourceOwner[Indexer]] = initialized(resetSchema = true)
|
||||
|
||||
private def initialized(resetSchema: Boolean): Future[ResourceOwner[JdbcIndexer]] =
|
||||
private[this] def initializedMutatingSchema(
|
||||
resetSchema: Boolean
|
||||
): Future[ResourceOwner[Indexer]] =
|
||||
Future.successful(for {
|
||||
ledgerDao <- ledgerDaoOwner
|
||||
_ <-
|
||||
@ -105,6 +115,59 @@ object JdbcIndexer {
|
||||
)
|
||||
} yield new JdbcIndexer(initialLedgerEnd, metrics, updateFlow))
|
||||
|
||||
private[this] def initializedAppendOnlySchema(resetSchema: Boolean)(implicit
|
||||
resourceContext: ResourceContext
|
||||
): Future[ResourceOwner[Indexer]] = {
|
||||
implicit val executionContext: ExecutionContext = resourceContext.executionContext
|
||||
// TODO append-only: clean up the mixed use of Future, Resource, and ResourceOwner
|
||||
Future.successful(for {
|
||||
// Note: the LedgerDao interface is only used for initialization here, it can be released immediately
|
||||
// after initialization is finished. Hence the use of ResourceOwner.use().
|
||||
_ <- ResourceOwner.forFuture(() =>
|
||||
ledgerDaoOwner.use(ledgerDao =>
|
||||
for {
|
||||
_ <-
|
||||
if (resetSchema) {
|
||||
ledgerDao.reset()
|
||||
} else {
|
||||
Future.successful(())
|
||||
}
|
||||
_ <- initializeLedger(ledgerDao)().acquire().asFuture
|
||||
} yield ()
|
||||
)
|
||||
)
|
||||
indexer <- ParallelIndexerFactory(
|
||||
jdbcUrl = config.jdbcUrl,
|
||||
participantId = config.participantId,
|
||||
translation = new LfValueTranslation(
|
||||
cache = lfValueTranslationCache,
|
||||
metrics = metrics,
|
||||
enricherO = None,
|
||||
loadPackage = (_, _) => Future.successful(None),
|
||||
),
|
||||
compressionStrategy =
|
||||
if (config.enableCompression) CompressionStrategy.allGZIP(metrics)
|
||||
else CompressionStrategy.none(metrics),
|
||||
mat = materializer,
|
||||
inputMappingParallelism = config.inputMappingParallelism,
|
||||
ingestionParallelism = config.ingestionParallelism,
|
||||
submissionBatchSize = config.submissionBatchSize,
|
||||
tailingRateLimitPerSecond = config.tailingRateLimitPerSecond,
|
||||
batchWithinMillis = config.batchWithinMillis,
|
||||
metrics = metrics,
|
||||
)
|
||||
} yield indexer)
|
||||
}
|
||||
|
||||
private def initialized(resetSchema: Boolean)(implicit
|
||||
resourceContext: ResourceContext
|
||||
): Future[ResourceOwner[Indexer]] =
|
||||
if (config.enableAppendOnlySchema)
|
||||
initializedAppendOnlySchema(resetSchema)
|
||||
else
|
||||
initializedMutatingSchema(resetSchema)
|
||||
|
||||
// TODO append-only: This is just a series of database operations, it should return a Future and not a ResourceOwner
|
||||
private def initializeLedger(dao: LedgerDao)(): ResourceOwner[Option[Offset]] =
|
||||
new ResourceOwner[Option[Offset]] {
|
||||
override def acquire()(implicit context: ResourceContext): Resource[Option[Offset]] =
|
||||
|
@ -45,7 +45,7 @@ final class StandaloneIndexerServer(
|
||||
Resource
|
||||
.fromFuture(
|
||||
indexerFactory
|
||||
.migrateSchema(config.allowExistingSchema, config.enableAppendOnlySchema)
|
||||
.migrateSchema(config.allowExistingSchema)
|
||||
)
|
||||
.flatMap(startIndexer(indexer, _))
|
||||
.map { _ =>
|
||||
|
@ -177,7 +177,7 @@ final class JdbcIndexerSpec
|
||||
participantId: String,
|
||||
mockFlow: Flow[OffsetUpdate, Unit, NotUsed] = noOpFlow,
|
||||
jdbcAsyncCommitMode: DbType.AsyncCommitMode = DbType.AsynchronousCommit,
|
||||
): Future[ResourceOwner[JdbcIndexer]] = {
|
||||
): Future[ResourceOwner[Indexer]] = {
|
||||
val config = IndexerConfig(
|
||||
participantId = v1.ParticipantId.assertFromString(participantId),
|
||||
jdbcUrl = postgresDatabase.url,
|
||||
@ -204,7 +204,8 @@ final class JdbcIndexerSpec
|
||||
mockedUpdateFlowOwnerBuilder(metrics, config.participantId, mockFlow),
|
||||
ledgerDaoOwner = ledgerDaoOwner,
|
||||
flywayMigrations = new FlywayMigrations(config.jdbcUrl),
|
||||
).migrateSchema(allowExistingSchema = true, enableAppendOnlySchema = false)
|
||||
LfValueTranslationCache.Cache.none,
|
||||
).migrateSchema(allowExistingSchema = true)
|
||||
}
|
||||
|
||||
private def mockedUpdateFlowOwnerBuilder(
|
||||
|
@ -52,6 +52,7 @@ object Config {
|
||||
CommandConfiguration.DefaultTrackerRetentionPeriod
|
||||
|
||||
val DefaultMaxInboundMessageSize: Int = 64 * 1024 * 1024
|
||||
|
||||
def createDefault[Extra](extra: Extra): Config[Extra] =
|
||||
Config(
|
||||
mode = Mode.Run,
|
||||
@ -119,11 +120,17 @@ object Config {
|
||||
"port-file, " +
|
||||
"server-jdbc-url, " +
|
||||
"api-server-connection-pool-size" +
|
||||
"indexer-connection-pool-size" +
|
||||
"max-commands-in-flight, " +
|
||||
"management-service-timeout, " +
|
||||
"run-mode, " +
|
||||
"shard-name" +
|
||||
"shard-name, " +
|
||||
"indexer-connection-pool-size, " +
|
||||
"indexer-input-mapping-parallelism, " +
|
||||
"indexer-ingestion-parallelism, " +
|
||||
"indexer-submission-batch-size, " +
|
||||
"indexer-tailing-rate-limit-per-second, " +
|
||||
"indexer-batch-within-millis, " +
|
||||
"indexer-enable-compression, " +
|
||||
"]"
|
||||
)
|
||||
.action((kv, config) => {
|
||||
@ -153,17 +160,42 @@ object Config {
|
||||
val apiServerConnectionPoolSize = kv
|
||||
.get("api-server-connection-pool-size")
|
||||
.map(_.toInt)
|
||||
.getOrElse(ParticipantConfig.defaultApiServerDatabaseConnectionPoolSize)
|
||||
.getOrElse(ParticipantConfig.DefaultApiServerDatabaseConnectionPoolSize)
|
||||
val indexerConnectionPoolSize = kv
|
||||
.get("indexer-connection-pool-size")
|
||||
.map(_.toInt)
|
||||
.getOrElse(ParticipantConfig.defaultIndexerDatabaseConnectionPoolSize)
|
||||
.getOrElse(ParticipantIndexerConfig.DefaultDatabaseConnectionPoolSize)
|
||||
val indexerInputMappingParallelism = kv
|
||||
.get("indexer-input-mapping-parallelism")
|
||||
.map(_.toInt)
|
||||
.getOrElse(ParticipantIndexerConfig.DefaultInputMappingParallelism)
|
||||
val indexerIngestionParallelism = kv
|
||||
.get("indexer-ingestion-parallelism")
|
||||
.map(_.toInt)
|
||||
.getOrElse(ParticipantIndexerConfig.DefaultIngestionParallelism)
|
||||
val indexerSubmissionBatchSize = kv
|
||||
.get("indexer-submission-batch-size")
|
||||
.map(_.toLong)
|
||||
.getOrElse(ParticipantIndexerConfig.DefaultSubmissionBatchSize)
|
||||
val indexerTailingRateLimitPerSecond = kv
|
||||
.get("indexer-tailing-rate-limit-per-second")
|
||||
.map(_.toInt)
|
||||
.getOrElse(ParticipantIndexerConfig.DefaultTailingRateLimitPerSecond)
|
||||
val indexerBatchWithinMillis = kv
|
||||
.get("indexer-batch-within-millis")
|
||||
.map(_.toLong)
|
||||
.getOrElse(ParticipantIndexerConfig.DefaultBatchWithinMillis)
|
||||
val indexerEnableCompression = kv
|
||||
.get("indexer-enable-compression")
|
||||
.map(_.toBoolean)
|
||||
.getOrElse(ParticipantIndexerConfig.DefaultEnableCompression)
|
||||
|
||||
val maxCommandsInFlight =
|
||||
kv.get("max-commands-in-flight").map(_.toInt)
|
||||
val managementServiceTimeout = kv
|
||||
.get("management-service-timeout")
|
||||
.map(Duration.parse)
|
||||
.getOrElse(ParticipantConfig.defaultManagementServiceTimeout)
|
||||
.getOrElse(ParticipantConfig.DefaultManagementServiceTimeout)
|
||||
val shardName = kv.get("shard-name")
|
||||
val partConfig = ParticipantConfig(
|
||||
runMode,
|
||||
@ -173,9 +205,17 @@ object Config {
|
||||
port,
|
||||
portFile,
|
||||
jdbcUrl,
|
||||
indexerDatabaseConnectionPoolSize = indexerConnectionPoolSize,
|
||||
indexerConfig = ParticipantIndexerConfig(
|
||||
databaseConnectionPoolSize = indexerConnectionPoolSize,
|
||||
allowExistingSchema = false,
|
||||
inputMappingParallelism = indexerInputMappingParallelism,
|
||||
ingestionParallelism = indexerIngestionParallelism,
|
||||
submissionBatchSize = indexerSubmissionBatchSize,
|
||||
tailingRateLimitPerSecond = indexerTailingRateLimitPerSecond,
|
||||
batchWithinMillis = indexerBatchWithinMillis,
|
||||
enableCompression = indexerEnableCompression,
|
||||
),
|
||||
apiServerDatabaseConnectionPoolSize = apiServerConnectionPoolSize,
|
||||
allowExistingSchemaForIndex = false,
|
||||
maxCommandsInFlight = maxCommandsInFlight,
|
||||
managementServiceTimeout = managementServiceTimeout,
|
||||
)
|
||||
|
@ -38,11 +38,16 @@ trait ConfigProvider[ExtraConfig] {
|
||||
IndexerConfig(
|
||||
participantConfig.participantId,
|
||||
jdbcUrl = participantConfig.serverJdbcUrl,
|
||||
databaseConnectionPoolSize = participantConfig.indexerDatabaseConnectionPoolSize,
|
||||
databaseConnectionPoolSize = participantConfig.indexerConfig.databaseConnectionPoolSize,
|
||||
startupMode = IndexerStartupMode.MigrateAndStart,
|
||||
eventsPageSize = config.eventsPageSize,
|
||||
allowExistingSchema = participantConfig.allowExistingSchemaForIndex,
|
||||
allowExistingSchema = participantConfig.indexerConfig.allowExistingSchema,
|
||||
enableAppendOnlySchema = config.enableAppendOnlySchema,
|
||||
inputMappingParallelism = participantConfig.indexerConfig.ingestionParallelism,
|
||||
submissionBatchSize = participantConfig.indexerConfig.submissionBatchSize,
|
||||
tailingRateLimitPerSecond = participantConfig.indexerConfig.tailingRateLimitPerSecond,
|
||||
batchWithinMillis = participantConfig.indexerConfig.batchWithinMillis,
|
||||
enableCompression = participantConfig.indexerConfig.enableCompression,
|
||||
)
|
||||
|
||||
def apiServerConfig(
|
||||
@ -62,6 +67,7 @@ trait ConfigProvider[ExtraConfig] {
|
||||
portFile = participantConfig.portFile,
|
||||
seeding = config.seeding,
|
||||
managementServiceTimeout = participantConfig.managementServiceTimeout,
|
||||
enableAppendOnlySchema = config.enableAppendOnlySchema,
|
||||
)
|
||||
|
||||
def commandConfig(
|
||||
|
@ -9,8 +9,6 @@ import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.daml.ports.Port
|
||||
import java.time.Duration
|
||||
|
||||
import com.daml.platform.indexer.IndexerConfig
|
||||
|
||||
final case class ParticipantConfig(
|
||||
mode: ParticipantRunMode,
|
||||
participantId: ParticipantId,
|
||||
@ -20,23 +18,19 @@ final case class ParticipantConfig(
|
||||
port: Port,
|
||||
portFile: Option[Path],
|
||||
serverJdbcUrl: String,
|
||||
allowExistingSchemaForIndex: Boolean,
|
||||
maxCommandsInFlight: Option[Int],
|
||||
managementServiceTimeout: Duration = ParticipantConfig.defaultManagementServiceTimeout,
|
||||
indexerDatabaseConnectionPoolSize: Int =
|
||||
ParticipantConfig.defaultIndexerDatabaseConnectionPoolSize,
|
||||
managementServiceTimeout: Duration = ParticipantConfig.DefaultManagementServiceTimeout,
|
||||
indexerConfig: ParticipantIndexerConfig,
|
||||
apiServerDatabaseConnectionPoolSize: Int =
|
||||
ParticipantConfig.defaultApiServerDatabaseConnectionPoolSize,
|
||||
ParticipantConfig.DefaultApiServerDatabaseConnectionPoolSize,
|
||||
)
|
||||
|
||||
object ParticipantConfig {
|
||||
def defaultIndexJdbcUrl(participantId: ParticipantId): String =
|
||||
s"jdbc:h2:mem:$participantId;db_close_delay=-1;db_close_on_exit=false"
|
||||
|
||||
val defaultManagementServiceTimeout: Duration = Duration.ofMinutes(2)
|
||||
|
||||
val defaultIndexerDatabaseConnectionPoolSize = IndexerConfig.DefaultDatabaseConnectionPoolSize
|
||||
val DefaultManagementServiceTimeout: Duration = Duration.ofMinutes(2)
|
||||
|
||||
// this pool is used for all data access for the ledger api (command submission, transaction service, ...)
|
||||
val defaultApiServerDatabaseConnectionPoolSize = 16
|
||||
val DefaultApiServerDatabaseConnectionPoolSize = 16
|
||||
}
|
||||
|
@ -0,0 +1,32 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils.app
|
||||
|
||||
import com.daml.platform.indexer.IndexerConfig
|
||||
|
||||
/** Indexer-specific configuration of a participant.
|
||||
*
|
||||
* Parameters that are shared between the indexer and the ledger API server are stored in the parent [[ParticipantConfig]].
|
||||
* Parameters that are shared between all ledger participants are stored in the parent [[Config]]
|
||||
*/
|
||||
final case class ParticipantIndexerConfig(
|
||||
allowExistingSchema: Boolean,
|
||||
databaseConnectionPoolSize: Int = ParticipantIndexerConfig.DefaultDatabaseConnectionPoolSize,
|
||||
inputMappingParallelism: Int = ParticipantIndexerConfig.DefaultInputMappingParallelism,
|
||||
ingestionParallelism: Int = ParticipantIndexerConfig.DefaultIngestionParallelism,
|
||||
submissionBatchSize: Long = ParticipantIndexerConfig.DefaultSubmissionBatchSize,
|
||||
tailingRateLimitPerSecond: Int = ParticipantIndexerConfig.DefaultTailingRateLimitPerSecond,
|
||||
batchWithinMillis: Long = ParticipantIndexerConfig.DefaultBatchWithinMillis,
|
||||
enableCompression: Boolean = ParticipantIndexerConfig.DefaultEnableCompression,
|
||||
)
|
||||
|
||||
object ParticipantIndexerConfig {
|
||||
val DefaultDatabaseConnectionPoolSize: Int = IndexerConfig.DefaultDatabaseConnectionPoolSize
|
||||
val DefaultInputMappingParallelism: Int = IndexerConfig.DefaultInputMappingParallelism
|
||||
val DefaultIngestionParallelism: Int = IndexerConfig.DefaultIngestionParallelism
|
||||
val DefaultSubmissionBatchSize: Long = IndexerConfig.DefaultSubmissionBatchSize
|
||||
val DefaultTailingRateLimitPerSecond: Int = IndexerConfig.DefaultTailingRateLimitPerSecond
|
||||
val DefaultBatchWithinMillis: Long = IndexerConfig.DefaultBatchWithinMillis
|
||||
val DefaultEnableCompression: Boolean = IndexerConfig.DefaultEnableCompression
|
||||
}
|
@ -20,7 +20,7 @@ import com.daml.logging.LoggingContext
|
||||
import com.daml.logging.LoggingContext.newLoggingContext
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import com.daml.platform.indexer.{IndexerConfig, IndexerStartupMode, JdbcIndexer}
|
||||
import com.daml.platform.indexer.{Indexer, IndexerConfig, IndexerStartupMode, JdbcIndexer}
|
||||
import com.daml.platform.store.LfValueTranslationCache
|
||||
|
||||
import scala.concurrent.duration.Duration
|
||||
@ -233,7 +233,7 @@ class IntegrityChecker[LogResult](
|
||||
resourceContext: ResourceContext,
|
||||
materializer: Materializer,
|
||||
loggingContext: LoggingContext,
|
||||
): ResourceOwner[JdbcIndexer] =
|
||||
): ResourceOwner[Indexer] =
|
||||
for {
|
||||
servicesExecutionContext <- ResourceOwner
|
||||
.forExecutorService(() => Executors.newWorkStealingPool())
|
||||
@ -247,7 +247,7 @@ class IntegrityChecker[LogResult](
|
||||
lfValueTranslationCache,
|
||||
)
|
||||
migrating <- ResourceOwner.forFuture(() =>
|
||||
indexerFactory.migrateSchema(allowExistingSchema = false, enableAppendOnlySchema = false)
|
||||
indexerFactory.migrateSchema(allowExistingSchema = false)
|
||||
)
|
||||
migrated <- migrating
|
||||
} yield migrated
|
||||
|
@ -235,6 +235,7 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
|
||||
portFile = config.portFile,
|
||||
seeding = config.seeding.get,
|
||||
managementServiceTimeout = config.managementServiceTimeout,
|
||||
enableAppendOnlySchema = false,
|
||||
),
|
||||
engine = engine,
|
||||
commandConfig = config.commandConfig,
|
||||
|
Loading…
Reference in New Issue
Block a user