From 512d869ecf3465f2d95993fb4a6aa4181a177f5a Mon Sep 17 00:00:00 2001 From: Sergey Kisel <98825453+skisel-da@users.noreply.github.com> Date: Wed, 9 Mar 2022 13:24:25 +0100 Subject: [PATCH] Make indexer benchmark independent of kvutils [DPP-946] (#13144) * Make indexer benchmark independent of kvutils [DPP-946] (#13144) changelog_begin changelog_end --- ledger/indexer-benchmark/BUILD.bazel | 8 +- .../scala/ledger/indexerbenchmark/Main.scala | 103 ++-------- .../indexerbenchmark/IndexerBenchmark.scala | 177 +++++------------- .../IndexerBenchmarkResult.scala | 91 +++++++++ .../indexer/ha/EndlessReadService.scala | 29 +-- .../ha/IndexerStabilityTestFixture.scala | 2 +- 6 files changed, 176 insertions(+), 234 deletions(-) create mode 100644 ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmarkResult.scala diff --git a/ledger/indexer-benchmark/BUILD.bazel b/ledger/indexer-benchmark/BUILD.bazel index 3b0c4158d8..c43928ba09 100644 --- a/ledger/indexer-benchmark/BUILD.bazel +++ b/ledger/indexer-benchmark/BUILD.bazel @@ -27,7 +27,6 @@ da_scala_library( ], deps = [ "//daml-lf/data", - "//language-support/scala/bindings", "//ledger/ledger-api-health", "//ledger/ledger-configuration", "//ledger/ledger-offset", @@ -55,16 +54,11 @@ da_scala_library( visibility = ["//visibility:public"], deps = [ ":indexer-benchmark-lib", - "//daml-lf/data", - "//language-support/scala/bindings", "//ledger/ledger-api-health", - "//ledger/ledger-configuration", "//ledger/ledger-offset", - "//ledger/metrics", + "//ledger/participant-integration-api:participant-integration-api-tests-lib", "//ledger/participant-state", - "//ledger/participant-state/kvutils", "//libs-scala/contextualized-logging", - "@maven//:io_dropwizard_metrics_metrics_core", ], ) diff --git a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala index 415fac27dc..1e597776e4 100644 --- a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala +++ b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala @@ -3,100 +3,35 @@ package com.daml.ledger.indexerbenchmark -import java.nio.file.{Files, Paths} -import java.util.concurrent.atomic.AtomicLong - import akka.NotUsed -import akka.actor.ActorSystem -import akka.stream.Materializer -import akka.stream.scaladsl.{Sink, Source} -import com.codahale.metrics.MetricRegistry -import com.daml.ledger.api.health.{HealthStatus, Healthy} -import com.daml.ledger.configuration.LedgerId +import akka.stream.scaladsl.Source import com.daml.ledger.offset.Offset -import com.daml.ledger.participant.state.kvutils.api.{ - KeyValueParticipantStateReader, - LedgerReader, - LedgerRecord, -} -import com.daml.ledger.participant.state.kvutils.export.ProtobufBasedLedgerDataImporter -import com.daml.ledger.participant.state.kvutils.{KVOffsetBuilder, Raw} import com.daml.ledger.participant.state.v2.Update import com.daml.logging.LoggingContext.newLoggingContext -import com.daml.metrics.Metrics +import com.daml.platform.indexer.ha.EndlessReadService -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future object Main { + def main(args: Array[String]): Unit = - IndexerBenchmark.runAndExit(args, name => loadLedgerExport(name)) - - private[this] def loadLedgerExport(config: Config): Future[Iterator[(Offset, Update)]] = { - val path = Paths.get(config.updateSource) - if (!Files.exists(path)) { - throw new RuntimeException(s"Input file $path does not exist") - } - val importer = ProtobufBasedLedgerDataImporter(path) - - val offsetBuilder = new KVOffsetBuilder(0) - val dataSource: Source[LedgerRecord, NotUsed] = Source - .fromIterator(() => importer.read().iterator) - .statefulMapConcat { () => - val nextOffset = new AtomicLong(0) - - { case (_, writeSet) => - writeSet.map { case (key, value) => - val offset = offsetBuilder.of(nextOffset.getAndIncrement()) - val logEntryId = Raw.LogEntryId(key.bytes) // `key` is of an unknown type. - LedgerRecord(offset, logEntryId, value) - } - } - } - - val keyValueSource = new LedgerReader { - override def events(offset: Option[Offset]): Source[LedgerRecord, NotUsed] = - if (offset.isDefined) { - Source.failed( - new IllegalArgumentException( - s"A read offset of $offset is not supported. Must be $None." - ) - ) - } else { - dataSource - } - - override def currentHealth(): HealthStatus = Healthy - - override def ledgerId(): LedgerId = IndexerBenchmark.LedgerId + Config.parse(args) match { + case Some(config) => + IndexerBenchmark.runAndExit(config, () => Future.successful(loadStateUpdates(config))) + case None => sys.exit(1) } - val metricRegistry = new MetricRegistry - val metrics = new Metrics(metricRegistry) - val keyValueStateReader = KeyValueParticipantStateReader( - keyValueSource, - metrics, - failOnUnexpectedEvent = false, - ) - - // Note: this method is doing quite a lot of work to transform a sequence of write sets - // to a sequence of state updates. - // Note: this method eagerly loads the whole ledger export and transforms it into an array of state updates. - // This will consume a lot of memory, but will avoid slowing down the indexer with write set decoding during - // the benchmark. - val system = ActorSystem("IndexerBenchmarkUpdateReader") - implicit val materializer: Materializer = Materializer(system) + /* Note: this is a stub implementation. It doesn't provide meaningful performance numbers because: + * - EndlessReadService is currently throttled, and the Akka throttle operator is a performance bottleneck + * - EndlessReadService generates very simple transaction shapes that are not representative of + * the load on a real ledger. + */ + private[this] def loadStateUpdates(config: Config): Source[(Offset, Update), NotUsed] = newLoggingContext { implicit loggingContext => - keyValueStateReader - .stateUpdates(None) - .take(config.updateCount.getOrElse(Long.MaxValue)) - .zipWithIndex - .map { case (data, index) => - if (index % 1000 == 0) println(s"Generated update $index") - data - } - .runWith(Sink.seq[(Offset, Update)]) - .map(seq => seq.iterator)(ExecutionContext.parasitic) - .andThen { case _ => system.terminate() }(ExecutionContext.parasitic) + val readService = new EndlessReadService(updatesPerSecond = 10, name = config.updateSource) + config.updateCount match { + case None => readService.stateUpdates(None) + case Some(count) => readService.stateUpdates(None).take(count) + } } - } } diff --git a/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala b/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala index 006c5da676..f982af0e8b 100644 --- a/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala +++ b/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala @@ -3,13 +3,11 @@ package com.daml.ledger.indexerbenchmark -import java.util.concurrent.{Executors, TimeUnit} - import akka.NotUsed import akka.actor.ActorSystem import akka.stream.Materializer import akka.stream.scaladsl.Source -import com.codahale.metrics.{MetricRegistry, Snapshot} +import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.health.{HealthStatus, Healthy} import com.daml.ledger.configuration.{Configuration, LedgerInitialConditions, LedgerTimeModel} import com.daml.ledger.offset.Offset @@ -19,12 +17,14 @@ import com.daml.lf.data.Time import com.daml.logging.LoggingContext import com.daml.logging.LoggingContext.newLoggingContext import com.daml.metrics.{JvmMetricSet, Metrics} -import com.daml.platform.indexer.{JdbcIndexer, StandaloneIndexerServer} +import com.daml.platform.indexer.{Indexer, JdbcIndexer, StandaloneIndexerServer} import com.daml.platform.store.LfValueTranslationCache +import com.daml.resources import com.daml.testing.postgresql.PostgresResource +import java.util.concurrent.{Executors, TimeUnit} import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future} import scala.io.StdIn class IndexerBenchmark() { @@ -35,39 +35,37 @@ class IndexerBenchmark() { * and functional tests. */ def runWithEphemeralPostgres( - createUpdates: Config => Future[Iterator[(Offset, Update)]], + createUpdates: () => Future[Source[(Offset, Update), NotUsed]], config: Config, - ): Future[Unit] = { + ): Future[Unit] = PostgresResource .owner() .use(db => { println(s"Running the indexer benchmark against the ephemeral Postgres database ${db.url}") run(createUpdates, config.copy(indexerConfig = config.indexerConfig.copy(jdbcUrl = db.url))) })(ExecutionContext.parasitic) - } def run( - createUpdates: Config => Future[Iterator[(Offset, Update)]], + createUpdates: () => Future[Source[(Offset, Update), NotUsed]], config: Config, ): Future[Unit] = { newLoggingContext { implicit loggingContext => - val metricRegistry = new MetricRegistry - val metrics = new Metrics(metricRegistry) + val metrics = new Metrics(new MetricRegistry) metrics.registry.registerAll(new JvmMetricSet) val system = ActorSystem("IndexerBenchmark") implicit val materializer: Materializer = Materializer(system) implicit val resourceContext: ResourceContext = ResourceContext(system.dispatcher) - val indexerE = Executors.newWorkStealingPool() - val indexerEC = ExecutionContext.fromExecutor(indexerE) + val indexerExecutor = Executors.newWorkStealingPool() + val indexerExecutionContext = ExecutionContext.fromExecutor(indexerExecutor) println("Generating state updates...") - val updates = Await.result(createUpdates(config), Duration(10, "minute")) + val updates = Await.result(createUpdates(), Duration(10, "minute")) println("Creating read service and indexer...") val readService = createReadService(updates) - val indexerFactory = new JdbcIndexer.Factory( + val indexerFactory: JdbcIndexer.Factory = new JdbcIndexer.Factory( config.indexerConfig, readService, metrics, @@ -75,110 +73,21 @@ class IndexerBenchmark() { ) val resource = for { - _ <- config.metricsReporter.fold(Resource.unit)(reporter => - ResourceOwner - .forCloseable(() => reporter.register(metrics.registry)) - .map(_.start(config.metricsReportingInterval.getSeconds, TimeUnit.SECONDS)) - .acquire() - ) - + _ <- metricsResource(config, metrics) _ = println("Setting up the index database...") - indexer <- Await - .result( - StandaloneIndexerServer - .migrateOnly( - jdbcUrl = config.indexerConfig.jdbcUrl - ) - .map(_ => indexerFactory.initialized())(indexerEC), - Duration(5, "minute"), - ) - .acquire() - + indexer <- indexer(config, indexerExecutionContext, indexerFactory) _ = println("Starting the indexing...") startTime = System.nanoTime() handle <- indexer.acquire() - _ <- Resource.fromFuture(handle) stopTime = System.nanoTime() _ = println("Indexing done.") - - _ = system.terminate() - _ = indexerE.shutdown() + _ <- Resource.fromFuture(system.terminate()) + _ = indexerExecutor.shutdown() } yield { - val duration: Double = (stopTime - startTime).toDouble / 1000000000.0 - val updates: Long = metrics.daml.parallelIndexer.updates.getCount - val updateRate: Double = updates / duration - val inputMappingDurationMetric = metrics.registry.timer( - MetricRegistry.name(metrics.daml.parallelIndexer.inputMapping.executor, "duration") - ) - val batchingDurationMetric = metrics.registry.timer( - MetricRegistry.name(metrics.daml.parallelIndexer.batching.executor, "duration") - ) - val (failure, minimumUpdateRateFailureInfo): (Boolean, String) = - config.minUpdateRate match { - case Some(requiredMinUpdateRate) if requiredMinUpdateRate > updateRate => - ( - true, - s"[failure][UpdateRate] Minimum number of updates per second: required: $requiredMinUpdateRate, metered: $updateRate", - ) - case _ => (false, "") - } - println( - s""" - |-------------------------------------------------------------------------------- - |Indexer benchmark results - |-------------------------------------------------------------------------------- - | - |Input: - | source: ${config.updateSource} - | count: ${config.updateCount} - | required updates/sec: ${config.minUpdateRate.getOrElse("-")} - | jdbcUrl: ${config.indexerConfig.jdbcUrl} - | - |Indexer parameters: - | maxInputBufferSize: ${config.indexerConfig.maxInputBufferSize} - | inputMappingParallelism: ${config.indexerConfig.inputMappingParallelism} - | ingestionParallelism: ${config.indexerConfig.ingestionParallelism} - | submissionBatchSize: ${config.indexerConfig.submissionBatchSize} - | batchWithinMillis: ${config.indexerConfig.batchWithinMillis} - | tailingRateLimitPerSecond: ${config.indexerConfig.tailingRateLimitPerSecond} - | full indexer config: ${config.indexerConfig} - | - |Result: - | duration: $duration - | updates: $updates - | updates/sec: $updateRate - | $minimumUpdateRateFailureInfo - | - |Other metrics: - | inputMapping.batchSize: ${histogramToString( - metrics.daml.parallelIndexer.inputMapping.batchSize.getSnapshot - )} - | inputMapping.duration: ${histogramToString( - inputMappingDurationMetric.getSnapshot - )} - | inputMapping.duration.rate: ${inputMappingDurationMetric.getMeanRate} - | batching.duration: ${histogramToString(batchingDurationMetric.getSnapshot)} - | batching.duration.rate: ${batchingDurationMetric.getMeanRate} - | seqMapping.duration: ${metrics.daml.parallelIndexer.seqMapping.duration.getSnapshot}| - | seqMapping.duration.rate: ${metrics.daml.parallelIndexer.seqMapping.duration.getMeanRate}| - | ingestion.duration: ${histogramToString( - metrics.daml.parallelIndexer.ingestion.executionTimer.getSnapshot - )} - | ingestion.duration.rate: ${metrics.daml.parallelIndexer.ingestion.executionTimer.getMeanRate} - | tailIngestion.duration: ${histogramToString( - metrics.daml.parallelIndexer.tailIngestion.executionTimer.getSnapshot - )} - | tailIngestion.duration.rate: ${metrics.daml.parallelIndexer.tailIngestion.executionTimer.getMeanRate} - | - |Notes: - | The above numbers include all ingested updates, including package uploads. - | Inspect the metrics using a metrics reporter to better investigate how - | the indexer performs. - | - |-------------------------------------------------------------------------------- - |""".stripMargin - ) + val result = new IndexerBenchmarkResult(config, metrics, startTime, stopTime) + + println(result.banner) // Note: this allows the user to inpsect the contents of an ephemeral database if (config.waitForUserInput) { @@ -186,15 +95,40 @@ class IndexerBenchmark() { StdIn.readLine("Press to terminate this process.") } - if (failure) throw new RuntimeException("Indexer Benchmark failure.") + if (result.failure) throw new RuntimeException("Indexer Benchmark failure.") () } resource.asFuture } } + private def indexer( + config: Config, + indexerExecutionContext: ExecutionContextExecutor, + indexerFactory: JdbcIndexer.Factory, + )(implicit + loggingContext: LoggingContext, + rc: ResourceContext, + ): resources.Resource[ResourceContext, Indexer] = + Await + .result( + StandaloneIndexerServer + .migrateOnly(jdbcUrl = config.indexerConfig.jdbcUrl) + .map(_ => indexerFactory.initialized())(indexerExecutionContext), + Duration(5, "minute"), + ) + .acquire() + + private def metricsResource(config: Config, metrics: Metrics)(implicit rc: ResourceContext) = + config.metricsReporter.fold(Resource.unit)(reporter => + ResourceOwner + .forCloseable(() => reporter.register(metrics.registry)) + .map(_.start(config.metricsReportingInterval.getSeconds, TimeUnit.SECONDS)) + .acquire() + ) + private[this] def createReadService( - updates: Iterator[(Offset, Update)] + updates: Source[(Offset, Update), NotUsed] ): ReadService = { val initialConditions = LedgerInitialConditions( IndexerBenchmark.LedgerId, @@ -215,33 +149,20 @@ class IndexerBenchmark() { beginAfter: Option[Offset] )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = { assert(beginAfter.isEmpty, s"beginAfter is $beginAfter") - Source.fromIterator(() => updates) + updates } override def currentHealth(): HealthStatus = Healthy } } - - private[this] def histogramToString(data: Snapshot): String = { - s"[min: ${data.getMin}, median: ${data.getMedian}, max: ${data.getMax}]" - } } object IndexerBenchmark { val LedgerId = "IndexerBenchmarkLedger" - def runAndExit( - args: Array[String], - updates: Config => Future[Iterator[(Offset, Update)]], - ): Unit = - Config.parse(args) match { - case Some(config) => IndexerBenchmark.runAndExit(config, updates) - case None => sys.exit(1) - } - def runAndExit( config: Config, - updates: Config => Future[Iterator[(Offset, Update)]], + updates: () => Future[Source[(Offset, Update), NotUsed]], ): Unit = { val result: Future[Unit] = (if (config.indexerConfig.jdbcUrl.isEmpty) { diff --git a/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmarkResult.scala b/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmarkResult.scala new file mode 100644 index 0000000000..262ba17f4a --- /dev/null +++ b/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmarkResult.scala @@ -0,0 +1,91 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.indexerbenchmark + +import com.codahale.metrics.{MetricRegistry, Snapshot} +import com.daml.metrics.Metrics + +class IndexerBenchmarkResult(config: Config, metrics: Metrics, startTime: Long, stopTime: Long) { + + private val duration: Double = (stopTime - startTime).toDouble / 1000000000.0 + private val updates: Long = metrics.daml.parallelIndexer.updates.getCount + private val updateRate: Double = updates / duration + private val inputMappingDurationMetric = metrics.registry.timer( + MetricRegistry.name(metrics.daml.parallelIndexer.inputMapping.executor, "duration") + ) + private val batchingDurationMetric = metrics.registry.timer( + MetricRegistry.name(metrics.daml.parallelIndexer.batching.executor, "duration") + ) + val (failure, minimumUpdateRateFailureInfo): (Boolean, String) = + config.minUpdateRate match { + case Some(requiredMinUpdateRate) if requiredMinUpdateRate > updateRate => + ( + true, + s"[failure][UpdateRate] Minimum number of updates per second: required: $requiredMinUpdateRate, metered: $updateRate", + ) + case _ => (false, "") + } + + val banner = + s""" + |-------------------------------------------------------------------------------- + |Indexer benchmark results + |-------------------------------------------------------------------------------- + | + |Input: + | source: ${config.updateSource} + | count: ${config.updateCount} + | required updates/sec: ${config.minUpdateRate.getOrElse("-")} + | jdbcUrl: ${config.indexerConfig.jdbcUrl} + | + |Indexer parameters: + | maxInputBufferSize: ${config.indexerConfig.maxInputBufferSize} + | inputMappingParallelism: ${config.indexerConfig.inputMappingParallelism} + | ingestionParallelism: ${config.indexerConfig.ingestionParallelism} + | submissionBatchSize: ${config.indexerConfig.submissionBatchSize} + | batchWithinMillis: ${config.indexerConfig.batchWithinMillis} + | tailingRateLimitPerSecond: ${config.indexerConfig.tailingRateLimitPerSecond} + | full indexer config: ${config.indexerConfig} + | + |Result: + | duration: $duration + | updates: $updates + | updates/sec: $updateRate + | $minimumUpdateRateFailureInfo + | + |Other metrics: + | inputMapping.batchSize: ${histogramToString( + metrics.daml.parallelIndexer.inputMapping.batchSize.getSnapshot + )} + | inputMapping.duration: ${histogramToString( + inputMappingDurationMetric.getSnapshot + )} + | inputMapping.duration.rate: ${inputMappingDurationMetric.getMeanRate} + | batching.duration: ${histogramToString(batchingDurationMetric.getSnapshot)} + | batching.duration.rate: ${batchingDurationMetric.getMeanRate} + | seqMapping.duration: ${histogramToString( + metrics.daml.parallelIndexer.seqMapping.duration.getSnapshot + )}| + | seqMapping.duration.rate: ${metrics.daml.parallelIndexer.seqMapping.duration.getMeanRate}| + | ingestion.duration: ${histogramToString( + metrics.daml.parallelIndexer.ingestion.executionTimer.getSnapshot + )} + | ingestion.duration.rate: ${metrics.daml.parallelIndexer.ingestion.executionTimer.getMeanRate} + | tailIngestion.duration: ${histogramToString( + metrics.daml.parallelIndexer.tailIngestion.executionTimer.getSnapshot + )} + | tailIngestion.duration.rate: ${metrics.daml.parallelIndexer.tailIngestion.executionTimer.getMeanRate} + | + |Notes: + | The above numbers include all ingested updates, including package uploads. + | Inspect the metrics using a metrics reporter to better investigate how + | the indexer performs. + | + |-------------------------------------------------------------------------------- + |""".stripMargin + + private[this] def histogramToString(data: Snapshot): String = { + s"[min: ${data.getMin}, median: ${data.getMedian}, max: ${data.getMax}]" + } +} diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala index 47bca0092c..da3cd50290 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala @@ -3,16 +3,15 @@ package com.daml.platform.indexer.ha -import java.time.Instant import akka.NotUsed -import akka.stream.{KillSwitches, SharedKillSwitch} +import akka.stream.KillSwitches import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf import com.daml.ledger.api.health.HealthStatus -import com.daml.lf.crypto import com.daml.ledger.configuration.{Configuration, LedgerId, LedgerInitialConditions} import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.v2.{CompletionInfo, ReadService, TransactionMeta, Update} +import com.daml.lf.crypto import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp import com.daml.lf.transaction.CommittedTransaction @@ -21,8 +20,7 @@ import com.daml.lf.value.Value import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.google.protobuf.ByteString -import java.util.concurrent.atomic.AtomicInteger -import scala.concurrent.duration._ +import java.time.Instant /** An infinite stream of state updates that fully conforms to the Daml ledger model. * @@ -46,7 +44,7 @@ case class EndlessReadService( override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = synchronized { logger.info("EndlessReadService.ledgerInitialConditions() called") - initialConditionCalls.incrementAndGet() + initialConditionCalls += 1 Source .single(LedgerInitialConditions(ledgerId, configuration, recordTime(0))) .via(killSwitch.flow) @@ -66,11 +64,10 @@ case class EndlessReadService( )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = synchronized { logger.info(s"EndlessReadService.stateUpdates($beginAfter) called") - stateUpdatesCalls.incrementAndGet() + stateUpdatesCalls += 1 val startIndex: Int = beginAfter.map(index).getOrElse(0) + 1 Source .fromIterator(() => Iterator.from(startIndex)) - .throttle(updatesPerSecond, 1.second) .map { case i @ 1 => offset(i) -> Update.ConfigurationChanged( @@ -127,8 +124,8 @@ case class EndlessReadService( def reset(): Unit = synchronized { assert(aborted) logger.info(s"EndlessReadService.reset() called") - stateUpdatesCalls.set(0) - initialConditionCalls.set(0) + stateUpdatesCalls = 0 + initialConditionCalls = 0 aborted = false killSwitch = KillSwitches.shared("EndlessReadService") } @@ -138,10 +135,14 @@ case class EndlessReadService( killSwitch.shutdown() } - val stateUpdatesCalls: AtomicInteger = new AtomicInteger(0) - val initialConditionCalls: AtomicInteger = new AtomicInteger(0) - var aborted: Boolean = false - private var killSwitch: SharedKillSwitch = KillSwitches.shared("EndlessReadService") + def isRunning: Boolean = synchronized { + stateUpdatesCalls > 0 && !aborted + } + + private var stateUpdatesCalls: Int = 0 + private var initialConditionCalls: Int = 0 + private var aborted: Boolean = false + private var killSwitch = KillSwitches.shared("EndlessReadService") } object EndlessReadService { diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/IndexerStabilityTestFixture.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/IndexerStabilityTestFixture.scala index 9bd6c98234..496a88ca9f 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/IndexerStabilityTestFixture.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/IndexerStabilityTestFixture.scala @@ -27,7 +27,7 @@ case class ReadServiceAndIndexer( case class Indexers(indexers: List[ReadServiceAndIndexer]) { // The list of all indexers that are running (determined by whether they have subscribed to the read service) def runningIndexers: List[ReadServiceAndIndexer] = - indexers.filter(x => x.readService.stateUpdatesCalls.get() > 0 && !x.readService.aborted) + indexers.filter(_.readService.isRunning) def resetAll(): Unit = indexers.foreach(_.readService.reset()) }