diff --git a/ledger/participant-integration-api/BUILD.bazel b/ledger/participant-integration-api/BUILD.bazel
index d58b82d196..ba6012935a 100644
--- a/ledger/participant-integration-api/BUILD.bazel
+++ b/ledger/participant-integration-api/BUILD.bazel
@@ -379,6 +379,7 @@ da_scala_test_suite(
"//ledger/metrics:metrics-test-lib",
"//ledger/participant-state",
"//ledger/participant-state-index",
+ "//ledger/test-common",
"//libs-scala/contextualized-logging",
"//libs-scala/logging-entries",
"//libs-scala/oracle-testing",
diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/HaCoordinator.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/HaCoordinator.scala
index a52518b85e..bf3877968d 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/HaCoordinator.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/HaCoordinator.scala
@@ -58,8 +58,8 @@ case class HaConfig(
workerLockAcquireRetryMillis: Long = 500,
workerLockAcquireMaxRetry: Long = 1000,
mainLockCheckerPeriodMillis: Long = 1000,
- indexerLockId: Int = 0x646d6c00, // note 0x646d6c equals ASCII encoded "dml"
- indexerWorkerLockId: Int = 0x646d6c01,
+ indexerLockId: Int = 0x646d6c0, // note 0x646d6c equals ASCII encoded "dml"
+ indexerWorkerLockId: Int = 0x646d6c1,
enable: Boolean = false, // TODO ha: remove as stable
)
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala
index b518527539..531a44e7b4 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala
@@ -48,7 +48,8 @@ trait StorageBackend[DB_BATCH]
with ContractStorageBackend
with EventStorageBackend
with DataSourceStorageBackend
- with DBLockStorageBackend {
+ with DBLockStorageBackend
+ with IntegrityStorageBackend {
/** Truncates all storage backend tables, EXCEPT the packages table.
* Does not touch other tables, like the Flyway history table.
@@ -342,6 +343,15 @@ object DBLockStorageBackend {
}
}
+trait IntegrityStorageBackend {
+
+ /** Verifies the integrity of the index database, throwing an exception if any issue is found.
+ * This operation is allowed to take some time to finish.
+ * It is not expected that it is used during regular index/indexer operation.
+ */
+ def verifyIntegrity()(connection: Connection): Unit
+}
+
object StorageBackend {
case class RawContractState(
templateId: Option[String],
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/VerifiedDataSource.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/VerifiedDataSource.scala
index db8af4a041..d343510fd6 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/VerifiedDataSource.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/VerifiedDataSource.scala
@@ -10,7 +10,7 @@ import javax.sql.DataSource
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
-import scala.util.Using
+import scala.util.{Failure, Using}
/** Returns a DataSource that is guaranteed to be connected to a responsive, compatible database. */
object VerifiedDataSource {
@@ -38,6 +38,8 @@ object VerifiedDataSource {
storageBackend.checkDatabaseAvailable
)
createdDatasource
+ }.andThen { case Failure(exception) =>
+ logger.warn(exception.getMessage)
}
}
_ <- Future {
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/IntegrityStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/IntegrityStorageBackendTemplate.scala
new file mode 100644
index 0000000000..9371631197
--- /dev/null
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/IntegrityStorageBackendTemplate.scala
@@ -0,0 +1,76 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.store.backend.common
+
+import anorm.{RowParser, SQL}
+
+import java.sql.Connection
+import anorm.SqlParser.long
+import anorm.~
+import com.daml.platform.store.backend.IntegrityStorageBackend
+
+private[backend] trait IntegrityStorageBackendTemplate extends IntegrityStorageBackend {
+
+ private val allSequentialIds: String =
+ s"""
+ |SELECT event_sequential_id FROM participant_events_divulgence
+ |UNION ALL
+ |SELECT event_sequential_id FROM participant_events_create
+ |UNION ALL
+ |SELECT event_sequential_id FROM participant_events_consuming_exercise
+ |UNION ALL
+ |SELECT event_sequential_id FROM participant_events_non_consuming_exercise
+ |""".stripMargin
+
+ private val SQL_EVENT_SEQUENTIAL_IDS_SUMMARY = SQL(s"""
+ |WITH sequential_ids AS ($allSequentialIds)
+ |SELECT min(event_sequential_id) as min, max(event_sequential_id) as max, count(event_sequential_id) as count
+ |FROM sequential_ids, parameters
+ |WHERE event_sequential_id <= parameters.ledger_end_sequential_id
+ |""".stripMargin)
+
+ // Don't fetch an unbounded number of rows
+ private val maxReportedDuplicates = 100
+
+ private val SQL_DUPLICATE_EVENT_SEQUENTIAL_IDS = SQL(s"""
+ |WITH sequential_ids AS ($allSequentialIds)
+ |SELECT event_sequential_id as id, count(*) as count
+ |FROM sequential_ids, parameters
+ |WHERE event_sequential_id <= parameters.ledger_end_sequential_id
+ |GROUP BY event_sequential_id
+ |HAVING count(*) > 1
+ |FETCH NEXT $maxReportedDuplicates ROWS ONLY
+ |""".stripMargin)
+
+ case class EventSequentialIdsRow(min: Long, max: Long, count: Long)
+
+ private val eventSequantialIdsParser: RowParser[EventSequentialIdsRow] =
+ long("min") ~
+ long("max") ~
+ long("count") map { case min ~ max ~ count =>
+ EventSequentialIdsRow(min, max, count)
+ }
+
+ override def verifyIntegrity()(connection: Connection): Unit = {
+ val duplicates = SQL_DUPLICATE_EVENT_SEQUENTIAL_IDS
+ .as(long("id").*)(connection)
+ val summary = SQL_EVENT_SEQUENTIAL_IDS_SUMMARY
+ .as(eventSequantialIdsParser.single)(connection)
+
+ // Verify that there are no duplicate ids.
+ if (duplicates.nonEmpty) {
+ throw new RuntimeException(
+ s"Found ${duplicates.length} duplicate event sequential ids. Examples: ${duplicates.mkString(", ")}"
+ )
+ }
+
+ // Verify that all ids are sequential (i.e., there are no "holes" in the ids).
+ // Since we already know that there are not duplicates, it is enough to check that the count is consistent with the range.
+ if (summary.count != summary.max - summary.min + 1) {
+ throw new RuntimeException(
+ s"Event sequential ids are not consecutive. Min=${summary.min}, max=${summary.max}, count=${summary.count}."
+ )
+ }
+ }
+}
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackend.scala
index 2bb5cedcbb..54d4a1564e 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackend.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackend.scala
@@ -5,7 +5,6 @@ package com.daml.platform.store.backend.h2
import java.sql.Connection
import java.time.Instant
-
import anorm.{Row, SQL, SimpleSql}
import anorm.SqlParser.get
import com.daml.ledger.offset.Offset
@@ -20,6 +19,7 @@ import com.daml.platform.store.backend.common.{
ConfigurationStorageBackendTemplate,
ContractStorageBackendTemplate,
DataSourceStorageBackendTemplate,
+ IntegrityStorageBackendTemplate,
DeduplicationStorageBackendTemplate,
EventStorageBackendTemplate,
EventStrategy,
@@ -38,8 +38,8 @@ import com.daml.platform.store.backend.{
StorageBackend,
common,
}
-import javax.sql.DataSource
+import javax.sql.DataSource
import scala.util.control.NonFatal
private[backend] object H2StorageBackend
@@ -53,7 +53,8 @@ private[backend] object H2StorageBackend
with EventStorageBackendTemplate
with ContractStorageBackendTemplate
with CompletionStorageBackendTemplate
- with PartyStorageBackendTemplate {
+ with PartyStorageBackendTemplate
+ with IntegrityStorageBackendTemplate {
private val logger = ContextualizedLogger.get(this.getClass)
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackend.scala
index 2460bf0347..043e27e0b2 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackend.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackend.scala
@@ -12,6 +12,7 @@ import com.daml.platform.store.backend.common.{
ConfigurationStorageBackendTemplate,
ContractStorageBackendTemplate,
DataSourceStorageBackendTemplate,
+ IntegrityStorageBackendTemplate,
DeduplicationStorageBackendTemplate,
EventStorageBackendTemplate,
EventStrategy,
@@ -30,15 +31,15 @@ import com.daml.platform.store.backend.{
StorageBackend,
common,
}
+
import java.sql.Connection
import java.time.Instant
-
import com.daml.ledger.offset.Offset
import com.daml.platform.store.backend.EventStorageBackend.FilterParams
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
-import javax.sql.DataSource
+import javax.sql.DataSource
import scala.util.control.NonFatal
private[backend] object OracleStorageBackend
@@ -52,7 +53,8 @@ private[backend] object OracleStorageBackend
with EventStorageBackendTemplate
with ContractStorageBackendTemplate
with CompletionStorageBackendTemplate
- with PartyStorageBackendTemplate {
+ with PartyStorageBackendTemplate
+ with IntegrityStorageBackendTemplate {
private val logger = ContextualizedLogger.get(this.getClass)
@@ -269,8 +271,8 @@ private[backend] object OracleStorageBackend
case class OracleLockId(id: Int) extends DBLockStorageBackend.LockId {
// respecting Oracle limitations: https://docs.oracle.com/cd/B19306_01/appdev.102/b14258/d_lock.htm#ARPLS021
- assert(id >= 0)
- assert(id <= 1073741823)
+ assert(id >= 0, s"Lock id $id is too small for Oracle")
+ assert(id <= 1073741823, s"Lock id $id is too large for Oracle")
}
private def oracleIntLockId(lockId: DBLockStorageBackend.LockId): Int =
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackend.scala
index 2bbba67489..373dc952f5 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackend.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackend.scala
@@ -5,7 +5,6 @@ package com.daml.platform.store.backend.postgresql
import java.sql.Connection
import java.time.Instant
-
import anorm.SQL
import anorm.SqlParser.{get, int}
import com.daml.ledger.offset.Offset
@@ -21,6 +20,7 @@ import com.daml.platform.store.backend.common.{
ConfigurationStorageBackendTemplate,
ContractStorageBackendTemplate,
DataSourceStorageBackendTemplate,
+ IntegrityStorageBackendTemplate,
DeduplicationStorageBackendTemplate,
EventStorageBackendTemplate,
EventStrategy,
@@ -39,6 +39,7 @@ import com.daml.platform.store.backend.{
StorageBackend,
common,
}
+
import javax.sql.DataSource
import org.postgresql.ds.PGSimpleDataSource
@@ -53,7 +54,8 @@ private[backend] object PostgresStorageBackend
with EventStorageBackendTemplate
with ContractStorageBackendTemplate
with CompletionStorageBackendTemplate
- with PartyStorageBackendTemplate {
+ with PartyStorageBackendTemplate
+ with IntegrityStorageBackendTemplate {
private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala
new file mode 100644
index 0000000000..e9e1062ca0
--- /dev/null
+++ b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala
@@ -0,0 +1,225 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.indexer.ha
+
+import java.time.Instant
+import akka.NotUsed
+import akka.stream.{KillSwitches, SharedKillSwitch}
+import akka.stream.scaladsl.Source
+import com.daml.daml_lf_dev.DamlLf
+import com.daml.ledger.api.health.HealthStatus
+import com.daml.lf.crypto
+import com.daml.ledger.configuration.{Configuration, LedgerId, LedgerInitialConditions}
+import com.daml.ledger.offset.Offset
+import com.daml.ledger.participant.state.v2.{CompletionInfo, ReadService, TransactionMeta, Update}
+import com.daml.lf.data.Ref
+import com.daml.lf.data.Time.Timestamp
+import com.daml.lf.transaction.CommittedTransaction
+import com.daml.lf.transaction.test.TransactionBuilder
+import com.daml.lf.value.Value
+import com.daml.logging.{ContextualizedLogger, LoggingContext}
+import com.google.protobuf.ByteString
+
+import java.util.concurrent.atomic.AtomicInteger
+import scala.concurrent.duration._
+
+/** An infinite stream of state updates that fully conforms to the Daml ledger model.
+ *
+ * All instances of this class produce the same stream of state updates.
+ *
+ * @param updatesPerSecond The maximum number of updates per second produced.
+ */
+case class EndlessReadService(
+ updatesPerSecond: Int,
+ name: String,
+)(implicit loggingContext: LoggingContext)
+ extends ReadService
+ with AutoCloseable {
+ import EndlessReadService._
+
+ private val logger = ContextualizedLogger.get(this.getClass)
+
+ override def currentHealth(): HealthStatus = synchronized {
+ if (aborted) HealthStatus.unhealthy else HealthStatus.healthy
+ }
+
+ override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = synchronized {
+ logger.info("EndlessReadService.ledgerInitialConditions() called")
+ initialConditionCalls.incrementAndGet()
+ Source
+ .single(LedgerInitialConditions(ledgerId, configuration, recordTime(0)))
+ .via(killSwitch.flow)
+ }
+
+ /** Produces the following stream of updates:
+ * 1. a config change
+ * 1. a party allocation
+ * 1. a package upload
+ * 1. a transaction that creates a contract
+ * 1. a transaction that archives the previous contract
+ *
+ * The last two items above repeat indefinitely
+ */
+ override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] =
+ synchronized {
+ logger.info(s"EndlessReadService.stateUpdates($beginAfter) called")
+ stateUpdatesCalls.incrementAndGet()
+ val startIndex: Int = beginAfter.map(index).getOrElse(1)
+ Source
+ .fromIterator(() => Iterator.from(startIndex))
+ .throttle(updatesPerSecond, 1.second)
+ .map {
+ case i @ 1 =>
+ offset(i) -> Update.ConfigurationChanged(
+ recordTime(i),
+ submissionId(i),
+ participantId,
+ configuration,
+ )
+ case i @ 2 =>
+ offset(i) -> Update.PartyAddedToParticipant(
+ party,
+ "Operator",
+ participantId,
+ recordTime(i),
+ Some(submissionId(i)),
+ )
+ case i @ 3 =>
+ offset(i) -> Update.PublicPackageUpload(
+ List(archive),
+ Some("Package"),
+ recordTime(i),
+ Some(submissionId(i)),
+ )
+ case i if i % 2 == 0 =>
+ offset(i) -> Update.TransactionAccepted(
+ optCompletionInfo = Some(completionInfo(i)),
+ transactionMeta = transactionMeta(i),
+ transaction = createTransaction(i),
+ transactionId = transactionId(i),
+ recordTime = recordTime(i),
+ divulgedContracts = List.empty,
+ blindingInfo = None,
+ )
+ case i =>
+ offset(i) -> Update.TransactionAccepted(
+ optCompletionInfo = Some(completionInfo(i)),
+ transactionMeta = transactionMeta(i),
+ transaction = exerciseTransaction(i),
+ transactionId = transactionId(i),
+ recordTime = recordTime(i),
+ divulgedContracts = List.empty,
+ blindingInfo = None,
+ )
+ }
+ .via(killSwitch.flow)
+ }
+
+ def abort(cause: Throwable): Unit = synchronized {
+ logger.info(s"EndlessReadService.abort() called")
+ aborted = true
+ killSwitch.abort(cause)
+ }
+
+ def reset(): Unit = synchronized {
+ assert(aborted)
+ logger.info(s"EndlessReadService.reset() called")
+ stateUpdatesCalls.set(0)
+ initialConditionCalls.set(0)
+ aborted = false
+ killSwitch = KillSwitches.shared("EndlessReadService")
+ }
+
+ override def close(): Unit = synchronized {
+ logger.info(s"EndlessReadService.close() called")
+ killSwitch.shutdown()
+ }
+
+ val stateUpdatesCalls: AtomicInteger = new AtomicInteger(0)
+ val initialConditionCalls: AtomicInteger = new AtomicInteger(0)
+ var aborted: Boolean = false
+ private var killSwitch: SharedKillSwitch = KillSwitches.shared("EndlessReadService")
+}
+
+object EndlessReadService {
+ val ledgerId: LedgerId = "EndlessReadService"
+ val participantId: Ref.ParticipantId =
+ Ref.ParticipantId.assertFromString("EndlessReadServiceParticipant")
+ val configuration: Configuration = Configuration.reasonableInitialConfiguration
+ val party: Ref.Party = Ref.Party.assertFromString("operator")
+ val applicationId: Ref.ApplicationId = Ref.ApplicationId.assertFromString("Application")
+ val workflowId: Ref.WorkflowId = Ref.WorkflowId.assertFromString("Workflow")
+ val templateId: Ref.Identifier = Ref.Identifier.assertFromString("pkg:Mod:Template")
+ val choiceName: Ref.Name = Ref.Name.assertFromString("SomeChoice")
+
+ private val archive = DamlLf.Archive.newBuilder
+ .setHash("00001")
+ .setHashFunction(DamlLf.HashFunction.SHA256)
+ .setPayload(ByteString.copyFromUtf8("payload 1"))
+ .build
+
+ // Note: all methods in this object MUST be fully deterministic
+ def index(o: Offset): Int = Integer.parseInt(o.toHexString, 16)
+ def offset(i: Int): Offset = Offset.fromHexString(Ref.HexString.assertFromString(f"$i%08d"))
+ def submissionId(i: Int): Ref.SubmissionId = Ref.SubmissionId.assertFromString(f"sub$i%08d")
+ def transactionId(i: Int): Ref.TransactionId = Ref.TransactionId.assertFromString(f"tx$i%08d")
+ def commandId(i: Int): Ref.CommandId = Ref.CommandId.assertFromString(f"cmd$i%08d")
+ def cid(i: Int): Value.ContractId = Value.ContractId.V0.assertFromString(s"#$i")
+ def recordTime(i: Int): Timestamp =
+ Timestamp.assertFromInstant(Instant.EPOCH.plusSeconds(i.toLong))
+ def completionInfo(i: Int): CompletionInfo = CompletionInfo(
+ actAs = List(party),
+ applicationId = applicationId,
+ commandId = commandId(i),
+ optDeduplicationPeriod = None,
+ submissionId = None,
+ )
+ def transactionMeta(i: Int): TransactionMeta = TransactionMeta(
+ ledgerEffectiveTime = recordTime(i),
+ workflowId = Some(workflowId),
+ submissionTime = recordTime(i),
+ submissionSeed = crypto.Hash.hashPrivateKey("EndlessReadService"),
+ optUsedPackages = None,
+ optNodeSeeds = None,
+ optByKeyNodes = None,
+ )
+ // Creates contract #i
+ private def createTransaction(i: Int): CommittedTransaction = {
+ val builder = TransactionBuilder()
+ val createNode = builder.create(
+ id = cid(i),
+ templateId = templateId,
+ argument = Value.ValueUnit,
+ signatories = Set(party),
+ observers = Set.empty,
+ key = None,
+ )
+ builder.add(createNode)
+ builder.buildCommitted()
+ }
+ // Archives contract #(i-1)
+ private def exerciseTransaction(i: Int): CommittedTransaction = {
+ val builder = TransactionBuilder()
+ val createNode = builder.create(
+ id = cid(i - 1),
+ templateId = templateId,
+ argument = Value.ValueUnit,
+ signatories = Set(party),
+ observers = Set.empty,
+ key = None,
+ )
+ val exerciseNode = builder.exercise(
+ contract = createNode,
+ choice = choiceName,
+ consuming = true,
+ actingParties = Set(party),
+ argument = Value.ValueUnit,
+ result = Some(Value.ValueUnit),
+ choiceObservers = Set.empty,
+ byKey = false,
+ )
+ builder.add(exerciseNode)
+ builder.buildCommitted()
+ }
+}
diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/IndexerStabilitySpec.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/IndexerStabilitySpec.scala
new file mode 100644
index 0000000000..5e569f8859
--- /dev/null
+++ b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/IndexerStabilitySpec.scala
@@ -0,0 +1,159 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.indexer.ha
+
+import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
+import com.daml.ledger.resources.ResourceContext
+import com.daml.logging.LoggingContext
+import com.daml.platform.store.DbType
+import com.daml.platform.store.backend.StorageBackend
+import org.scalatest.Assertion
+import org.scalatest.concurrent.Eventually
+import org.scalatest.flatspec.AsyncFlatSpec
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Millis, Seconds, Span}
+
+import java.sql.Connection
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+trait IndexerStabilitySpec
+ extends AsyncFlatSpec
+ with Matchers
+ with AkkaBeforeAndAfterAll
+ with Eventually {
+
+ import IndexerStabilitySpec._
+
+ // To be overriden by the spec implementation
+ def jdbcUrl: String
+
+ def haModeSupported: Boolean
+
+ // The default EC is coming from AsyncTestSuite and is serial, do not use it
+ implicit val ec: ExecutionContext = system.dispatcher
+ private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
+
+ behavior of "concurrently running indexers"
+
+ it should "correctly work in high availability mode" in {
+ val updatesPerSecond = 10 // Number of updates per second produced by the read service
+ val indexerCount = 8 // Number of concurrently running indexers
+ val restartIterations = 4 // Number of times the indexer should restart
+
+ implicit val rc: ResourceContext = ResourceContext(ec)
+
+ info(s"Creating indexers fixture with $indexerCount indexers")
+ IndexerStabilityTestFixture
+ .owner(
+ updatesPerSecond,
+ indexerCount,
+ jdbcUrl,
+ materializer,
+ )
+ .use[Unit] { indexers =>
+ val storageBackend = StorageBackend.of(DbType.jdbcType(jdbcUrl))
+ val dataSource = storageBackend.createDataSource(jdbcUrl)
+ val connection = dataSource.getConnection()
+
+ Iterator
+ .iterate(IterationState())(previousState => {
+ // Assert that there is exactly one indexer running
+ val activeIndexer = findActiveIndexer(indexers)
+ info(s"Indexer ${activeIndexer.readService.name} is running")
+
+ // Assert that state updates are being indexed
+ assertLedgerEndHasMoved(storageBackend, connection)
+ info("Ledger end has moved")
+
+ // At this point, the indexer that was aborted by the previous iteration can be reset,
+ // in order to keep the pool of competing indexers full.
+ previousState.abortedIndexer.foreach(idx => {
+ idx.readService.reset()
+ info(s"ReadService ${idx.readService.name} was reset")
+ })
+
+ // Abort the indexer by terminating the ReadService stream
+ activeIndexer.readService.abort(simulatedFailure())
+ info(s"ReadService ${activeIndexer.readService.name} was aborted")
+
+ IterationState(Some(activeIndexer))
+ })
+ .take(restartIterations + 1)
+ .foreach(_ => ())
+
+ // Stop all indexers, in order to stop all database operations
+ indexers.indexers.foreach(_.readService.abort(simulatedFailure()))
+ info(s"All ReadServices were aborted")
+
+ // Wait until all indexers stop using the database, otherwise the test will
+ // fail while trying to drop the database at the end.
+ // It can take some time until all indexers actually stop indexing after the
+ // state update stream was aborted. It is difficult to observe this event,
+ // as the only externally visible signal is the health status of the indexer,
+ // which is only "unhealthy" while RecoveringIndexer is waiting to restart.
+ // Instead, we just wait a short time.
+ Thread.sleep(1000L)
+
+ // Verify the integrity of the index database
+ storageBackend.verifyIntegrity()(connection)
+ info(s"Integrity of the index database was checked")
+
+ connection.close()
+ Future.successful(())
+ }
+ .map(_ => succeed)
+ }.transform { result =>
+ if (haModeSupported) {
+ result
+ } else {
+ // If HA mode is not supported, the test must fail, but there are multiple reasons why it can fail.
+ // E.g., duplicate parameter table initialization, or duplicate event sequential ids.
+ assert(result.isFailure, "The test must fail if HA mode is not supported")
+ Success(succeed)
+ }
+ }
+
+ // Finds the first non-aborted indexer that has subscribed to the ReadService stream
+ private def findActiveIndexer(indexers: Indexers): ReadServiceAndIndexer = {
+ // It takes some time until a new indexer takes over after a failure.
+ // The default ScalaTest timeout for eventually() is too short for this.
+ implicit val patienceConfig: PatienceConfig = PatienceConfig(
+ timeout = scaled(Span(10, Seconds)),
+ interval = scaled(Span(100, Millis)),
+ )
+ eventually {
+ indexers.runningIndexers.headOption.getOrElse(
+ throw new RuntimeException("No indexer running")
+ )
+ }
+ }
+
+ // Asserts that the ledger end has moved at least the specified number of events within a short time
+ private def assertLedgerEndHasMoved(
+ storageBackend: StorageBackend[_],
+ connection: Connection,
+ )(implicit pos: org.scalactic.source.Position): Assertion = {
+ implicit val patienceConfig: PatienceConfig = PatienceConfig(
+ timeout = scaled(Span(10, Seconds)),
+ interval = scaled(Span(100, Millis)),
+ )
+ // Note: we don't know exactly at which ledger end the current indexer has started.
+ // We only observe that the ledger end is moving right now.
+ val initialLedgerEnd = storageBackend.ledgerEndOrBeforeBegin(connection)
+ val minEvents = 2L
+ eventually {
+ val ledgerEnd = storageBackend.ledgerEndOrBeforeBegin(connection)
+ assert(ledgerEnd.lastEventSeqId > initialLedgerEnd.lastEventSeqId + minEvents)
+ }
+ }
+
+ private def simulatedFailure() = new RuntimeException("Simulated failure")
+}
+
+object IndexerStabilitySpec {
+ case class IterationState(
+ abortedIndexer: Option[ReadServiceAndIndexer] = None
+ )
+}
diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/IndexerStabilityTestFixture.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/IndexerStabilityTestFixture.scala
new file mode 100644
index 0000000000..98a36d9991
--- /dev/null
+++ b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/IndexerStabilityTestFixture.scala
@@ -0,0 +1,107 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.indexer.ha
+
+import akka.stream.Materializer
+import com.codahale.metrics.MetricRegistry
+import com.daml.ledger.api.health.ReportsHealth
+import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
+import com.daml.logging.ContextualizedLogger
+import com.daml.logging.LoggingContext.{newLoggingContext, withEnrichedLoggingContext}
+import com.daml.metrics.Metrics
+import com.daml.platform.indexer.{IndexerConfig, IndexerStartupMode, StandaloneIndexerServer}
+import com.daml.platform.store.LfValueTranslationCache
+
+import java.util.concurrent.Executors
+import scala.concurrent.ExecutionContext
+
+/** Stores a running indexer and the read service the indexer is reading from.
+ * The read service is used exclusively by this indexer.
+ */
+case class ReadServiceAndIndexer(
+ readService: EndlessReadService,
+ indexing: ReportsHealth,
+)
+
+case class Indexers(indexers: List[ReadServiceAndIndexer]) {
+ // The list of all indexers that are running (determined by whether they have subscribed to the read service)
+ def runningIndexers: List[ReadServiceAndIndexer] =
+ indexers.filter(x => x.readService.stateUpdatesCalls.get() > 0 && !x.readService.aborted)
+ def resetAll(): Unit = indexers.foreach(_.readService.reset())
+}
+
+object IndexerStabilityTestFixture {
+
+ private val logger = ContextualizedLogger.get(this.getClass)
+
+ def owner(
+ updatesPerSecond: Int,
+ indexerCount: Int,
+ jdbcUrl: String,
+ materializer: Materializer,
+ ): ResourceOwner[Indexers] = new ResourceOwner[Indexers] {
+ override def acquire()(implicit context: ResourceContext): Resource[Indexers] = {
+ createIndexers(
+ updatesPerSecond = updatesPerSecond,
+ indexerCount = indexerCount,
+ jdbcUrl = jdbcUrl,
+ )(context, materializer)
+ }
+ }
+
+ private def createIndexers(
+ updatesPerSecond: Int,
+ indexerCount: Int,
+ jdbcUrl: String,
+ )(implicit resourceContext: ResourceContext, materializer: Materializer): Resource[Indexers] = {
+ val indexerConfig = IndexerConfig(
+ participantId = EndlessReadService.participantId,
+ jdbcUrl = jdbcUrl,
+ startupMode = IndexerStartupMode.MigrateAndStart,
+ enableAppendOnlySchema = true,
+ haConfig = HaConfig(enable = true),
+ )
+
+ newLoggingContext { implicit loggingContext =>
+ for {
+ // This execution context is not used for indexing in the append-only schema, it can be shared
+ servicesExecutionContext <- ResourceOwner
+ .forExecutorService(() => Executors.newWorkStealingPool())
+ .map(ExecutionContext.fromExecutorService)
+ .acquire()
+
+ // Start N indexers that all compete for the same database
+ _ = logger.info(s"Starting $indexerCount indexers for database $jdbcUrl")
+ indexers <- Resource
+ .sequence(
+ (1 to indexerCount).toList
+ .map(i =>
+ for {
+ // Create a read service
+ readService <- ResourceOwner
+ .forCloseable(() =>
+ withEnrichedLoggingContext("name" -> s"ReadService$i") {
+ readServiceLoggingContext =>
+ EndlessReadService(updatesPerSecond, s"$i")(readServiceLoggingContext)
+ }
+ )
+ .acquire()
+ metricRegistry = new MetricRegistry
+ metrics = new Metrics(metricRegistry)
+ // Create an indexer and immediately start it
+ indexing <- new StandaloneIndexerServer(
+ readService = readService,
+ config = indexerConfig,
+ servicesExecutionContext = servicesExecutionContext,
+ metrics = metrics,
+ lfValueTranslationCache = LfValueTranslationCache.Cache.none,
+ ).acquire()
+ } yield ReadServiceAndIndexer(readService, indexing)
+ )
+ )
+ .map(xs => Indexers(xs.toList))
+ } yield indexers
+ }
+ }
+}
diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala
index 9e276d4240..ae758e65c4 100644
--- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala
+++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala
@@ -13,6 +13,7 @@ trait StorageBackendSuite
with StorageBackendTestsReset
with StorageBackendTestsPruning
with StorageBackendTestsDBLockForSuite
+ with StorageBackendTestsDebug
with StorageBackendTestsTimestamps {
this: AsyncFlatSpec =>
}
diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDebug.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDebug.scala
new file mode 100644
index 0000000000..cf8b075f64
--- /dev/null
+++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDebug.scala
@@ -0,0 +1,67 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.store.backend
+
+import org.scalatest.flatspec.AsyncFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+private[backend] trait StorageBackendTestsDebug extends Matchers with StorageBackendSpec {
+ this: AsyncFlatSpec =>
+
+ import StorageBackendTestValues._
+
+ behavior of "DebugStorageBackend"
+
+ it should "find duplicate event ids" in {
+ val updates = Vector(
+ dtoCreate(offset(7), 7L, "#7"),
+ dtoCreate(offset(7), 7L, "#7"), // duplicate id
+ )
+
+ for {
+ _ <- executeSql(backend.initializeParameters(someIdentityParams))
+ _ <- executeSql(ingest(updates, _))
+ _ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(7), 7L)))
+ failure <- executeSql(backend.verifyIntegrity()).failed
+ } yield {
+ // Error message should contain the duplicate event sequential id
+ failure.getMessage should include("7")
+ }
+ }
+
+ it should "find non-consecutive event ids" in {
+ val updates = Vector(
+ dtoCreate(offset(1), 1L, "#1"),
+ dtoCreate(offset(3), 3L, "#3"), // non-consecutive id
+ )
+
+ for {
+ _ <- executeSql(backend.initializeParameters(someIdentityParams))
+ _ <- executeSql(ingest(updates, _))
+ _ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 3L)))
+ failure <- executeSql(backend.verifyIntegrity()).failed
+ } yield {
+ failure.getMessage should include("consecutive")
+ }
+ }
+
+ it should "not find errors beyond the ledger end" in {
+ val updates = Vector(
+ dtoCreate(offset(1), 1L, "#1"),
+ dtoCreate(offset(2), 2L, "#2"),
+ dtoCreate(offset(7), 7L, "#7"), // beyond the ledger end
+ dtoCreate(offset(7), 7L, "#7"), // duplicate id (beyond ledger end)
+ dtoCreate(offset(9), 9L, "#9"), // non-consecutive id (beyond ledger end)
+ )
+
+ for {
+ _ <- executeSql(backend.initializeParameters(someIdentityParams))
+ _ <- executeSql(ingest(updates, _))
+ _ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 2L)))
+ _ <- executeSql(backend.verifyIntegrity())
+ } yield {
+ succeed
+ }
+ }
+}
diff --git a/ledger/participant-integration-api/src/test/resources/logback-test.xml b/ledger/participant-integration-api/src/test/resources/logback-test.xml
index 97cfcc36ab..041a958e05 100644
--- a/ledger/participant-integration-api/src/test/resources/logback-test.xml
+++ b/ledger/participant-integration-api/src/test/resources/logback-test.xml
@@ -39,8 +39,21 @@
com.daml.platform.indexer.JdbcIndexerSpec
+
+ com.daml.platform.indexer.ha.IndexerStabilitySpec
+
+
+
+
+
+
+
+
+
+
+
diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/IndexerStabilityH2Spec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/IndexerStabilityH2Spec.scala
new file mode 100644
index 0000000000..fc09ecad1f
--- /dev/null
+++ b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/IndexerStabilityH2Spec.scala
@@ -0,0 +1,10 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.indexer.ha
+
+final class IndexerStabilityH2Spec extends IndexerStabilitySpec {
+
+ override def jdbcUrl: String = "jdbc:h2:mem:indexer_stability_spec;db_close_delay=-1"
+ override def haModeSupported: Boolean = false
+}
diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/IndexerStabilityOracleSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/IndexerStabilityOracleSpec.scala
new file mode 100644
index 0000000000..e841ec1b2f
--- /dev/null
+++ b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/IndexerStabilityOracleSpec.scala
@@ -0,0 +1,13 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.indexer.ha
+
+import com.daml.testing.oracle.OracleAroundAll
+
+final class IndexerStabilityOracleSpec extends IndexerStabilitySpec with OracleAroundAll {
+
+ override def jdbcUrl: String =
+ s"jdbc:oracle:thin:$oracleUser/$oraclePwd@localhost:$oraclePort/ORCLPDB1"
+ override def haModeSupported: Boolean = true
+}
diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/IndexerStabilityPostgresSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/IndexerStabilityPostgresSpec.scala
new file mode 100644
index 0000000000..6388edf61c
--- /dev/null
+++ b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/IndexerStabilityPostgresSpec.scala
@@ -0,0 +1,12 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.indexer.ha
+
+import com.daml.testing.postgresql.PostgresAroundEach
+
+final class IndexerStabilityPostgresSpec extends IndexerStabilitySpec with PostgresAroundEach {
+
+ override def jdbcUrl: String = postgresDatabase.url
+ override def haModeSupported: Boolean = true
+}