Mirror of https://github.com/digital-asset/daml.git, synced 2024-09-20 01:07:18 +03:00
DPP-496 HA indexer integration test (#11033)
* Add indexer stability test (changelog_begin changelog_end)
* Add test for DebugStorageBackend
* Reduce the log output for IndexerStabilitySpec
* Add Oracle test
* Rename file
* Increase timeout
* Rename DebugStorageBackend
* Address review comments
* Fixup rename
* fixup address review comments
* Refactor IndexerStabilitySpec
* Reset can only be called on an aborted read service
* Non-HA test can fail for multiple reasons
* Fix logback for Oracle tests. Otherwise logback complains about missing classes at startup
* Fix Oracle jdbc url
* Improve assert message
* Improve log output. Otherwise the user has to wait for 10min to get useful log output about the connection issue
* Change default lock ids. The previous defaults are not valid for Oracle
* fmt
This commit is contained in:
parent 735c3090a3
commit 8290347f74
@@ -379,6 +379,7 @@ da_scala_test_suite(
         "//ledger/metrics:metrics-test-lib",
         "//ledger/participant-state",
         "//ledger/participant-state-index",
         "//ledger/test-common",
         "//libs-scala/contextualized-logging",
         "//libs-scala/logging-entries",
+        "//libs-scala/oracle-testing",
@@ -58,8 +58,8 @@ case class HaConfig(
     workerLockAcquireRetryMillis: Long = 500,
     workerLockAcquireMaxRetry: Long = 1000,
     mainLockCheckerPeriodMillis: Long = 1000,
-    indexerLockId: Int = 0x646d6c00, // note 0x646d6c equals ASCII encoded "dml"
-    indexerWorkerLockId: Int = 0x646d6c01,
+    indexerLockId: Int = 0x646d6c0, // note 0x646d6c equals ASCII encoded "dml"
+    indexerWorkerLockId: Int = 0x646d6c1,
     enable: Boolean = false, // TODO ha: remove as stable
 )
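For context (this check is an illustration, not part of the diff): Oracle's DBMS_LOCK accepts user lock ids only in the range 0..1073741823, and the old byte-shifted constants exceed that bound, which is why the defaults had to change.

object LockIdRangeCheck extends App {
  val oracleMaxLockId = 1073741823 // 2^30 - 1, the documented DBMS_LOCK maximum
  val oldIndexerLockId = 0x646d6c00 // = 1684892672, out of range for Oracle
  val newIndexerLockId = 0x646d6c0 // = 105305792, within range
  println(oldIndexerLockId > oracleMaxLockId) // true: the old default is invalid
  println(newIndexerLockId <= oracleMaxLockId) // true: the new default is valid
}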
@@ -48,7 +48,8 @@ trait StorageBackend[DB_BATCH]
     with ContractStorageBackend
     with EventStorageBackend
     with DataSourceStorageBackend
-    with DBLockStorageBackend {
+    with DBLockStorageBackend
+    with IntegrityStorageBackend {
 
   /** Truncates all storage backend tables, EXCEPT the packages table.
     * Does not touch other tables, like the Flyway history table.

@@ -342,6 +343,15 @@ object DBLockStorageBackend {
   }
 }
 
+trait IntegrityStorageBackend {
+
+  /** Verifies the integrity of the index database, throwing an exception if any issue is found.
+    * This operation is allowed to take some time to finish.
+    * It is not expected that it is used during regular index/indexer operation.
+    */
+  def verifyIntegrity()(connection: Connection): Unit
+}
+
 object StorageBackend {
   case class RawContractState(
       templateId: Option[String],
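A hypothetical caller of the new trait (the connection handling below is illustrative, not part of the commit) runs the check over a plain JDBC connection and treats any exception as a failed verification:

import java.sql.DriverManager

// Illustration only: acquire a connection, run the check, always release.
def checkIndexDb(backend: IntegrityStorageBackend, jdbcUrl: String): Unit = {
  val connection = DriverManager.getConnection(jdbcUrl)
  try backend.verifyIntegrity()(connection) // throws on any detected inconsistency
  finally connection.close()
}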
@@ -10,7 +10,7 @@ import javax.sql.DataSource
 
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.DurationInt
-import scala.util.Using
+import scala.util.{Failure, Using}
 
 /** Returns a DataSource that is guaranteed to be connected to a responsive, compatible database. */
 object VerifiedDataSource {

@@ -38,6 +38,8 @@ object VerifiedDataSource {
           storageBackend.checkDatabaseAvailable
         )
         createdDatasource
-      }
+      }.andThen { case Failure(exception) =>
+        logger.warn(exception.getMessage)
+      }
       }
       _ <- Future {
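The andThen hook logs each failed connection attempt as it happens, so the operator sees the problem immediately instead of only after the whole retry budget is spent (the "10min" point from the commit message). A minimal self-contained sketch of the pattern, with illustrative names:

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Failure

// The side effect fires on each failure without changing the Future's result.
def attemptWithLogging[A](attempt: () => A)(implicit ec: ExecutionContext): Future[A] =
  Future(attempt()).andThen { case Failure(exception) =>
    println(s"attempt failed: ${exception.getMessage}") // stand-in for logger.warn
  }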
@ -0,0 +1,76 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.platform.store.backend.common
|
||||
|
||||
import anorm.{RowParser, SQL}
|
||||
|
||||
import java.sql.Connection
|
||||
import anorm.SqlParser.long
|
||||
import anorm.~
|
||||
import com.daml.platform.store.backend.IntegrityStorageBackend
|
||||
|
||||
private[backend] trait IntegrityStorageBackendTemplate extends IntegrityStorageBackend {
|
||||
|
||||
private val allSequentialIds: String =
|
||||
s"""
|
||||
|SELECT event_sequential_id FROM participant_events_divulgence
|
||||
|UNION ALL
|
||||
|SELECT event_sequential_id FROM participant_events_create
|
||||
|UNION ALL
|
||||
|SELECT event_sequential_id FROM participant_events_consuming_exercise
|
||||
|UNION ALL
|
||||
|SELECT event_sequential_id FROM participant_events_non_consuming_exercise
|
||||
|""".stripMargin
|
||||
|
||||
private val SQL_EVENT_SEQUENTIAL_IDS_SUMMARY = SQL(s"""
|
||||
|WITH sequential_ids AS ($allSequentialIds)
|
||||
|SELECT min(event_sequential_id) as min, max(event_sequential_id) as max, count(event_sequential_id) as count
|
||||
|FROM sequential_ids, parameters
|
||||
|WHERE event_sequential_id <= parameters.ledger_end_sequential_id
|
||||
|""".stripMargin)
|
||||
|
||||
// Don't fetch an unbounded number of rows
|
||||
private val maxReportedDuplicates = 100
|
||||
|
||||
private val SQL_DUPLICATE_EVENT_SEQUENTIAL_IDS = SQL(s"""
|
||||
|WITH sequential_ids AS ($allSequentialIds)
|
||||
|SELECT event_sequential_id as id, count(*) as count
|
||||
|FROM sequential_ids, parameters
|
||||
|WHERE event_sequential_id <= parameters.ledger_end_sequential_id
|
||||
|GROUP BY event_sequential_id
|
||||
|HAVING count(*) > 1
|
||||
|FETCH NEXT $maxReportedDuplicates ROWS ONLY
|
||||
|""".stripMargin)
|
||||
|
||||
case class EventSequentialIdsRow(min: Long, max: Long, count: Long)
|
||||
|
||||
private val eventSequantialIdsParser: RowParser[EventSequentialIdsRow] =
|
||||
long("min") ~
|
||||
long("max") ~
|
||||
long("count") map { case min ~ max ~ count =>
|
||||
EventSequentialIdsRow(min, max, count)
|
||||
}
|
||||
|
||||
override def verifyIntegrity()(connection: Connection): Unit = {
|
||||
val duplicates = SQL_DUPLICATE_EVENT_SEQUENTIAL_IDS
|
||||
.as(long("id").*)(connection)
|
||||
val summary = SQL_EVENT_SEQUENTIAL_IDS_SUMMARY
|
||||
.as(eventSequantialIdsParser.single)(connection)
|
||||
|
||||
// Verify that there are no duplicate ids.
|
||||
if (duplicates.nonEmpty) {
|
||||
throw new RuntimeException(
|
||||
s"Found ${duplicates.length} duplicate event sequential ids. Examples: ${duplicates.mkString(", ")}"
|
||||
)
|
||||
}
|
||||
|
||||
// Verify that all ids are sequential (i.e., there are no "holes" in the ids).
|
||||
// Since we already know that there are not duplicates, it is enough to check that the count is consistent with the range.
|
||||
if (summary.count != summary.max - summary.min + 1) {
|
||||
throw new RuntimeException(
|
||||
s"Event sequential ids are not consecutive. Min=${summary.min}, max=${summary.max}, count=${summary.count}."
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
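The gap check rests on a small counting argument: once duplicates are ruled out, a set of ids is consecutive exactly when its size equals max - min + 1. A self-contained illustration (not part of the commit):

// Assumes no duplicates, which the first check establishes.
def isConsecutive(ids: Seq[Long]): Boolean =
  ids.isEmpty || ids.size.toLong == ids.max - ids.min + 1

assert(isConsecutive(Seq(3L, 4L, 5L, 6L))) // count 4 == 6 - 3 + 1
assert(!isConsecutive(Seq(1L, 3L)))        // count 2 != 3 - 1 + 1, a hole at 2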
@@ -5,7 +5,6 @@ package com.daml.platform.store.backend.h2
 
 import java.sql.Connection
 import java.time.Instant
 
 import anorm.{Row, SQL, SimpleSql}
 import anorm.SqlParser.get
 import com.daml.ledger.offset.Offset

@@ -20,6 +19,7 @@ import com.daml.platform.store.backend.common.{
   ConfigurationStorageBackendTemplate,
   ContractStorageBackendTemplate,
   DataSourceStorageBackendTemplate,
+  IntegrityStorageBackendTemplate,
   DeduplicationStorageBackendTemplate,
   EventStorageBackendTemplate,
   EventStrategy,

@@ -38,8 +38,8 @@ import com.daml.platform.store.backend.{
   StorageBackend,
   common,
 }
-import javax.sql.DataSource
 
+import javax.sql.DataSource
 import scala.util.control.NonFatal

@@ -53,7 +53,8 @@ private[backend] object H2StorageBackend
     with EventStorageBackendTemplate
     with ContractStorageBackendTemplate
     with CompletionStorageBackendTemplate
-    with PartyStorageBackendTemplate {
+    with PartyStorageBackendTemplate
+    with IntegrityStorageBackendTemplate {
 
   private val logger = ContextualizedLogger.get(this.getClass)
@@ -12,6 +12,7 @@ import com.daml.platform.store.backend.common.{
   ConfigurationStorageBackendTemplate,
   ContractStorageBackendTemplate,
   DataSourceStorageBackendTemplate,
+  IntegrityStorageBackendTemplate,
   DeduplicationStorageBackendTemplate,
   EventStorageBackendTemplate,
   EventStrategy,

@@ -30,15 +31,15 @@ import com.daml.platform.store.backend.{
   StorageBackend,
   common,
 }
 
 import java.sql.Connection
 import java.time.Instant
 
 import com.daml.ledger.offset.Offset
 import com.daml.platform.store.backend.EventStorageBackend.FilterParams
 import com.daml.logging.{ContextualizedLogger, LoggingContext}
 import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
-import javax.sql.DataSource
 
+import javax.sql.DataSource
 import scala.util.control.NonFatal

@@ -52,7 +53,8 @@ private[backend] object OracleStorageBackend
     with EventStorageBackendTemplate
     with ContractStorageBackendTemplate
     with CompletionStorageBackendTemplate
-    with PartyStorageBackendTemplate {
+    with PartyStorageBackendTemplate
+    with IntegrityStorageBackendTemplate {
 
   private val logger = ContextualizedLogger.get(this.getClass)

@@ -269,8 +271,8 @@ private[backend] object OracleStorageBackend
 
 case class OracleLockId(id: Int) extends DBLockStorageBackend.LockId {
   // respecting Oracle limitations: https://docs.oracle.com/cd/B19306_01/appdev.102/b14258/d_lock.htm#ARPLS021
-  assert(id >= 0)
-  assert(id <= 1073741823)
+  assert(id >= 0, s"Lock id $id is too small for Oracle")
+  assert(id <= 1073741823, s"Lock id $id is too large for Oracle")
 }
 
 private def oracleIntLockId(lockId: DBLockStorageBackend.LockId): Int =
@@ -5,7 +5,6 @@ package com.daml.platform.store.backend.postgresql
 
 import java.sql.Connection
 import java.time.Instant
 
 import anorm.SQL
 import anorm.SqlParser.{get, int}
 import com.daml.ledger.offset.Offset

@@ -21,6 +20,7 @@ import com.daml.platform.store.backend.common.{
   ConfigurationStorageBackendTemplate,
   ContractStorageBackendTemplate,
   DataSourceStorageBackendTemplate,
+  IntegrityStorageBackendTemplate,
   DeduplicationStorageBackendTemplate,
   EventStorageBackendTemplate,
   EventStrategy,

@@ -39,6 +39,7 @@ import com.daml.platform.store.backend.{
   StorageBackend,
   common,
 }
 
+import javax.sql.DataSource
 import org.postgresql.ds.PGSimpleDataSource

@@ -53,7 +54,8 @@ private[backend] object PostgresStorageBackend
     with EventStorageBackendTemplate
     with ContractStorageBackendTemplate
     with CompletionStorageBackendTemplate
-    with PartyStorageBackendTemplate {
+    with PartyStorageBackendTemplate
+    with IntegrityStorageBackendTemplate {
 
   private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
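All three backends gain the integrity check the same way: by stacking the shared template trait into the backend object. A stripped-down illustration of the pattern, with illustrative names only:

// The concrete backend objects inherit the shared implementation, so the
// SQL logic lives in one place instead of three.
trait IntegrityCheckLike { def verify(): String = "shared check" }
object H2Like extends IntegrityCheckLike
object PostgresLike extends IntegrityCheckLike
// Both H2Like.verify() and PostgresLike.verify() use the same code.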
@ -0,0 +1,225 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.platform.indexer.ha
|
||||
|
||||
import java.time.Instant
|
||||
import akka.NotUsed
|
||||
import akka.stream.{KillSwitches, SharedKillSwitch}
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.daml.daml_lf_dev.DamlLf
|
||||
import com.daml.ledger.api.health.HealthStatus
|
||||
import com.daml.lf.crypto
|
||||
import com.daml.ledger.configuration.{Configuration, LedgerId, LedgerInitialConditions}
|
||||
import com.daml.ledger.offset.Offset
|
||||
import com.daml.ledger.participant.state.v2.{CompletionInfo, ReadService, TransactionMeta, Update}
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.lf.data.Time.Timestamp
|
||||
import com.daml.lf.transaction.CommittedTransaction
|
||||
import com.daml.lf.transaction.test.TransactionBuilder
|
||||
import com.daml.lf.value.Value
|
||||
import com.daml.logging.{ContextualizedLogger, LoggingContext}
|
||||
import com.google.protobuf.ByteString
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import scala.concurrent.duration._
|
||||
|
||||
/** An infinite stream of state updates that fully conforms to the Daml ledger model.
|
||||
*
|
||||
* All instances of this class produce the same stream of state updates.
|
||||
*
|
||||
* @param updatesPerSecond The maximum number of updates per second produced.
|
||||
*/
|
||||
case class EndlessReadService(
|
||||
updatesPerSecond: Int,
|
||||
name: String,
|
||||
)(implicit loggingContext: LoggingContext)
|
||||
extends ReadService
|
||||
with AutoCloseable {
|
||||
import EndlessReadService._
|
||||
|
||||
private val logger = ContextualizedLogger.get(this.getClass)
|
||||
|
||||
override def currentHealth(): HealthStatus = synchronized {
|
||||
if (aborted) HealthStatus.unhealthy else HealthStatus.healthy
|
||||
}
|
||||
|
||||
override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = synchronized {
|
||||
logger.info("EndlessReadService.ledgerInitialConditions() called")
|
||||
initialConditionCalls.incrementAndGet()
|
||||
Source
|
||||
.single(LedgerInitialConditions(ledgerId, configuration, recordTime(0)))
|
||||
.via(killSwitch.flow)
|
||||
}
|
||||
|
||||
/** Produces the following stream of updates:
|
||||
* 1. a config change
|
||||
* 1. a party allocation
|
||||
* 1. a package upload
|
||||
* 1. a transaction that creates a contract
|
||||
* 1. a transaction that archives the previous contract
|
||||
*
|
||||
* The last two items above repeat indefinitely
|
||||
*/
|
||||
override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] =
|
||||
synchronized {
|
||||
logger.info(s"EndlessReadService.stateUpdates($beginAfter) called")
|
||||
stateUpdatesCalls.incrementAndGet()
|
||||
val startIndex: Int = beginAfter.map(index).getOrElse(1)
|
||||
Source
|
||||
.fromIterator(() => Iterator.from(startIndex))
|
||||
.throttle(updatesPerSecond, 1.second)
|
||||
.map {
|
||||
case i @ 1 =>
|
||||
offset(i) -> Update.ConfigurationChanged(
|
||||
recordTime(i),
|
||||
submissionId(i),
|
||||
participantId,
|
||||
configuration,
|
||||
)
|
||||
case i @ 2 =>
|
||||
offset(i) -> Update.PartyAddedToParticipant(
|
||||
party,
|
||||
"Operator",
|
||||
participantId,
|
||||
recordTime(i),
|
||||
Some(submissionId(i)),
|
||||
)
|
||||
case i @ 3 =>
|
||||
offset(i) -> Update.PublicPackageUpload(
|
||||
List(archive),
|
||||
Some("Package"),
|
||||
recordTime(i),
|
||||
Some(submissionId(i)),
|
||||
)
|
||||
case i if i % 2 == 0 =>
|
||||
offset(i) -> Update.TransactionAccepted(
|
||||
optCompletionInfo = Some(completionInfo(i)),
|
||||
transactionMeta = transactionMeta(i),
|
||||
transaction = createTransaction(i),
|
||||
transactionId = transactionId(i),
|
||||
recordTime = recordTime(i),
|
||||
divulgedContracts = List.empty,
|
||||
blindingInfo = None,
|
||||
)
|
||||
case i =>
|
||||
offset(i) -> Update.TransactionAccepted(
|
||||
optCompletionInfo = Some(completionInfo(i)),
|
||||
transactionMeta = transactionMeta(i),
|
||||
transaction = exerciseTransaction(i),
|
||||
transactionId = transactionId(i),
|
||||
recordTime = recordTime(i),
|
||||
divulgedContracts = List.empty,
|
||||
blindingInfo = None,
|
||||
)
|
||||
}
|
||||
.via(killSwitch.flow)
|
||||
}
|
||||
|
||||
def abort(cause: Throwable): Unit = synchronized {
|
||||
logger.info(s"EndlessReadService.abort() called")
|
||||
aborted = true
|
||||
killSwitch.abort(cause)
|
||||
}
|
||||
|
||||
def reset(): Unit = synchronized {
|
||||
assert(aborted)
|
||||
logger.info(s"EndlessReadService.reset() called")
|
||||
stateUpdatesCalls.set(0)
|
||||
initialConditionCalls.set(0)
|
||||
aborted = false
|
||||
killSwitch = KillSwitches.shared("EndlessReadService")
|
||||
}
|
||||
|
||||
override def close(): Unit = synchronized {
|
||||
logger.info(s"EndlessReadService.close() called")
|
||||
killSwitch.shutdown()
|
||||
}
|
||||
|
||||
val stateUpdatesCalls: AtomicInteger = new AtomicInteger(0)
|
||||
val initialConditionCalls: AtomicInteger = new AtomicInteger(0)
|
||||
var aborted: Boolean = false
|
||||
private var killSwitch: SharedKillSwitch = KillSwitches.shared("EndlessReadService")
|
||||
}
|
||||
|
||||
object EndlessReadService {
|
||||
val ledgerId: LedgerId = "EndlessReadService"
|
||||
val participantId: Ref.ParticipantId =
|
||||
Ref.ParticipantId.assertFromString("EndlessReadServiceParticipant")
|
||||
val configuration: Configuration = Configuration.reasonableInitialConfiguration
|
||||
val party: Ref.Party = Ref.Party.assertFromString("operator")
|
||||
val applicationId: Ref.ApplicationId = Ref.ApplicationId.assertFromString("Application")
|
||||
val workflowId: Ref.WorkflowId = Ref.WorkflowId.assertFromString("Workflow")
|
||||
val templateId: Ref.Identifier = Ref.Identifier.assertFromString("pkg:Mod:Template")
|
||||
val choiceName: Ref.Name = Ref.Name.assertFromString("SomeChoice")
|
||||
|
||||
private val archive = DamlLf.Archive.newBuilder
|
||||
.setHash("00001")
|
||||
.setHashFunction(DamlLf.HashFunction.SHA256)
|
||||
.setPayload(ByteString.copyFromUtf8("payload 1"))
|
||||
.build
|
||||
|
||||
// Note: all methods in this object MUST be fully deterministic
|
||||
def index(o: Offset): Int = Integer.parseInt(o.toHexString, 16)
|
||||
def offset(i: Int): Offset = Offset.fromHexString(Ref.HexString.assertFromString(f"$i%08d"))
|
||||
def submissionId(i: Int): Ref.SubmissionId = Ref.SubmissionId.assertFromString(f"sub$i%08d")
|
||||
def transactionId(i: Int): Ref.TransactionId = Ref.TransactionId.assertFromString(f"tx$i%08d")
|
||||
def commandId(i: Int): Ref.CommandId = Ref.CommandId.assertFromString(f"cmd$i%08d")
|
||||
def cid(i: Int): Value.ContractId = Value.ContractId.V0.assertFromString(s"#$i")
|
||||
def recordTime(i: Int): Timestamp =
|
||||
Timestamp.assertFromInstant(Instant.EPOCH.plusSeconds(i.toLong))
|
||||
def completionInfo(i: Int): CompletionInfo = CompletionInfo(
|
||||
actAs = List(party),
|
||||
applicationId = applicationId,
|
||||
commandId = commandId(i),
|
||||
optDeduplicationPeriod = None,
|
||||
submissionId = None,
|
||||
)
|
||||
def transactionMeta(i: Int): TransactionMeta = TransactionMeta(
|
||||
ledgerEffectiveTime = recordTime(i),
|
||||
workflowId = Some(workflowId),
|
||||
submissionTime = recordTime(i),
|
||||
submissionSeed = crypto.Hash.hashPrivateKey("EndlessReadService"),
|
||||
optUsedPackages = None,
|
||||
optNodeSeeds = None,
|
||||
optByKeyNodes = None,
|
||||
)
|
||||
// Creates contract #i
|
||||
private def createTransaction(i: Int): CommittedTransaction = {
|
||||
val builder = TransactionBuilder()
|
||||
val createNode = builder.create(
|
||||
id = cid(i),
|
||||
templateId = templateId,
|
||||
argument = Value.ValueUnit,
|
||||
signatories = Set(party),
|
||||
observers = Set.empty,
|
||||
key = None,
|
||||
)
|
||||
builder.add(createNode)
|
||||
builder.buildCommitted()
|
||||
}
|
||||
// Archives contract #(i-1)
|
||||
private def exerciseTransaction(i: Int): CommittedTransaction = {
|
||||
val builder = TransactionBuilder()
|
||||
val createNode = builder.create(
|
||||
id = cid(i - 1),
|
||||
templateId = templateId,
|
||||
argument = Value.ValueUnit,
|
||||
signatories = Set(party),
|
||||
observers = Set.empty,
|
||||
key = None,
|
||||
)
|
||||
val exerciseNode = builder.exercise(
|
||||
contract = createNode,
|
||||
choice = choiceName,
|
||||
consuming = true,
|
||||
actingParties = Set(party),
|
||||
argument = Value.ValueUnit,
|
||||
result = Some(Value.ValueUnit),
|
||||
choiceObservers = Set.empty,
|
||||
byKey = false,
|
||||
)
|
||||
builder.add(exerciseNode)
|
||||
builder.buildCommitted()
|
||||
}
|
||||
}
|
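For orientation, the fixed schedule encoded in stateUpdates can be summarized by a small function (illustrative names only, not part of the commit):

def updateKind(i: Int): String = i match {
  case 1 => "configuration change"
  case 2 => "party allocation"
  case 3 => "package upload"
  case n if n % 2 == 0 => s"transaction creating contract #$n"
  case n => s"transaction archiving contract #${n - 1}"
}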
@ -0,0 +1,159 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.platform.indexer.ha
|
||||
|
||||
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
|
||||
import com.daml.ledger.resources.ResourceContext
|
||||
import com.daml.logging.LoggingContext
|
||||
import com.daml.platform.store.DbType
|
||||
import com.daml.platform.store.backend.StorageBackend
|
||||
import org.scalatest.Assertion
|
||||
import org.scalatest.concurrent.Eventually
|
||||
import org.scalatest.flatspec.AsyncFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.time.{Millis, Seconds, Span}
|
||||
|
||||
import java.sql.Connection
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.Success
|
||||
|
||||
trait IndexerStabilitySpec
|
||||
extends AsyncFlatSpec
|
||||
with Matchers
|
||||
with AkkaBeforeAndAfterAll
|
||||
with Eventually {
|
||||
|
||||
import IndexerStabilitySpec._
|
||||
|
||||
// To be overriden by the spec implementation
|
||||
def jdbcUrl: String
|
||||
|
||||
def haModeSupported: Boolean
|
||||
|
||||
// The default EC is coming from AsyncTestSuite and is serial, do not use it
|
||||
implicit val ec: ExecutionContext = system.dispatcher
|
||||
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
|
||||
|
||||
behavior of "concurrently running indexers"
|
||||
|
||||
it should "correctly work in high availability mode" in {
|
||||
val updatesPerSecond = 10 // Number of updates per second produced by the read service
|
||||
val indexerCount = 8 // Number of concurrently running indexers
|
||||
val restartIterations = 4 // Number of times the indexer should restart
|
||||
|
||||
implicit val rc: ResourceContext = ResourceContext(ec)
|
||||
|
||||
info(s"Creating indexers fixture with $indexerCount indexers")
|
||||
IndexerStabilityTestFixture
|
||||
.owner(
|
||||
updatesPerSecond,
|
||||
indexerCount,
|
||||
jdbcUrl,
|
||||
materializer,
|
||||
)
|
||||
.use[Unit] { indexers =>
|
||||
val storageBackend = StorageBackend.of(DbType.jdbcType(jdbcUrl))
|
||||
val dataSource = storageBackend.createDataSource(jdbcUrl)
|
||||
val connection = dataSource.getConnection()
|
||||
|
||||
Iterator
|
||||
.iterate(IterationState())(previousState => {
|
||||
// Assert that there is exactly one indexer running
|
||||
val activeIndexer = findActiveIndexer(indexers)
|
||||
info(s"Indexer ${activeIndexer.readService.name} is running")
|
||||
|
||||
// Assert that state updates are being indexed
|
||||
assertLedgerEndHasMoved(storageBackend, connection)
|
||||
info("Ledger end has moved")
|
||||
|
||||
// At this point, the indexer that was aborted by the previous iteration can be reset,
|
||||
// in order to keep the pool of competing indexers full.
|
||||
previousState.abortedIndexer.foreach(idx => {
|
||||
idx.readService.reset()
|
||||
info(s"ReadService ${idx.readService.name} was reset")
|
||||
})
|
||||
|
||||
// Abort the indexer by terminating the ReadService stream
|
||||
activeIndexer.readService.abort(simulatedFailure())
|
||||
info(s"ReadService ${activeIndexer.readService.name} was aborted")
|
||||
|
||||
IterationState(Some(activeIndexer))
|
||||
})
|
||||
.take(restartIterations + 1)
|
||||
.foreach(_ => ())
|
||||
|
||||
// Stop all indexers, in order to stop all database operations
|
||||
indexers.indexers.foreach(_.readService.abort(simulatedFailure()))
|
||||
info(s"All ReadServices were aborted")
|
||||
|
||||
// Wait until all indexers stop using the database, otherwise the test will
|
||||
// fail while trying to drop the database at the end.
|
||||
// It can take some time until all indexers actually stop indexing after the
|
||||
// state update stream was aborted. It is difficult to observe this event,
|
||||
// as the only externally visible signal is the health status of the indexer,
|
||||
// which is only "unhealthy" while RecoveringIndexer is waiting to restart.
|
||||
// Instead, we just wait a short time.
|
||||
Thread.sleep(1000L)
|
||||
|
||||
// Verify the integrity of the index database
|
||||
storageBackend.verifyIntegrity()(connection)
|
||||
info(s"Integrity of the index database was checked")
|
||||
|
||||
connection.close()
|
||||
Future.successful(())
|
||||
}
|
||||
.map(_ => succeed)
|
||||
}.transform { result =>
|
||||
if (haModeSupported) {
|
||||
result
|
||||
} else {
|
||||
// If HA mode is not supported, the test must fail, but there are multiple reasons why it can fail.
|
||||
// E.g., duplicate parameter table initialization, or duplicate event sequential ids.
|
||||
assert(result.isFailure, "The test must fail if HA mode is not supported")
|
||||
Success(succeed)
|
||||
}
|
||||
}
|
||||
|
||||
// Finds the first non-aborted indexer that has subscribed to the ReadService stream
|
||||
private def findActiveIndexer(indexers: Indexers): ReadServiceAndIndexer = {
|
||||
// It takes some time until a new indexer takes over after a failure.
|
||||
// The default ScalaTest timeout for eventually() is too short for this.
|
||||
implicit val patienceConfig: PatienceConfig = PatienceConfig(
|
||||
timeout = scaled(Span(10, Seconds)),
|
||||
interval = scaled(Span(100, Millis)),
|
||||
)
|
||||
eventually {
|
||||
indexers.runningIndexers.headOption.getOrElse(
|
||||
throw new RuntimeException("No indexer running")
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Asserts that the ledger end has moved at least the specified number of events within a short time
|
||||
private def assertLedgerEndHasMoved(
|
||||
storageBackend: StorageBackend[_],
|
||||
connection: Connection,
|
||||
)(implicit pos: org.scalactic.source.Position): Assertion = {
|
||||
implicit val patienceConfig: PatienceConfig = PatienceConfig(
|
||||
timeout = scaled(Span(10, Seconds)),
|
||||
interval = scaled(Span(100, Millis)),
|
||||
)
|
||||
// Note: we don't know exactly at which ledger end the current indexer has started.
|
||||
// We only observe that the ledger end is moving right now.
|
||||
val initialLedgerEnd = storageBackend.ledgerEndOrBeforeBegin(connection)
|
||||
val minEvents = 2L
|
||||
eventually {
|
||||
val ledgerEnd = storageBackend.ledgerEndOrBeforeBegin(connection)
|
||||
assert(ledgerEnd.lastEventSeqId > initialLedgerEnd.lastEventSeqId + minEvents)
|
||||
}
|
||||
}
|
||||
|
||||
private def simulatedFailure() = new RuntimeException("Simulated failure")
|
||||
}
|
||||
|
||||
object IndexerStabilitySpec {
|
||||
case class IterationState(
|
||||
abortedIndexer: Option[ReadServiceAndIndexer] = None
|
||||
)
|
||||
}
|
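The restart loop is driven by Iterator.iterate, which threads the previous iteration's state (the indexer that was just aborted) into the next one. A minimal standalone illustration of the pattern:

// Each element is computed from the previous state; take() bounds the loop.
case class Round(n: Int)
val rounds = Iterator.iterate(Round(0))(r => Round(r.n + 1)).take(3).toList
// rounds == List(Round(0), Round(1), Round(2))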
@ -0,0 +1,107 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.platform.indexer.ha
|
||||
|
||||
import akka.stream.Materializer
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.api.health.ReportsHealth
|
||||
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
|
||||
import com.daml.logging.ContextualizedLogger
|
||||
import com.daml.logging.LoggingContext.{newLoggingContext, withEnrichedLoggingContext}
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.platform.indexer.{IndexerConfig, IndexerStartupMode, StandaloneIndexerServer}
|
||||
import com.daml.platform.store.LfValueTranslationCache
|
||||
|
||||
import java.util.concurrent.Executors
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
/** Stores a running indexer and the read service the indexer is reading from.
|
||||
* The read service is used exclusively by this indexer.
|
||||
*/
|
||||
case class ReadServiceAndIndexer(
|
||||
readService: EndlessReadService,
|
||||
indexing: ReportsHealth,
|
||||
)
|
||||
|
||||
case class Indexers(indexers: List[ReadServiceAndIndexer]) {
|
||||
// The list of all indexers that are running (determined by whether they have subscribed to the read service)
|
||||
def runningIndexers: List[ReadServiceAndIndexer] =
|
||||
indexers.filter(x => x.readService.stateUpdatesCalls.get() > 0 && !x.readService.aborted)
|
||||
def resetAll(): Unit = indexers.foreach(_.readService.reset())
|
||||
}
|
||||
|
||||
object IndexerStabilityTestFixture {
|
||||
|
||||
private val logger = ContextualizedLogger.get(this.getClass)
|
||||
|
||||
def owner(
|
||||
updatesPerSecond: Int,
|
||||
indexerCount: Int,
|
||||
jdbcUrl: String,
|
||||
materializer: Materializer,
|
||||
): ResourceOwner[Indexers] = new ResourceOwner[Indexers] {
|
||||
override def acquire()(implicit context: ResourceContext): Resource[Indexers] = {
|
||||
createIndexers(
|
||||
updatesPerSecond = updatesPerSecond,
|
||||
indexerCount = indexerCount,
|
||||
jdbcUrl = jdbcUrl,
|
||||
)(context, materializer)
|
||||
}
|
||||
}
|
||||
|
||||
private def createIndexers(
|
||||
updatesPerSecond: Int,
|
||||
indexerCount: Int,
|
||||
jdbcUrl: String,
|
||||
)(implicit resourceContext: ResourceContext, materializer: Materializer): Resource[Indexers] = {
|
||||
val indexerConfig = IndexerConfig(
|
||||
participantId = EndlessReadService.participantId,
|
||||
jdbcUrl = jdbcUrl,
|
||||
startupMode = IndexerStartupMode.MigrateAndStart,
|
||||
enableAppendOnlySchema = true,
|
||||
haConfig = HaConfig(enable = true),
|
||||
)
|
||||
|
||||
newLoggingContext { implicit loggingContext =>
|
||||
for {
|
||||
// This execution context is not used for indexing in the append-only schema, it can be shared
|
||||
servicesExecutionContext <- ResourceOwner
|
||||
.forExecutorService(() => Executors.newWorkStealingPool())
|
||||
.map(ExecutionContext.fromExecutorService)
|
||||
.acquire()
|
||||
|
||||
// Start N indexers that all compete for the same database
|
||||
_ = logger.info(s"Starting $indexerCount indexers for database $jdbcUrl")
|
||||
indexers <- Resource
|
||||
.sequence(
|
||||
(1 to indexerCount).toList
|
||||
.map(i =>
|
||||
for {
|
||||
// Create a read service
|
||||
readService <- ResourceOwner
|
||||
.forCloseable(() =>
|
||||
withEnrichedLoggingContext("name" -> s"ReadService$i") {
|
||||
readServiceLoggingContext =>
|
||||
EndlessReadService(updatesPerSecond, s"$i")(readServiceLoggingContext)
|
||||
}
|
||||
)
|
||||
.acquire()
|
||||
metricRegistry = new MetricRegistry
|
||||
metrics = new Metrics(metricRegistry)
|
||||
// Create an indexer and immediately start it
|
||||
indexing <- new StandaloneIndexerServer(
|
||||
readService = readService,
|
||||
config = indexerConfig,
|
||||
servicesExecutionContext = servicesExecutionContext,
|
||||
metrics = metrics,
|
||||
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
|
||||
).acquire()
|
||||
} yield ReadServiceAndIndexer(readService, indexing)
|
||||
)
|
||||
)
|
||||
.map(xs => Indexers(xs.toList))
|
||||
} yield indexers
|
||||
}
|
||||
}
|
||||
}
|
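A consumer acquires the whole fleet through this ResourceOwner and gets automatic teardown when use completes, exactly as IndexerStabilitySpec does. In outline (rc, jdbcUrl and materializer are assumed to come from the surrounding test environment):

implicit val rc: ResourceContext = ResourceContext(ec) // as in the spec above
IndexerStabilityTestFixture
  .owner(updatesPerSecond = 10, indexerCount = 8, jdbcUrl, materializer)
  .use[Unit] { indexers =>
    // interact with indexers.runningIndexers while all resources are held
    Future.successful(())
  }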
@@ -13,6 +13,7 @@ trait StorageBackendSuite
     with StorageBackendTestsReset
     with StorageBackendTestsPruning
     with StorageBackendTestsDBLockForSuite
+    with StorageBackendTestsDebug
     with StorageBackendTestsTimestamps {
   this: AsyncFlatSpec =>
 }
@ -0,0 +1,67 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.platform.store.backend
|
||||
|
||||
import org.scalatest.flatspec.AsyncFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
|
||||
private[backend] trait StorageBackendTestsDebug extends Matchers with StorageBackendSpec {
|
||||
this: AsyncFlatSpec =>
|
||||
|
||||
import StorageBackendTestValues._
|
||||
|
||||
behavior of "DebugStorageBackend"
|
||||
|
||||
it should "find duplicate event ids" in {
|
||||
val updates = Vector(
|
||||
dtoCreate(offset(7), 7L, "#7"),
|
||||
dtoCreate(offset(7), 7L, "#7"), // duplicate id
|
||||
)
|
||||
|
||||
for {
|
||||
_ <- executeSql(backend.initializeParameters(someIdentityParams))
|
||||
_ <- executeSql(ingest(updates, _))
|
||||
_ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(7), 7L)))
|
||||
failure <- executeSql(backend.verifyIntegrity()).failed
|
||||
} yield {
|
||||
// Error message should contain the duplicate event sequential id
|
||||
failure.getMessage should include("7")
|
||||
}
|
||||
}
|
||||
|
||||
it should "find non-consecutive event ids" in {
|
||||
val updates = Vector(
|
||||
dtoCreate(offset(1), 1L, "#1"),
|
||||
dtoCreate(offset(3), 3L, "#3"), // non-consecutive id
|
||||
)
|
||||
|
||||
for {
|
||||
_ <- executeSql(backend.initializeParameters(someIdentityParams))
|
||||
_ <- executeSql(ingest(updates, _))
|
||||
_ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 3L)))
|
||||
failure <- executeSql(backend.verifyIntegrity()).failed
|
||||
} yield {
|
||||
failure.getMessage should include("consecutive")
|
||||
}
|
||||
}
|
||||
|
||||
it should "not find errors beyond the ledger end" in {
|
||||
val updates = Vector(
|
||||
dtoCreate(offset(1), 1L, "#1"),
|
||||
dtoCreate(offset(2), 2L, "#2"),
|
||||
dtoCreate(offset(7), 7L, "#7"), // beyond the ledger end
|
||||
dtoCreate(offset(7), 7L, "#7"), // duplicate id (beyond ledger end)
|
||||
dtoCreate(offset(9), 9L, "#9"), // non-consecutive id (beyond ledger end)
|
||||
)
|
||||
|
||||
for {
|
||||
_ <- executeSql(backend.initializeParameters(someIdentityParams))
|
||||
_ <- executeSql(ingest(updates, _))
|
||||
_ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 2L)))
|
||||
_ <- executeSql(backend.verifyIntegrity())
|
||||
} yield {
|
||||
succeed
|
||||
}
|
||||
}
|
||||
}
|
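The last test pins down the bounding rule: rows with an event_sequential_id beyond the recorded ledger end are invisible to the check. Restated in plain Scala (illustration only):

val ledgerEnd = 2L
val ingestedIds = Seq(1L, 2L, 7L, 7L, 9L)
val checkedIds = ingestedIds.filter(_ <= ledgerEnd) // Seq(1, 2)
// The duplicate 7s and the gap before 9 lie beyond the ledger end,
// so verifyIntegrity() reports no error.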
@ -39,8 +39,21 @@
|
||||
<test>com.daml.platform.indexer.JdbcIndexerSpec</test>
|
||||
</appender>
|
||||
|
||||
<appender name="IndexerStabilitySpecAppender" class="com.daml.platform.testing.LogCollector">
|
||||
<test>com.daml.platform.indexer.ha.IndexerStabilitySpec</test>
|
||||
</appender>
|
||||
|
||||
<logger name="com.daml.platform.store.dao.HikariConnection" level="INFO">
|
||||
<appender-ref ref="JdbcIndexerSpecAppender"/>
|
||||
<appender-ref ref="IndexerStabilitySpecAppender"/>
|
||||
</logger>
|
||||
|
||||
<logger name="org.flywaydb" level="INFO">
|
||||
<appender-ref ref="IndexerStabilitySpecAppender"/>
|
||||
</logger>
|
||||
|
||||
<logger name="com.zaxxer.hikari" level="INFO">
|
||||
<appender-ref ref="IndexerStabilitySpecAppender"/>
|
||||
</logger>
|
||||
|
||||
<root level="DEBUG">
|
||||
|
@@ -0,0 +1,10 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.indexer.ha

final class IndexerStabilityH2Spec extends IndexerStabilitySpec {

  override def jdbcUrl: String = "jdbc:h2:mem:indexer_stability_spec;db_close_delay=-1"
  override def haModeSupported: Boolean = false
}
@@ -0,0 +1,13 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.indexer.ha

import com.daml.testing.oracle.OracleAroundAll

final class IndexerStabilityOracleSpec extends IndexerStabilitySpec with OracleAroundAll {

  override def jdbcUrl: String =
    s"jdbc:oracle:thin:$oracleUser/$oraclePwd@localhost:$oraclePort/ORCLPDB1"
  override def haModeSupported: Boolean = true
}
@@ -0,0 +1,12 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.indexer.ha

import com.daml.testing.postgresql.PostgresAroundEach

final class IndexerStabilityPostgresSpec extends IndexerStabilitySpec with PostgresAroundEach {

  override def jdbcUrl: String = postgresDatabase.url
  override def haModeSupported: Boolean = true
}