Make indexer benchmark independent of kvutils [DPP-946] (#13144)

changelog_begin
changelog_end
Sergey Kisel 2022-03-09 13:24:25 +01:00 committed by GitHub
parent 212bc9fdf3
commit 512d869ecf
6 changed files with 176 additions and 234 deletions

View File

@@ -27,7 +27,6 @@ da_scala_library(
     ],
     deps = [
         "//daml-lf/data",
-        "//language-support/scala/bindings",
         "//ledger/ledger-api-health",
         "//ledger/ledger-configuration",
         "//ledger/ledger-offset",
@@ -55,16 +54,11 @@ da_scala_library(
     visibility = ["//visibility:public"],
     deps = [
         ":indexer-benchmark-lib",
-        "//daml-lf/data",
-        "//language-support/scala/bindings",
         "//ledger/ledger-api-health",
-        "//ledger/ledger-configuration",
         "//ledger/ledger-offset",
-        "//ledger/metrics",
+        "//ledger/participant-integration-api:participant-integration-api-tests-lib",
         "//ledger/participant-state",
-        "//ledger/participant-state/kvutils",
         "//libs-scala/contextualized-logging",
-        "@maven//:io_dropwizard_metrics_metrics_core",
     ],
 )

View File

@@ -3,100 +3,35 @@
 package com.daml.ledger.indexerbenchmark
 
-import java.nio.file.{Files, Paths}
-import java.util.concurrent.atomic.AtomicLong
 import akka.NotUsed
-import akka.actor.ActorSystem
-import akka.stream.Materializer
-import akka.stream.scaladsl.{Sink, Source}
-import com.codahale.metrics.MetricRegistry
-import com.daml.ledger.api.health.{HealthStatus, Healthy}
-import com.daml.ledger.configuration.LedgerId
+import akka.stream.scaladsl.Source
 import com.daml.ledger.offset.Offset
-import com.daml.ledger.participant.state.kvutils.api.{
-  KeyValueParticipantStateReader,
-  LedgerReader,
-  LedgerRecord,
-}
-import com.daml.ledger.participant.state.kvutils.export.ProtobufBasedLedgerDataImporter
-import com.daml.ledger.participant.state.kvutils.{KVOffsetBuilder, Raw}
 import com.daml.ledger.participant.state.v2.Update
 import com.daml.logging.LoggingContext.newLoggingContext
-import com.daml.metrics.Metrics
+import com.daml.platform.indexer.ha.EndlessReadService
 
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.Future
 
 object Main {
   def main(args: Array[String]): Unit =
-    IndexerBenchmark.runAndExit(args, name => loadLedgerExport(name))
+    Config.parse(args) match {
+      case Some(config) =>
+        IndexerBenchmark.runAndExit(config, () => Future.successful(loadStateUpdates(config)))
+      case None => sys.exit(1)
+    }
 
-  private[this] def loadLedgerExport(config: Config): Future[Iterator[(Offset, Update)]] = {
-    val path = Paths.get(config.updateSource)
-    if (!Files.exists(path)) {
-      throw new RuntimeException(s"Input file $path does not exist")
-    }
-
-    val importer = ProtobufBasedLedgerDataImporter(path)
-    val offsetBuilder = new KVOffsetBuilder(0)
-    val dataSource: Source[LedgerRecord, NotUsed] = Source
-      .fromIterator(() => importer.read().iterator)
-      .statefulMapConcat { () =>
-        val nextOffset = new AtomicLong(0)
-
-        { case (_, writeSet) =>
-          writeSet.map { case (key, value) =>
-            val offset = offsetBuilder.of(nextOffset.getAndIncrement())
-            val logEntryId = Raw.LogEntryId(key.bytes) // `key` is of an unknown type.
-            LedgerRecord(offset, logEntryId, value)
-          }
-        }
-      }
-
-    val keyValueSource = new LedgerReader {
-      override def events(offset: Option[Offset]): Source[LedgerRecord, NotUsed] =
-        if (offset.isDefined) {
-          Source.failed(
-            new IllegalArgumentException(
-              s"A read offset of $offset is not supported. Must be $None."
-            )
-          )
-        } else {
-          dataSource
-        }
-
-      override def currentHealth(): HealthStatus = Healthy
-
-      override def ledgerId(): LedgerId = IndexerBenchmark.LedgerId
-    }
-
-    val metricRegistry = new MetricRegistry
-    val metrics = new Metrics(metricRegistry)
-    val keyValueStateReader = KeyValueParticipantStateReader(
-      keyValueSource,
-      metrics,
-      failOnUnexpectedEvent = false,
-    )
-
-    // Note: this method is doing quite a lot of work to transform a sequence of write sets
-    // to a sequence of state updates.
-    // Note: this method eagerly loads the whole ledger export and transforms it into an array of state updates.
-    // This will consume a lot of memory, but will avoid slowing down the indexer with write set decoding during
-    // the benchmark.
-    val system = ActorSystem("IndexerBenchmarkUpdateReader")
-    implicit val materializer: Materializer = Materializer(system)
+  /* Note: this is a stub implementation. It doesn't provide meaningful performance numbers because:
+   * - EndlessReadService is currently throttled, and the Akka throttle operator is a performance bottleneck
+   * - EndlessReadService generates very simple transaction shapes that are not representative of
+   *   the load on a real ledger.
+   */
+  private[this] def loadStateUpdates(config: Config): Source[(Offset, Update), NotUsed] =
     newLoggingContext { implicit loggingContext =>
-      keyValueStateReader
-        .stateUpdates(None)
-        .take(config.updateCount.getOrElse(Long.MaxValue))
-        .zipWithIndex
-        .map { case (data, index) =>
-          if (index % 1000 == 0) println(s"Generated update $index")
-          data
-        }
-        .runWith(Sink.seq[(Offset, Update)])
-        .map(seq => seq.iterator)(ExecutionContext.parasitic)
-        .andThen { case _ => system.terminate() }(ExecutionContext.parasitic)
+      val readService = new EndlessReadService(updatesPerSecond = 10, name = config.updateSource)
+      config.updateCount match {
+        case None => readService.stateUpdates(None)
+        case Some(count) => readService.stateUpdates(None).take(count)
+      }
     }
-  }
 }
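
To see what the new stub source produces, the following sketch pulls a handful of updates from EndlessReadService and prints their types. It is illustrative only: it assumes the participant-integration-api test library is on the classpath, and the object name StubSourceSmokeTest is hypothetical, not part of this commit.

    import akka.actor.ActorSystem
    import akka.stream.Materializer
    import akka.stream.scaladsl.Sink
    import com.daml.logging.LoggingContext.newLoggingContext
    import com.daml.platform.indexer.ha.EndlessReadService

    import scala.concurrent.Await
    import scala.concurrent.duration._

    // Hypothetical smoke test: materialize a few updates from the stub read service.
    object StubSourceSmokeTest {
      def main(args: Array[String]): Unit = {
        implicit val system: ActorSystem = ActorSystem("stub-smoke-test")
        implicit val materializer: Materializer = Materializer(system)
        newLoggingContext { implicit loggingContext =>
          val readService = new EndlessReadService(updatesPerSecond = 10, name = "smoke-test")
          // The update stream is infinite; always bound it with take().
          val firstUpdates = Await.result(
            readService.stateUpdates(None).take(5).runWith(Sink.seq),
            30.seconds,
          )
          firstUpdates.foreach { case (offset, update) =>
            println(s"$offset -> ${update.getClass.getSimpleName}")
          }
          system.terminate()
          ()
        }
      }
    }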

View File

@@ -3,13 +3,11 @@
 package com.daml.ledger.indexerbenchmark
 
-import java.util.concurrent.{Executors, TimeUnit}
 import akka.NotUsed
 import akka.actor.ActorSystem
 import akka.stream.Materializer
 import akka.stream.scaladsl.Source
-import com.codahale.metrics.{MetricRegistry, Snapshot}
+import com.codahale.metrics.MetricRegistry
 import com.daml.ledger.api.health.{HealthStatus, Healthy}
 import com.daml.ledger.configuration.{Configuration, LedgerInitialConditions, LedgerTimeModel}
 import com.daml.ledger.offset.Offset
@@ -19,12 +17,14 @@ import com.daml.lf.data.Time
 import com.daml.logging.LoggingContext
 import com.daml.logging.LoggingContext.newLoggingContext
 import com.daml.metrics.{JvmMetricSet, Metrics}
-import com.daml.platform.indexer.{JdbcIndexer, StandaloneIndexerServer}
+import com.daml.platform.indexer.{Indexer, JdbcIndexer, StandaloneIndexerServer}
 import com.daml.platform.store.LfValueTranslationCache
+import com.daml.resources
 import com.daml.testing.postgresql.PostgresResource
+import java.util.concurrent.{Executors, TimeUnit}
 import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
 import scala.io.StdIn
 
 class IndexerBenchmark() {
@@ -35,39 +35,37 @@ class IndexerBenchmark() {
    * and functional tests.
    */
   def runWithEphemeralPostgres(
-      createUpdates: Config => Future[Iterator[(Offset, Update)]],
+      createUpdates: () => Future[Source[(Offset, Update), NotUsed]],
       config: Config,
-  ): Future[Unit] = {
+  ): Future[Unit] =
     PostgresResource
       .owner()
       .use(db => {
         println(s"Running the indexer benchmark against the ephemeral Postgres database ${db.url}")
         run(createUpdates, config.copy(indexerConfig = config.indexerConfig.copy(jdbcUrl = db.url)))
       })(ExecutionContext.parasitic)
-  }
 
   def run(
-      createUpdates: Config => Future[Iterator[(Offset, Update)]],
+      createUpdates: () => Future[Source[(Offset, Update), NotUsed]],
       config: Config,
   ): Future[Unit] = {
     newLoggingContext { implicit loggingContext =>
-      val metricRegistry = new MetricRegistry
-      val metrics = new Metrics(metricRegistry)
+      val metrics = new Metrics(new MetricRegistry)
       metrics.registry.registerAll(new JvmMetricSet)
 
       val system = ActorSystem("IndexerBenchmark")
       implicit val materializer: Materializer = Materializer(system)
       implicit val resourceContext: ResourceContext = ResourceContext(system.dispatcher)
 
-      val indexerE = Executors.newWorkStealingPool()
-      val indexerEC = ExecutionContext.fromExecutor(indexerE)
+      val indexerExecutor = Executors.newWorkStealingPool()
+      val indexerExecutionContext = ExecutionContext.fromExecutor(indexerExecutor)
 
       println("Generating state updates...")
-      val updates = Await.result(createUpdates(config), Duration(10, "minute"))
+      val updates = Await.result(createUpdates(), Duration(10, "minute"))
 
       println("Creating read service and indexer...")
       val readService = createReadService(updates)
-      val indexerFactory = new JdbcIndexer.Factory(
+      val indexerFactory: JdbcIndexer.Factory = new JdbcIndexer.Factory(
        config.indexerConfig,
        readService,
        metrics,
@@ -75,110 +73,21 @@ class IndexerBenchmark() {
       )
 
       val resource = for {
-        _ <- config.metricsReporter.fold(Resource.unit)(reporter =>
-          ResourceOwner
-            .forCloseable(() => reporter.register(metrics.registry))
-            .map(_.start(config.metricsReportingInterval.getSeconds, TimeUnit.SECONDS))
-            .acquire()
-        )
+        _ <- metricsResource(config, metrics)
         _ = println("Setting up the index database...")
-        indexer <- Await
-          .result(
-            StandaloneIndexerServer
-              .migrateOnly(
-                jdbcUrl = config.indexerConfig.jdbcUrl
-              )
-              .map(_ => indexerFactory.initialized())(indexerEC),
-            Duration(5, "minute"),
-          )
-          .acquire()
+        indexer <- indexer(config, indexerExecutionContext, indexerFactory)
         _ = println("Starting the indexing...")
         startTime = System.nanoTime()
         handle <- indexer.acquire()
         _ <- Resource.fromFuture(handle)
         stopTime = System.nanoTime()
         _ = println("Indexing done.")
-        _ = system.terminate()
-        _ = indexerE.shutdown()
+        _ <- Resource.fromFuture(system.terminate())
+        _ = indexerExecutor.shutdown()
       } yield {
-        val duration: Double = (stopTime - startTime).toDouble / 1000000000.0
-        val updates: Long = metrics.daml.parallelIndexer.updates.getCount
-        val updateRate: Double = updates / duration
-        val inputMappingDurationMetric = metrics.registry.timer(
-          MetricRegistry.name(metrics.daml.parallelIndexer.inputMapping.executor, "duration")
-        )
-        val batchingDurationMetric = metrics.registry.timer(
-          MetricRegistry.name(metrics.daml.parallelIndexer.batching.executor, "duration")
-        )
-        val (failure, minimumUpdateRateFailureInfo): (Boolean, String) =
-          config.minUpdateRate match {
-            case Some(requiredMinUpdateRate) if requiredMinUpdateRate > updateRate =>
-              (
-                true,
-                s"[failure][UpdateRate] Minimum number of updates per second: required: $requiredMinUpdateRate, metered: $updateRate",
-              )
-            case _ => (false, "")
-          }
-        println(
-          s"""
-             |--------------------------------------------------------------------------------
-             |Indexer benchmark results
-             |--------------------------------------------------------------------------------
-             |
-             |Input:
-             |  source: ${config.updateSource}
-             |  count: ${config.updateCount}
-             |  required updates/sec: ${config.minUpdateRate.getOrElse("-")}
-             |  jdbcUrl: ${config.indexerConfig.jdbcUrl}
-             |
-             |Indexer parameters:
-             |  maxInputBufferSize: ${config.indexerConfig.maxInputBufferSize}
-             |  inputMappingParallelism: ${config.indexerConfig.inputMappingParallelism}
-             |  ingestionParallelism: ${config.indexerConfig.ingestionParallelism}
-             |  submissionBatchSize: ${config.indexerConfig.submissionBatchSize}
-             |  batchWithinMillis: ${config.indexerConfig.batchWithinMillis}
-             |  tailingRateLimitPerSecond: ${config.indexerConfig.tailingRateLimitPerSecond}
-             |  full indexer config: ${config.indexerConfig}
-             |
-             |Result:
-             |  duration: $duration
-             |  updates: $updates
-             |  updates/sec: $updateRate
-             |  $minimumUpdateRateFailureInfo
-             |
-             |Other metrics:
-             |  inputMapping.batchSize: ${histogramToString(
-              metrics.daml.parallelIndexer.inputMapping.batchSize.getSnapshot
-            )}
-             |  inputMapping.duration: ${histogramToString(
-              inputMappingDurationMetric.getSnapshot
-            )}
-             |  inputMapping.duration.rate: ${inputMappingDurationMetric.getMeanRate}
-             |  batching.duration: ${histogramToString(batchingDurationMetric.getSnapshot)}
-             |  batching.duration.rate: ${batchingDurationMetric.getMeanRate}
-             |  seqMapping.duration: ${metrics.daml.parallelIndexer.seqMapping.duration.getSnapshot}|
-             |  seqMapping.duration.rate: ${metrics.daml.parallelIndexer.seqMapping.duration.getMeanRate}|
-             |  ingestion.duration: ${histogramToString(
-              metrics.daml.parallelIndexer.ingestion.executionTimer.getSnapshot
-            )}
-             |  ingestion.duration.rate: ${metrics.daml.parallelIndexer.ingestion.executionTimer.getMeanRate}
-             |  tailIngestion.duration: ${histogramToString(
-              metrics.daml.parallelIndexer.tailIngestion.executionTimer.getSnapshot
-            )}
-             |  tailIngestion.duration.rate: ${metrics.daml.parallelIndexer.tailIngestion.executionTimer.getMeanRate}
-             |
-             |Notes:
-             |  The above numbers include all ingested updates, including package uploads.
-             |  Inspect the metrics using a metrics reporter to better investigate how
-             |  the indexer performs.
-             |
-             |--------------------------------------------------------------------------------
-             |""".stripMargin
-        )
+        val result = new IndexerBenchmarkResult(config, metrics, startTime, stopTime)
+
+        println(result.banner)
 
         // Note: this allows the user to inpsect the contents of an ephemeral database
         if (config.waitForUserInput) {
@@ -186,15 +95,40 @@ class IndexerBenchmark() {
           StdIn.readLine("Press <enter> to terminate this process.")
         }
-        if (failure) throw new RuntimeException("Indexer Benchmark failure.")
+        if (result.failure) throw new RuntimeException("Indexer Benchmark failure.")
         ()
       }
       resource.asFuture
     }
   }
 
+  private def indexer(
+      config: Config,
+      indexerExecutionContext: ExecutionContextExecutor,
+      indexerFactory: JdbcIndexer.Factory,
+  )(implicit
+      loggingContext: LoggingContext,
+      rc: ResourceContext,
+  ): resources.Resource[ResourceContext, Indexer] =
+    Await
+      .result(
+        StandaloneIndexerServer
+          .migrateOnly(jdbcUrl = config.indexerConfig.jdbcUrl)
+          .map(_ => indexerFactory.initialized())(indexerExecutionContext),
+        Duration(5, "minute"),
+      )
+      .acquire()
+
+  private def metricsResource(config: Config, metrics: Metrics)(implicit rc: ResourceContext) =
+    config.metricsReporter.fold(Resource.unit)(reporter =>
+      ResourceOwner
+        .forCloseable(() => reporter.register(metrics.registry))
+        .map(_.start(config.metricsReportingInterval.getSeconds, TimeUnit.SECONDS))
+        .acquire()
+    )
+
   private[this] def createReadService(
-      updates: Iterator[(Offset, Update)]
+      updates: Source[(Offset, Update), NotUsed]
   ): ReadService = {
     val initialConditions = LedgerInitialConditions(
       IndexerBenchmark.LedgerId,
@@ -215,33 +149,20 @@ class IndexerBenchmark() {
         beginAfter: Option[Offset]
       )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = {
         assert(beginAfter.isEmpty, s"beginAfter is $beginAfter")
-        Source.fromIterator(() => updates)
+        updates
       }
 
       override def currentHealth(): HealthStatus = Healthy
     }
   }
 
-  private[this] def histogramToString(data: Snapshot): String = {
-    s"[min: ${data.getMin}, median: ${data.getMedian}, max: ${data.getMax}]"
-  }
 }
 
 object IndexerBenchmark {
   val LedgerId = "IndexerBenchmarkLedger"
 
-  def runAndExit(
-      args: Array[String],
-      updates: Config => Future[Iterator[(Offset, Update)]],
-  ): Unit =
-    Config.parse(args) match {
-      case Some(config) => IndexerBenchmark.runAndExit(config, updates)
-      case None => sys.exit(1)
-    }
-
   def runAndExit(
       config: Config,
-      updates: Config => Future[Iterator[(Offset, Update)]],
+      updates: () => Future[Source[(Offset, Update), NotUsed]],
   ): Unit = {
     val result: Future[Unit] =
       (if (config.indexerConfig.jdbcUrl.isEmpty) {
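
With the reworked entry point, runAndExit takes a parsed Config plus a () => Future[Source[(Offset, Update), NotUsed]] factory, so any caller can supply its own update stream. A hedged sketch of a custom runner (the object name CustomBenchmarkMain and the generator stub are hypothetical; Config.parse and runAndExit are as in the diff above):

    package com.daml.ledger.indexerbenchmark

    import akka.NotUsed
    import akka.stream.scaladsl.Source
    import com.daml.ledger.offset.Offset
    import com.daml.ledger.participant.state.v2.Update

    import scala.concurrent.Future

    object CustomBenchmarkMain {
      def main(args: Array[String]): Unit =
        Config.parse(args) match {
          case Some(config) =>
            // Plug in any Source[(Offset, Update), NotUsed]; the benchmark no longer
            // cares whether it came from kvutils, EndlessReadService, or elsewhere.
            IndexerBenchmark.runAndExit(config, () => Future.successful(updates(config)))
          case None => sys.exit(1)
        }

      // Hypothetical generator: replace with a stream representative of your workload.
      private def updates(config: Config): Source[(Offset, Update), NotUsed] = ???
    }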

View File

@@ -0,0 +1,91 @@
+// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.ledger.indexerbenchmark
+
+import com.codahale.metrics.{MetricRegistry, Snapshot}
+import com.daml.metrics.Metrics
+
+class IndexerBenchmarkResult(config: Config, metrics: Metrics, startTime: Long, stopTime: Long) {
+
+  private val duration: Double = (stopTime - startTime).toDouble / 1000000000.0
+  private val updates: Long = metrics.daml.parallelIndexer.updates.getCount
+  private val updateRate: Double = updates / duration
+
+  private val inputMappingDurationMetric = metrics.registry.timer(
+    MetricRegistry.name(metrics.daml.parallelIndexer.inputMapping.executor, "duration")
+  )
+  private val batchingDurationMetric = metrics.registry.timer(
+    MetricRegistry.name(metrics.daml.parallelIndexer.batching.executor, "duration")
+  )
+
+  val (failure, minimumUpdateRateFailureInfo): (Boolean, String) =
+    config.minUpdateRate match {
+      case Some(requiredMinUpdateRate) if requiredMinUpdateRate > updateRate =>
+        (
+          true,
+          s"[failure][UpdateRate] Minimum number of updates per second: required: $requiredMinUpdateRate, metered: $updateRate",
+        )
+      case _ => (false, "")
+    }
+
+  val banner =
+    s"""
+       |--------------------------------------------------------------------------------
+       |Indexer benchmark results
+       |--------------------------------------------------------------------------------
+       |
+       |Input:
+       |  source: ${config.updateSource}
+       |  count: ${config.updateCount}
+       |  required updates/sec: ${config.minUpdateRate.getOrElse("-")}
+       |  jdbcUrl: ${config.indexerConfig.jdbcUrl}
+       |
+       |Indexer parameters:
+       |  maxInputBufferSize: ${config.indexerConfig.maxInputBufferSize}
+       |  inputMappingParallelism: ${config.indexerConfig.inputMappingParallelism}
+       |  ingestionParallelism: ${config.indexerConfig.ingestionParallelism}
+       |  submissionBatchSize: ${config.indexerConfig.submissionBatchSize}
+       |  batchWithinMillis: ${config.indexerConfig.batchWithinMillis}
+       |  tailingRateLimitPerSecond: ${config.indexerConfig.tailingRateLimitPerSecond}
+       |  full indexer config: ${config.indexerConfig}
+       |
+       |Result:
+       |  duration: $duration
+       |  updates: $updates
+       |  updates/sec: $updateRate
+       |  $minimumUpdateRateFailureInfo
+       |
+       |Other metrics:
+       |  inputMapping.batchSize: ${histogramToString(
+      metrics.daml.parallelIndexer.inputMapping.batchSize.getSnapshot
+    )}
+       |  inputMapping.duration: ${histogramToString(
+      inputMappingDurationMetric.getSnapshot
+    )}
+       |  inputMapping.duration.rate: ${inputMappingDurationMetric.getMeanRate}
+       |  batching.duration: ${histogramToString(batchingDurationMetric.getSnapshot)}
+       |  batching.duration.rate: ${batchingDurationMetric.getMeanRate}
+       |  seqMapping.duration: ${histogramToString(
+      metrics.daml.parallelIndexer.seqMapping.duration.getSnapshot
+    )}|
+       |  seqMapping.duration.rate: ${metrics.daml.parallelIndexer.seqMapping.duration.getMeanRate}|
+       |  ingestion.duration: ${histogramToString(
+      metrics.daml.parallelIndexer.ingestion.executionTimer.getSnapshot
+    )}
+       |  ingestion.duration.rate: ${metrics.daml.parallelIndexer.ingestion.executionTimer.getMeanRate}
+       |  tailIngestion.duration: ${histogramToString(
+      metrics.daml.parallelIndexer.tailIngestion.executionTimer.getSnapshot
+    )}
+       |  tailIngestion.duration.rate: ${metrics.daml.parallelIndexer.tailIngestion.executionTimer.getMeanRate}
+       |
+       |Notes:
+       |  The above numbers include all ingested updates, including package uploads.
+       |  Inspect the metrics using a metrics reporter to better investigate how
+       |  the indexer performs.
+       |
+       |--------------------------------------------------------------------------------
+       |""".stripMargin
+
+  private[this] def histogramToString(data: Snapshot): String = {
+    s"[min: ${data.getMin}, median: ${data.getMedian}, max: ${data.getMax}]"
+  }
+}
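
As wired up in IndexerBenchmark.run above, this result class is constructed after indexing finishes and drives both the report and the pass/fail decision. A minimal sketch of the call site (the BenchmarkReport wrapper is hypothetical; startTime and stopTime are System.nanoTime values captured around the indexing run):

    package com.daml.ledger.indexerbenchmark

    import com.daml.metrics.Metrics

    object BenchmarkReport {
      // Mirrors the yield block of IndexerBenchmark.run shown above.
      def report(config: Config, metrics: Metrics, startTime: Long, stopTime: Long): Unit = {
        val result = new IndexerBenchmarkResult(config, metrics, startTime, stopTime)
        println(result.banner)
        // result.failure is true when config.minUpdateRate is set and the
        // measured updates/sec fell short of the requirement.
        if (result.failure) throw new RuntimeException("Indexer Benchmark failure.")
      }
    }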

View File

@@ -3,16 +3,15 @@
 package com.daml.platform.indexer.ha
 
-import java.time.Instant
 import akka.NotUsed
-import akka.stream.{KillSwitches, SharedKillSwitch}
+import akka.stream.KillSwitches
 import akka.stream.scaladsl.Source
 import com.daml.daml_lf_dev.DamlLf
 import com.daml.ledger.api.health.HealthStatus
-import com.daml.lf.crypto
 import com.daml.ledger.configuration.{Configuration, LedgerId, LedgerInitialConditions}
 import com.daml.ledger.offset.Offset
 import com.daml.ledger.participant.state.v2.{CompletionInfo, ReadService, TransactionMeta, Update}
+import com.daml.lf.crypto
 import com.daml.lf.data.Ref
 import com.daml.lf.data.Time.Timestamp
 import com.daml.lf.transaction.CommittedTransaction
@@ -21,8 +20,7 @@ import com.daml.lf.value.Value
 import com.daml.logging.{ContextualizedLogger, LoggingContext}
 import com.google.protobuf.ByteString
 
-import java.util.concurrent.atomic.AtomicInteger
-import scala.concurrent.duration._
+import java.time.Instant
 
 /** An infinite stream of state updates that fully conforms to the Daml ledger model.
   *
@@ -46,7 +44,7 @@ case class EndlessReadService(
   override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = synchronized {
     logger.info("EndlessReadService.ledgerInitialConditions() called")
-    initialConditionCalls.incrementAndGet()
+    initialConditionCalls += 1
     Source
       .single(LedgerInitialConditions(ledgerId, configuration, recordTime(0)))
       .via(killSwitch.flow)
@@ -66,11 +64,10 @@ case class EndlessReadService(
   )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] =
     synchronized {
       logger.info(s"EndlessReadService.stateUpdates($beginAfter) called")
-      stateUpdatesCalls.incrementAndGet()
+      stateUpdatesCalls += 1
       val startIndex: Int = beginAfter.map(index).getOrElse(0) + 1
       Source
         .fromIterator(() => Iterator.from(startIndex))
-        .throttle(updatesPerSecond, 1.second)
         .map {
           case i @ 1 =>
             offset(i) -> Update.ConfigurationChanged(
@@ -127,8 +124,8 @@ case class EndlessReadService(
   def reset(): Unit = synchronized {
     assert(aborted)
     logger.info(s"EndlessReadService.reset() called")
-    stateUpdatesCalls.set(0)
-    initialConditionCalls.set(0)
+    stateUpdatesCalls = 0
+    initialConditionCalls = 0
     aborted = false
     killSwitch = KillSwitches.shared("EndlessReadService")
   }
@@ -138,10 +135,14 @@ case class EndlessReadService(
     killSwitch.shutdown()
   }
 
-  val stateUpdatesCalls: AtomicInteger = new AtomicInteger(0)
-  val initialConditionCalls: AtomicInteger = new AtomicInteger(0)
-  var aborted: Boolean = false
-  private var killSwitch: SharedKillSwitch = KillSwitches.shared("EndlessReadService")
+  def isRunning: Boolean = synchronized {
+    stateUpdatesCalls > 0 && !aborted
+  }
+
+  private var stateUpdatesCalls: Int = 0
+  private var initialConditionCalls: Int = 0
+  private var aborted: Boolean = false
+  private var killSwitch = KillSwitches.shared("EndlessReadService")
 }
 
 object EndlessReadService {
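
Note that the call counters change from AtomicInteger to plain vars: every read and write now happens inside a synchronized block on the same monitor, so the atomic wrappers were redundant, and isRunning snapshots both flags under that one lock. A self-contained sketch of the pattern (CallTracker is a hypothetical name, not part of the commit):

    // All mutable state is confined to synchronized blocks on the same monitor,
    // so plain vars can safely replace atomic counters.
    final class CallTracker {
      private var calls: Int = 0
      private var aborted: Boolean = false

      def recordCall(): Unit = synchronized { calls += 1 }

      def abort(): Unit = synchronized { aborted = true }

      // Reads both flags under the same lock, like EndlessReadService.isRunning.
      def isRunning: Boolean = synchronized { calls > 0 && !aborted }
    }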

View File

@@ -27,7 +27,7 @@ case class ReadServiceAndIndexer(
 
 case class Indexers(indexers: List[ReadServiceAndIndexer]) {
   // The list of all indexers that are running (determined by whether they have subscribed to the read service)
   def runningIndexers: List[ReadServiceAndIndexer] =
-    indexers.filter(x => x.readService.stateUpdatesCalls.get() > 0 && !x.readService.aborted)
+    indexers.filter(_.readService.isRunning)
 
   def resetAll(): Unit = indexers.foreach(_.readService.reset())
 }