Make ACS reader parameters configurable (#11732)

changelog_begin
changelog_end
Robert Autenrieth 2021-11-23 21:48:02 +01:00 committed by GitHub
parent 329e609ec4
commit bd2a6852ee
19 changed files with 186 additions and 7 deletions

View File

@@ -28,6 +28,9 @@ case class ApiServerConfig(
configurationLoadTimeout: Duration,
eventsPageSize: Int = IndexConfiguration.DefaultEventsPageSize,
eventsProcessingParallelism: Int = IndexConfiguration.DefaultEventsProcessingParallelism,
acsIdPageSize: Int = IndexConfiguration.DefaultAcsIdPageSize,
acsIdFetchingParallelism: Int = IndexConfiguration.DefaultAcsIdFetchingParallelism,
acsContractFetchingParallelism: Int = IndexConfiguration.DefaultAcsContractFetchingParallelism,
portFile: Option[Path],
seeding: Seeding,
managementServiceTimeout: Duration,

View File

@@ -85,6 +85,9 @@ final class StandaloneApiServer(
databaseConnectionTimeout = config.databaseConnectionTimeout,
eventsPageSize = config.eventsPageSize,
eventsProcessingParallelism = config.eventsProcessingParallelism,
acsIdPageSize = config.acsIdPageSize,
acsIdFetchingParallelism = config.acsIdFetchingParallelism,
acsContractFetchingParallelism = config.acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,

View File

@@ -7,5 +7,7 @@ object IndexConfiguration {
val DefaultEventsPageSize: Int = 1000
val DefaultEventsProcessingParallelism: Int = 8
val DefaultAcsIdPageSize: Int = 20000
val DefaultAcsIdFetchingParallelism: Int = 2
val DefaultAcsContractFetchingParallelism: Int = 2
}
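
For context: with the default eventsPageSize of 1000, these new defaults reproduce the values that were previously hardcoded in JdbcLedgerDao (idPageSize = eventsPageSize * 20, both parallelism values fixed at 2; see the JdbcLedgerDao change below). A minimal, self-contained sketch of that arithmetic, with the constants copied from this object:

object AcsDefaultsCheck extends App {
  val DefaultEventsPageSize = 1000              // IndexConfiguration.DefaultEventsPageSize
  val DefaultAcsIdPageSize = 20000              // IndexConfiguration.DefaultAcsIdPageSize
  val DefaultAcsIdFetchingParallelism = 2       // IndexConfiguration.DefaultAcsIdFetchingParallelism
  val DefaultAcsContractFetchingParallelism = 2 // IndexConfiguration.DefaultAcsContractFetchingParallelism

  // Previously hardcoded: idPageSize = eventsPageSize * 20, idFetchingParallelism = 2, acsFetchingparallelism = 2
  assert(DefaultEventsPageSize * 20 == DefaultAcsIdPageSize) // 1000 * 20 = 20000
  assert(DefaultAcsIdFetchingParallelism == 2 && DefaultAcsContractFetchingParallelism == 2)
  println("the new defaults match the previously hardcoded ACS reader values")
}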

View File

@@ -29,6 +29,9 @@ private[platform] object JdbcIndex {
databaseConnectionTimeout: FiniteDuration,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@@ -48,6 +51,9 @@ private[platform] object JdbcIndex {
initialLedgerId = ledgerId,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,

View File

@@ -56,6 +56,9 @@ private[platform] object ReadOnlySqlLedger {
databaseConnectionTimeout: FiniteDuration,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@@ -190,6 +193,9 @@ private[platform] object ReadOnlySqlLedger {
dbDispatcher = dbDispatcher,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,

View File

@@ -66,6 +66,9 @@ object IndexMetadata {
dbDispatcher = dbDispatcher,
eventsPageSize = 1000,
eventsProcessingParallelism = 8,
acsIdPageSize = 20000,
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,

View File

@@ -59,6 +59,9 @@ private class JdbcLedgerDao(
servicesExecutionContext: ExecutionContext,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
performPostCommitValidation: Boolean,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@@ -662,9 +665,9 @@ private class JdbcLedgerDao(
queryNonPruned = queryNonPruned,
eventStorageBackend = readStorageBackend.eventStorageBackend,
pageSize = eventsPageSize,
idPageSize = eventsPageSize * 20,
idFetchingParallelism = 2,
acsFetchingparallelism = 2,
idPageSize = acsIdPageSize,
idFetchingParallelism = acsIdFetchingParallelism,
acsFetchingparallelism = acsContractFetchingParallelism,
metrics = metrics,
materializer = materializer,
),
@@ -773,6 +776,9 @@ private[platform] object JdbcLedgerDao {
dbDispatcher: DbDispatcher,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@@ -790,6 +796,9 @@ private[platform] object JdbcLedgerDao {
servicesExecutionContext,
eventsPageSize,
eventsProcessingParallelism,
acsIdPageSize,
acsIdFetchingParallelism,
acsContractFetchingParallelism,
false,
metrics,
lfValueTranslationCache,
@@ -812,6 +821,9 @@ private[platform] object JdbcLedgerDao {
sequentialWriteDao: SequentialWriteDao,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@@ -829,6 +841,9 @@ private[platform] object JdbcLedgerDao {
servicesExecutionContext,
eventsPageSize,
eventsProcessingParallelism,
acsIdPageSize,
acsIdFetchingParallelism,
acsContractFetchingParallelism,
false,
metrics,
lfValueTranslationCache,
@@ -851,6 +866,9 @@ private[platform] object JdbcLedgerDao {
sequentialWriteDao: SequentialWriteDao,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@@ -869,6 +887,9 @@ private[platform] object JdbcLedgerDao {
servicesExecutionContext,
eventsPageSize,
eventsProcessingParallelism,
acsIdPageSize,
acsIdFetchingParallelism,
acsContractFetchingParallelism,
true,
metrics,
lfValueTranslationCache,
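
For orientation, an illustrative model of what the three knobs wired through here control (a toy Akka Streams sketch, not the ledger's actual ACS reader; fetchIdPage and fetchContracts are hypothetical stand-ins): acsIdPageSize bounds how many contract ids one index round trip returns, acsIdFetchingParallelism bounds how many id pages are requested concurrently, and acsContractFetchingParallelism bounds how many contract (event) pages are fetched concurrently.

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.Future

object AcsPipelineSketch extends App {
  implicit val system: ActorSystem = ActorSystem("acs-sketch")
  import system.dispatcher

  val acsIdPageSize = 20000
  val acsIdFetchingParallelism = 2
  val acsContractFetchingParallelism = 2

  // Hypothetical stand-in: one index round trip returning a page of contract ids.
  def fetchIdPage(page: Int): Future[Seq[Long]] =
    Future(Seq.tabulate(acsIdPageSize)(i => page.toLong * acsIdPageSize + i))

  // Hypothetical stand-in: resolving a page of ids to contract payloads.
  def fetchContracts(ids: Seq[Long]): Future[Seq[String]] =
    Future(ids.map(id => s"contract-$id"))

  Source(0 until 10)                                          // pretend there are 10 id pages
    .mapAsync(acsIdFetchingParallelism)(fetchIdPage)          // id pages fetched in parallel
    .mapAsync(acsContractFetchingParallelism)(fetchContracts) // contract pages fetched in parallel
    .runForeach(contracts => println(s"fetched ${contracts.size} contracts"))
    .onComplete(_ => system.terminate())
}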

View File

@@ -55,6 +55,9 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
protected def daoOwner(
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
errorFactories: ErrorFactories,
)(implicit
loggingContext: LoggingContext
@@ -85,6 +88,9 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
),
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
@@ -120,7 +126,14 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
_ <- Resource.fromFuture(
new FlywayMigrations(jdbcUrl).migrate()
)
dao <- daoOwner(100, 4, errorFactories).acquire()
dao <- daoOwner(
eventsPageSize = 100,
eventsProcessingParallelism = 4,
acsIdPageSize = 2000,
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
errorFactories,
).acquire()
_ <- Resource.fromFuture(dao.initialize(TestLedgerId, TestParticipantId))
initialLedgerEnd <- Resource.fromFuture(dao.lookupLedgerEnd())
_ = ledgerEndCache.set(initialLedgerEnd.lastOffset -> initialLedgerEnd.lastEventSeqId)

View File

@@ -36,6 +36,9 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
override protected def daoOwner(
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
errorFactories: ErrorFactories,
)(implicit
loggingContext: LoggingContext
@@ -76,6 +79,9 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
),
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,

View File

@@ -503,7 +503,13 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
// `pageSize = 2` and the offset gaps in the `commandWithOffsetGaps` above are to make sure
// that streaming works with event pages separated by offsets that don't have events in the store
ledgerDao <- createLedgerDao(pageSize = 2, eventsProcessingParallelism = 8)
ledgerDao <- createLedgerDao(
pageSize = 2,
eventsProcessingParallelism = 8,
acsIdPageSize = 2,
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
)
response <- ledgerDao.transactionsReader
.getFlatTransactions(
@@ -633,11 +639,20 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
): Vector[Transaction] =
responses.foldLeft(Vector.empty[Transaction])((b, a) => b ++ a._2.transactions.toVector)
private def createLedgerDao(pageSize: Int, eventsProcessingParallelism: Int) =
private def createLedgerDao(
pageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
) =
LoggingContext.newLoggingContext { implicit loggingContext =>
daoOwner(
eventsPageSize = pageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
MockitoSugar.mock[ErrorFactories],
).acquire()(ResourceContext(executionContext))
}.asFuture

View File

@@ -42,6 +42,9 @@ final case class Config[Extra](
maxDeduplicationDuration: Option[Duration],
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
stateValueCache: caching.WeightedCache.Configuration,
lfValueTranslationEventCache: caching.SizedCache.Configuration,
lfValueTranslationContractCache: caching.SizedCache.Configuration,
@@ -76,6 +79,9 @@ object Config {
configurationLoadTimeout = Duration.ofSeconds(10),
eventsPageSize = IndexConfiguration.DefaultEventsPageSize,
eventsProcessingParallelism = IndexConfiguration.DefaultEventsProcessingParallelism,
acsIdPageSize = IndexConfiguration.DefaultAcsIdPageSize,
acsIdFetchingParallelism = IndexConfiguration.DefaultAcsIdFetchingParallelism,
acsContractFetchingParallelism = IndexConfiguration.DefaultAcsContractFetchingParallelism,
stateValueCache = caching.WeightedCache.Configuration.none,
lfValueTranslationEventCache = caching.SizedCache.Configuration.none,
lfValueTranslationContractCache = caching.SizedCache.Configuration.none,
@@ -466,6 +472,43 @@ object Config {
config.copy(eventsProcessingParallelism = eventsProcessingParallelism)
)
opt[Int]("acs-id-page-size")
.optional()
.text(
s"Number of contract ids fetched from the index for every round trip when serving ACS calls. Default is ${IndexConfiguration.DefaultAcsIdPageSize}."
)
.validate { acsIdPageSize =>
if (acsIdPageSize > 0) Right(())
else Left("acs-id-page-size should be strictly positive")
}
.action((acsIdPageSize, config) => config.copy(acsIdPageSize = acsIdPageSize))
opt[Int]("acs-id-fetching-parallelism")
.optional()
.text(
s"Number of contract id pages fetched in parallel when serving ACS calls. Default is ${IndexConfiguration.DefaultAcsIdFetchingParallelism}."
)
.validate { acsIdFetchingParallelism =>
if (acsIdFetchingParallelism > 0) Right(())
else Left("acs-id-fetching-parallelism should be strictly positive")
}
.action((acsIdFetchingParallelism, config) =>
config.copy(acsIdFetchingParallelism = acsIdFetchingParallelism)
)
opt[Int]("acs-contract-fetching-parallelism")
.optional()
.text(
s"Number of event pages fetched in parallel when serving ACS calls. Default is ${IndexConfiguration.DefaultAcsContractFetchingParallelism}."
)
.validate { acsContractFetchingParallelism =>
if (acsContractFetchingParallelism > 0) Right(())
else Left("acs-contract-fetching-parallelism should be strictly positive")
}
.action((acsContractFetchingParallelism, config) =>
config.copy(acsContractFetchingParallelism = acsContractFetchingParallelism)
)
opt[Long]("max-state-value-cache-size")
.optional()
.text(
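
The same three flags can be exercised in isolation with a small scopt parser; this is a hedged sketch (AcsOptions and the program name are hypothetical, not the real Config[Extra]), using the same scopt builder style and strictly-positive validation as the options above:

import scopt.OptionParser

final case class AcsOptions(
    acsIdPageSize: Int = 20000,
    acsIdFetchingParallelism: Int = 2,
    acsContractFetchingParallelism: Int = 2,
)

object AcsOptionsParserSketch extends App {
  private val parser = new OptionParser[AcsOptions]("acs-options-sketch") {
    opt[Int]("acs-id-page-size")
      .optional()
      .validate(v => if (v > 0) Right(()) else Left("acs-id-page-size should be strictly positive"))
      .action((v, c) => c.copy(acsIdPageSize = v))
    opt[Int]("acs-id-fetching-parallelism")
      .optional()
      .validate(v => if (v > 0) Right(()) else Left("acs-id-fetching-parallelism should be strictly positive"))
      .action((v, c) => c.copy(acsIdFetchingParallelism = v))
    opt[Int]("acs-contract-fetching-parallelism")
      .optional()
      .validate(v => if (v > 0) Right(()) else Left("acs-contract-fetching-parallelism should be strictly positive"))
      .action((v, c) => c.copy(acsContractFetchingParallelism = v))
  }

  // e.g. run with: --acs-id-page-size 5000 --acs-id-fetching-parallelism 4
  println(parser.parse(args, AcsOptions())) // None on validation failure, Some(options) otherwise
}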

View File

@@ -251,6 +251,9 @@ class RecoveringIndexerIntegrationSpec
dbDispatcher = dbDispatcher,
eventsPageSize = 100,
eventsProcessingParallelism = 8,
acsIdPageSize = 20000,
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,

View File

@@ -328,6 +328,9 @@ final class SandboxServer(
templateStore = packageStore,
eventsPageSize = config.eventsPageSize,
eventsProcessingParallelism = config.eventsProcessingParallelism,
acsIdPageSize = config.acsIdPageSize,
acsIdFetchingParallelism = config.acsIdFetchingParallelism,
acsContractFetchingParallelism = config.acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,

View File

@@ -61,6 +61,9 @@ private[sandbox] object SandboxIndexAndWriteService {
templateStore: InMemoryPackageStore,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@@ -88,6 +91,9 @@ private[sandbox] object SandboxIndexAndWriteService {
startMode = startMode,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,

View File

@@ -94,6 +94,9 @@ private[sandbox] object SqlLedger {
startMode: SqlStartMode,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@@ -318,6 +321,9 @@ private[sandbox] object SqlLedger {
),
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,

View File

@@ -95,6 +95,9 @@ private[sandbox] object LedgerResource {
startMode = SqlStartMode.ResetAndStart,
eventsPageSize = 100,
eventsProcessingParallelism = 8,
acsIdPageSize = 2000,
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
servicesExecutionContext = servicesExecutionContext,
metrics = new Metrics(metrics),
lfValueTranslationCache = LfValueTranslationCache.Cache.none,

View File

@@ -441,6 +441,9 @@ final class SqlLedgerSpec
startMode = SqlStartMode.MigrateAndStart,
eventsPageSize = 100,
eventsProcessingParallelism = 8,
acsIdPageSize = 2000,
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
servicesExecutionContext = executionContext,
metrics = new Metrics(metrics),
lfValueTranslationCache = LfValueTranslationCache.Cache.none,

View File

@@ -257,6 +257,31 @@ class CommonCliBase(name: LedgerName) {
config.copy(eventsProcessingParallelism = eventsProcessingParallelism)
)
opt[Int]("acs-id-page-size")
.optional()
.text(
s"Number of contract ids fetched from the index for every round trip when serving ACS calls. Default is ${SandboxConfig.DefaultAcsIdPageSize}."
)
.action((acsIdPageSize, config) => config.copy(acsIdPageSize = acsIdPageSize))
opt[Int]("acs-id-fetching-parallelism")
.optional()
.text(
s"Number of contract id pages fetched in parallel when serving ACS calls. Default is ${SandboxConfig.DefaultAcsIdFetchingParallelism}."
)
.action((acsIdFetchingParallelism, config) =>
config.copy(acsIdFetchingParallelism = acsIdFetchingParallelism)
)
opt[Int]("acs-contract-fetching-parallelism")
.optional()
.text(
s"Number of event pages fetched in parallel when serving ACS calls. Default is ${SandboxConfig.DefaultAcsContractFetchingParallelism}."
)
.action((acsContractFetchingParallelism, config) =>
config.copy(acsContractFetchingParallelism = acsContractFetchingParallelism)
)
opt[Int]("max-commands-in-flight")
.optional()
.action((value, config) =>

View File

@@ -57,6 +57,9 @@ final case class SandboxConfig(
maxParallelSubmissions: Int, // only used by Sandbox Classic
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
lfValueTranslationEventCacheConfiguration: SizedCache.Configuration,
lfValueTranslationContractCacheConfiguration: SizedCache.Configuration,
profileDir: Option[Path],
@@ -93,6 +96,9 @@ object SandboxConfig {
val DefaultEventsPageSize: Int = 1000
val DefaultEventsProcessingParallelism: Int = 8
val DefaultAcsIdPageSize: Int = 20000
val DefaultAcsIdFetchingParallelism: Int = 2
val DefaultAcsContractFetchingParallelism: Int = 2
val DefaultTimeProviderType: TimeProviderType = TimeProviderType.WallClock
@@ -144,6 +150,9 @@ object SandboxConfig {
maxParallelSubmissions = 512,
eventsPageSize = DefaultEventsPageSize,
eventsProcessingParallelism = DefaultEventsProcessingParallelism,
acsIdPageSize = DefaultAcsIdPageSize,
acsIdFetchingParallelism = DefaultAcsIdFetchingParallelism,
acsContractFetchingParallelism = DefaultAcsContractFetchingParallelism,
lfValueTranslationEventCacheConfiguration = DefaultLfValueTranslationCacheConfiguration,
lfValueTranslationContractCacheConfiguration = DefaultLfValueTranslationCacheConfiguration,
profileDir = None,