From 308f938512b2ed9698e61924de1f422310ddc430 Mon Sep 17 00:00:00 2001 From: Marton Nagy Date: Thu, 16 Sep 2021 17:08:45 +0200 Subject: [PATCH] Dpp 494 unit testing ha coordinator (#10862) * Unit testing: HA Coordinator * Preparation: switch to fix-thread-pool in AkkaBeforeAndAfterAll to have more stable test runtimes CHANGELOG_BEGIN CHANGELOG_END * Preparation: switch to timer from akka-pattern and Scheduler in waiting-futures, for higher precision timing CHANGELOG_BEGIN CHANGELOG_END * Preparation: prevent race in PollingChecker (corner-case uncovered via sustained testing) CHANGELOG_BEGIN CHANGELOG_END * Preparation - HaCoordinator refactorings: switch from DataSource to connection factory functions to clean dependencies, fixing some typos CHANGELOG_BEGIN CHANGELOG_END * Add test infrastructure for locking CHANGELOG_BEGIN CHANGELOG_END * Reuse StorageBackendTestsDBLock to test TestDBLockStorageBackend CHANGELOG_BEGIN CHANGELOG_END * Add unit test suite for HaCoordinator CHANGELOG_BEGIN CHANGELOG_END * Fix random generation to respect scala 2.12 CHANGELOG_BEGIN CHANGELOG_END * Adds test cases for graceful-shutdown during initialization CHANGELOG_BEGIN CHANGELOG_END * Minor changes based on review CHANGELOG_BEGIN CHANGELOG_END * Some comment rewording based on review CHANGELOG_BEGIN CHANGELOG_END --- .../testing/utils/AkkaBeforeAndAfterAll.scala | 5 +- .../platform/indexer/ha/HaCoordinator.scala | 25 +- .../platform/indexer/ha/PollingChecker.scala | 10 +- .../indexer/ha/PreemptableSequence.scala | 18 +- .../parallel/ParallelIndexerFactory.scala | 42 +- .../indexer/ha/TestDBLockStorageBackend.scala | 226 +++++ .../store/backend/StorageBackendSuite.scala | 5 +- .../backend/StorageBackendTestsDBLock.scala | 139 +-- .../indexer/ha/HaCoordinatorSpec.scala | 856 ++++++++++++++++++ .../ha/TestDBLockStorageBackendSpec.scala | 26 + 10 files changed, 1244 insertions(+), 108 deletions(-) create mode 100644 ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/TestDBLockStorageBackend.scala create mode 100644 ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/HaCoordinatorSpec.scala create mode 100644 ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/TestDBLockStorageBackendSpec.scala diff --git a/ledger-api/testing-utils/src/main/scala/com/digitalasset/ledger/api/testing/utils/AkkaBeforeAndAfterAll.scala b/ledger-api/testing-utils/src/main/scala/com/digitalasset/ledger/api/testing/utils/AkkaBeforeAndAfterAll.scala index 227d50515f1..5ee2d30750f 100644 --- a/ledger-api/testing-utils/src/main/scala/com/digitalasset/ledger/api/testing/utils/AkkaBeforeAndAfterAll.scala +++ b/ledger-api/testing-utils/src/main/scala/com/digitalasset/ledger/api/testing/utils/AkkaBeforeAndAfterAll.scala @@ -23,14 +23,15 @@ trait AkkaBeforeAndAfterAll extends BeforeAndAfterAll { private implicit lazy val executionContext: ExecutionContext = ExecutionContext.fromExecutorService( - Executors.newCachedThreadPool( + Executors.newFixedThreadPool( + 16, new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat(s"$actorSystemName-thread-pool-worker-%d") .setUncaughtExceptionHandler((thread, e) => logger.error(s"got an uncaught exception on thread: ${thread.getName}", e) ) - .build() + .build(), ) ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/HaCoordinator.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/HaCoordinator.scala index 6fdc20822e6..a52518b85ef 100644 --- 
a/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/HaCoordinator.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/HaCoordinator.scala @@ -4,13 +4,12 @@ package com.daml.platform.indexer.ha import java.sql.Connection +import java.util.Timer -import akka.actor.Scheduler import akka.stream.KillSwitch import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.platform.store.backend.DBLockStorageBackend.{Lock, LockId, LockMode} import com.daml.platform.store.backend.DBLockStorageBackend -import javax.sql.DataSource import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future} @@ -55,9 +54,9 @@ trait HaCoordinator { } case class HaConfig( - mainLockAquireRetryMillis: Long = 500, - workerLockAquireRetryMillis: Long = 500, - workerLockAquireMaxRetry: Long = 1000, + mainLockAcquireRetryMillis: Long = 500, + workerLockAcquireRetryMillis: Long = 500, + workerLockAcquireMaxRetry: Long = 1000, mainLockCheckerPeriodMillis: Long = 1000, indexerLockId: Int = 0x646d6c00, // note 0x646d6c equals ASCII encoded "dml" indexerWorkerLockId: Int = 0x646d6c01, @@ -75,22 +74,22 @@ object HaCoordinator { * - provides a ConnectionInitializer function which is mandatory to execute on all worker connections during execution * - will spawn a polling-daemon to observe continuous presence of the main lock * - * @param dataSource to spawn the main connection which keeps the Indexer Main Lock + * @param connectionFactory to spawn the main connection which keeps the Indexer Main Lock * @param storageBackend is the database-independent abstraction of session/connection level database locking * @param executionContext which is use to execute initialisation, will do blocking/IO work, so dedicated execution context is recommended */ def databaseLockBasedHaCoordinator( - dataSource: DataSource, + connectionFactory: () => Connection, storageBackend: DBLockStorageBackend, executionContext: ExecutionContext, - scheduler: Scheduler, + timer: Timer, haConfig: HaConfig, )(implicit loggingContext: LoggingContext): HaCoordinator = { implicit val ec: ExecutionContext = executionContext val indexerLockId = storageBackend.lock(haConfig.indexerLockId) val indexerWorkerLockId = storageBackend.lock(haConfig.indexerWorkerLockId) - val preemptableSequence = PreemptableSequence(scheduler) + val preemptableSequence = PreemptableSequence(timer) new HaCoordinator { override def protectedExecution( @@ -114,18 +113,18 @@ object HaCoordinator { import sequenceHelper._ logger.info("Starting databaseLockBasedHaCoordinator") for { - mainConnection <- go[Connection](dataSource.getConnection) + mainConnection <- go[Connection](connectionFactory()) _ = logger.info("Step 1: creating main-connection - DONE") _ = registerRelease { logger.info("Releasing main connection...") mainConnection.close() logger.info("Released main connection") } - _ <- retry(haConfig.mainLockAquireRetryMillis)(acquireMainLock(mainConnection)) + _ <- retry(haConfig.mainLockAcquireRetryMillis)(acquireMainLock(mainConnection)) _ = logger.info("Step 2: acquire exclusive Indexer Main Lock on main-connection - DONE") exclusiveWorkerLock <- retry[Lock]( - haConfig.workerLockAquireRetryMillis, - haConfig.workerLockAquireMaxRetry, + haConfig.workerLockAcquireRetryMillis, + haConfig.workerLockAcquireMaxRetry, )( acquireLock(mainConnection, indexerWorkerLockId, LockMode.Exclusive) ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PollingChecker.scala 
b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PollingChecker.scala index 8294a7dcff9..a545a272a92 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PollingChecker.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PollingChecker.scala @@ -51,6 +51,9 @@ class PollingChecker( def check(): Unit = synchronized { logger.debug(s"Checking...") Try(checkBody) match { + case _ if closed => + throw new Exception("Checker is already closed") + case Success(_) => logger.debug(s"Check successful.") @@ -61,5 +64,10 @@ class PollingChecker( } } - def close(): Unit = timer.cancel() + private var closed: Boolean = false + + def close(): Unit = synchronized { + closed = true + timer.cancel() + } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PreemptableSequence.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PreemptableSequence.scala index 9759795369b..43e8d50e282 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PreemptableSequence.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PreemptableSequence.scala @@ -3,10 +3,10 @@ package com.daml.platform.indexer.ha -import akka.actor.Scheduler +import java.util.{Timer, TimerTask} + import com.daml.logging.{ContextualizedLogger, LoggingContext} -import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success} @@ -88,7 +88,7 @@ object PreemptableSequence { * - and encapsulate synchronous work in futures (this could be possibly blocking) * Because of the possible blocking nature a dedicated pool is recommended. */ - def apply(scheduler: Scheduler)(implicit + def apply(timer: Timer)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext, ): PreemptableSequence = { sequence => @@ -98,8 +98,16 @@ object PreemptableSequence { var releaseStack: List[() => Future[Unit]] = Nil val helper: SequenceHelper = new SequenceHelper { - private def waitFor(delayMillis: Long): Future[Unit] = - goF(akka.pattern.after(FiniteDuration(delayMillis, "millis"), scheduler)(Future.unit)) + private def waitFor(delayMillis: Long): Future[Unit] = { + val p = Promise[Unit]() + timer.schedule( + new TimerTask { + override def run(): Unit = p.success(()) + }, + delayMillis, + ) + goF(p.future) + } override def registerRelease(release: => Unit): Unit = synchronized { logger.info(s"Registered release function") diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala index 6dc552c2884..aed9c96adbe 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala @@ -3,6 +3,7 @@ package com.daml.platform.indexer.parallel +import java.util.Timer import java.util.concurrent.Executors import akka.stream.{KillSwitch, Materializer} @@ -52,28 +53,29 @@ object ParallelIndexerFactory { Some(metrics.daml.parallelIndexer.batching.executor -> metrics.registry), ) haCoordinator <- - if (storageBackend.dbLockSupported && haConfig.enable) - ResourceOwner - .forExecutorService(() => - ExecutionContext.fromExecutorService( - Executors.newFixedThreadPool( - 1, - new 
ThreadFactoryBuilder().setNameFormat(s"ha-coordinator-%d").build, + if (storageBackend.dbLockSupported && haConfig.enable) { + for { + executionContext <- ResourceOwner + .forExecutorService(() => + ExecutionContext.fromExecutorService( + Executors.newFixedThreadPool( + 1, + new ThreadFactoryBuilder().setNameFormat(s"ha-coordinator-%d").build, + ) ) ) - ) - .map( - HaCoordinator.databaseLockBasedHaCoordinator( - // this DataSource will be used to spawn the main connection where we keep the Indexer Main Lock - // The life-cycle of such connections matches the life-cycle of a protectedExecution - dataSource = storageBackend.createDataSource(jdbcUrl), - storageBackend = storageBackend, - _, - scheduler = mat.system.scheduler, - haConfig = haConfig, - ) - ) - else + timer <- ResourceOwner.forTimer(() => new Timer) + // this DataSource will be used to spawn the main connection where we keep the Indexer Main Lock + // The life-cycle of such connections matches the life-cycle of a protectedExecution + dataSource = storageBackend.createDataSource(jdbcUrl) + } yield HaCoordinator.databaseLockBasedHaCoordinator( + connectionFactory = () => dataSource.getConnection, + storageBackend = storageBackend, + executionContext = executionContext, + timer = timer, + haConfig = haConfig, + ) + } else ResourceOwner.successful(NoopHaCoordinator) } yield toIndexer { implicit resourceContext => implicit val ec: ExecutionContext = resourceContext.executionContext diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/TestDBLockStorageBackend.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/TestDBLockStorageBackend.scala new file mode 100644 index 00000000000..ccea2804cb7 --- /dev/null +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/TestDBLockStorageBackend.scala @@ -0,0 +1,226 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.indexer.ha + +import java.sql.{ + Blob, + CallableStatement, + Clob, + Connection, + DatabaseMetaData, + NClob, + PreparedStatement, + SQLWarning, + SQLXML, + Savepoint, + Statement, + Struct, +} +import java.util.Properties +import java.util.concurrent.Executor +import java.{sql, util} + +import com.daml.platform.store.backend.DBLockStorageBackend + +class TestDBLockStorageBackend extends DBLockStorageBackend { + + private var locks: Map[DBLockStorageBackend.Lock, Set[Connection]] = Map.empty + + override def tryAcquire( + lockId: DBLockStorageBackend.LockId, + lockMode: DBLockStorageBackend.LockMode, + )(connection: Connection): Option[DBLockStorageBackend.Lock] = synchronized { + if (connection.isClosed) throw new Exception("trying to acquire on a closed connection") + if (!lockId.isInstanceOf[TestLockId]) throw new Exception("foreign lockId") + removeClosedConnectionRelatedLocks() + val lock = DBLockStorageBackend.Lock(lockId, lockMode) + def doLock(): Option[DBLockStorageBackend.Lock] = { + locks = locks + (lock -> (locks.getOrElse(lock, Set.empty) + connection)) + Some(lock) + } + lockMode match { + case DBLockStorageBackend.LockMode.Exclusive => + ( + locks.get(lock), + locks.get(lock.copy(lockMode = DBLockStorageBackend.LockMode.Shared)), + ) match { + case (None, None) => doLock() + case (Some(connections), None) if connections == Set(connection) => doLock() + case _ => None // if any shared lock held, we cannot lock exclusively + } + case DBLockStorageBackend.LockMode.Shared => + ( + locks.get(lock), + locks.get(lock.copy(lockMode = DBLockStorageBackend.LockMode.Exclusive)), + ) match { + case (_, None) => doLock() + case _ => None // if any exclusive lock held, we cannot lock shared + } + } + } + + override def release(lock: DBLockStorageBackend.Lock)(connection: Connection): Boolean = + synchronized { + if (connection.isClosed) throw new Exception("trying to release on a closed connection") + if (!lock.lockId.isInstanceOf[TestLockId]) throw new Exception("foreign lockId") + removeClosedConnectionRelatedLocks() + locks.get(lock) match { + case None => false + case Some(connections) if connections contains connection => + if (connections.size == 1) locks = locks - lock + else locks = locks + (lock -> (connections - connection)) + true + case _ => false + } + } + + private def removeClosedConnectionRelatedLocks(): Unit = + locks = locks.collect { + case (lock, conns) if conns.exists(!_.isClosed) => (lock, conns.filter(!_.isClosed)) + } + + override def lock(id: Int): DBLockStorageBackend.LockId = + TestLockId(id) + + def cutExclusiveLockHoldingConnection(lockId: Int): Unit = synchronized { + val mainExclusiveIndexerLock = + DBLockStorageBackend.Lock(TestLockId(lockId), DBLockStorageBackend.LockMode.Exclusive) + locks + .getOrElse(mainExclusiveIndexerLock, Set.empty) + .foreach(_.close()) + locks = locks - mainExclusiveIndexerLock + } + + override def dbLockSupported: Boolean = true +} + +case class TestLockId(id: Int) extends DBLockStorageBackend.LockId + +class TestConnection extends Connection { + override def createStatement(): Statement = throw new UnsupportedOperationException + + override def prepareStatement(s: String): PreparedStatement = + throw new UnsupportedOperationException + + override def prepareCall(s: String): CallableStatement = throw new UnsupportedOperationException + + override def nativeSQL(s: String): String = throw new UnsupportedOperationException + + override def setAutoCommit(b: 
Boolean): Unit = throw new UnsupportedOperationException + + override def getAutoCommit: Boolean = throw new UnsupportedOperationException + + override def commit(): Unit = throw new UnsupportedOperationException + + override def rollback(): Unit = throw new UnsupportedOperationException + + private var _closed: Boolean = false + + override def close(): Unit = synchronized { + _closed = true + } + + override def isClosed: Boolean = synchronized(_closed) + + override def getMetaData: DatabaseMetaData = throw new UnsupportedOperationException + + override def setReadOnly(b: Boolean): Unit = throw new UnsupportedOperationException + + override def isReadOnly: Boolean = throw new UnsupportedOperationException + + override def setCatalog(s: String): Unit = throw new UnsupportedOperationException + + override def getCatalog: String = throw new UnsupportedOperationException + + override def setTransactionIsolation(i: Int): Unit = throw new UnsupportedOperationException + + override def getTransactionIsolation: Int = throw new UnsupportedOperationException + + override def getWarnings: SQLWarning = throw new UnsupportedOperationException + + override def clearWarnings(): Unit = throw new UnsupportedOperationException + + override def createStatement(i: Int, i1: Int): Statement = throw new UnsupportedOperationException + + override def prepareStatement(s: String, i: Int, i1: Int): PreparedStatement = + throw new UnsupportedOperationException + + override def prepareCall(s: String, i: Int, i1: Int): CallableStatement = + throw new UnsupportedOperationException + + override def getTypeMap: util.Map[String, Class[_]] = throw new UnsupportedOperationException + + override def setTypeMap(map: util.Map[String, Class[_]]): Unit = + throw new UnsupportedOperationException + + override def setHoldability(i: Int): Unit = throw new UnsupportedOperationException + + override def getHoldability: Int = throw new UnsupportedOperationException + + override def setSavepoint(): Savepoint = throw new UnsupportedOperationException + + override def setSavepoint(s: String): Savepoint = throw new UnsupportedOperationException + + override def rollback(savepoint: Savepoint): Unit = throw new UnsupportedOperationException + + override def releaseSavepoint(savepoint: Savepoint): Unit = + throw new UnsupportedOperationException + + override def createStatement(i: Int, i1: Int, i2: Int): Statement = + throw new UnsupportedOperationException + + override def prepareStatement(s: String, i: Int, i1: Int, i2: Int): PreparedStatement = + throw new UnsupportedOperationException + + override def prepareCall(s: String, i: Int, i1: Int, i2: Int): CallableStatement = + throw new UnsupportedOperationException + + override def prepareStatement(s: String, i: Int): PreparedStatement = + throw new UnsupportedOperationException + + override def prepareStatement(s: String, ints: Array[Int]): PreparedStatement = + throw new UnsupportedOperationException + + override def prepareStatement(s: String, strings: Array[String]): PreparedStatement = + throw new UnsupportedOperationException + + override def createClob(): Clob = throw new UnsupportedOperationException + + override def createBlob(): Blob = throw new UnsupportedOperationException + + override def createNClob(): NClob = throw new UnsupportedOperationException + + override def createSQLXML(): SQLXML = throw new UnsupportedOperationException + + override def isValid(i: Int): Boolean = throw new UnsupportedOperationException + + override def setClientInfo(s: String, s1: String): Unit = throw 
new UnsupportedOperationException + + override def setClientInfo(properties: Properties): Unit = throw new UnsupportedOperationException + + override def getClientInfo(s: String): String = throw new UnsupportedOperationException + + override def getClientInfo: Properties = throw new UnsupportedOperationException + + override def createArrayOf(s: String, objects: Array[AnyRef]): sql.Array = + throw new UnsupportedOperationException + + override def createStruct(s: String, objects: Array[AnyRef]): Struct = + throw new UnsupportedOperationException + + override def setSchema(s: String): Unit = throw new UnsupportedOperationException + + override def getSchema: String = throw new UnsupportedOperationException + + override def abort(executor: Executor): Unit = throw new UnsupportedOperationException + + override def setNetworkTimeout(executor: Executor, i: Int): Unit = + throw new UnsupportedOperationException + + override def getNetworkTimeout: Int = throw new UnsupportedOperationException + + override def unwrap[T](aClass: Class[T]): T = throw new UnsupportedOperationException + + override def isWrapperFor(aClass: Class[_]): Boolean = throw new UnsupportedOperationException +} diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala index c264730e446..9e276d4240d 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala @@ -6,14 +6,13 @@ package com.daml.platform.store.backend import org.scalatest.flatspec.AsyncFlatSpec trait StorageBackendSuite - extends StorageBackendSpec - with StorageBackendTestsInitialization + extends StorageBackendTestsInitialization with StorageBackendTestsInitializeIngestion with StorageBackendTestsIngestion with StorageBackendTestsCompletions with StorageBackendTestsReset with StorageBackendTestsPruning - with StorageBackendTestsDBLock + with StorageBackendTestsDBLockForSuite with StorageBackendTestsTimestamps { this: AsyncFlatSpec => } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDBLock.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDBLock.scala index f1647cc78e6..9b6a5de2c47 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDBLock.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDBLock.scala @@ -5,6 +5,7 @@ package com.daml.platform.store.backend import java.sql.Connection +import com.daml.logging.LoggingContext import com.daml.platform.store.backend.DBLockStorageBackend.{Lock, LockId, LockMode} import org.scalatest.Assertion import org.scalatest.concurrent.Eventually @@ -16,134 +17,134 @@ import org.scalatest.time.{Seconds, Span} import scala.concurrent.Future import scala.util.Try -private[backend] trait StorageBackendTestsDBLock - extends Matchers - with StorageBackendSpec - with Eventually { +private[platform] trait StorageBackendTestsDBLock extends Matchers with Eventually { this: AsyncFlatSpec => + protected def dbLock: DBLockStorageBackend + protected def getConnection: Connection + behavior of "DBLockStorageBackend" it should "allow to acquire the same shared lock many times" in dbLockTestCase(1) 
{ c => - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty } it should "allow to acquire the same exclusive lock many times" in dbLockTestCase(1) { c => - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty } it should "allow shared locking" in dbLockTestCase(2) { c => - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(2)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(2)) should not be empty } it should "allow shared locking many times" in dbLockTestCase(5) { c => - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(2)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(3)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(4)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(5)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(2)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(3)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(4)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(5)) should not be empty } it should "not allow exclusive when locked by shared" in dbLockTestCase(2) { c => - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty } it should "not allow exclusive when locked by exclusive" in dbLockTestCase(2) { c => - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty } it should "not allow shared when locked by exclusive" in dbLockTestCase(2) { c => 
- backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(2)) shouldBe empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(2)) shouldBe empty } it should "unlock successfully a shared lock" in dbLockTestCase(2) { c => - val lock = backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)).get - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty - backend.release(lock)(c(1)) shouldBe true - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) should not be empty + val lock = dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)).get + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty + dbLock.release(lock)(c(1)) shouldBe true + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) should not be empty } it should "release successfully a shared lock if connection closed" in dbLockTestCase(2) { c => - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)).get - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)).get + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty c(1).close() eventually(timeout)( - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) should not be empty ) } it should "unlock successfully an exclusive lock" in dbLockTestCase(2) { c => - val lock = backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)).get - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty - backend.release(lock)(c(1)) shouldBe true - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) should not be empty + val lock = dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)).get + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty + dbLock.release(lock)(c(1)) shouldBe true + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) should not be empty } it should "release successfully an exclusive lock if connection closed" in dbLockTestCase(2) { c => - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)).get - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)).get + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty c(1).close() eventually(timeout)( - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) should not be empty ) } it should "be able to lock exclusive, if all shared locks are released" in dbLockTestCase(4) { c => - val shared1 = backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)).get - val shared2 = backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(2)).get - val shared3 = backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(3)).get - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(4)) shouldBe empty + val shared1 = dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)).get + val shared2 = dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(2)).get + val shared3 = dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(3)).get + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(4)) shouldBe empty - backend.release(shared1)(c(1)) - 
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(4)) shouldBe empty - backend.release(shared2)(c(2)) - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(4)) shouldBe empty - backend.release(shared3)(c(3)) - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(4)) should not be empty + dbLock.release(shared1)(c(1)) + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(4)) shouldBe empty + dbLock.release(shared2)(c(2)) + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(4)) shouldBe empty + dbLock.release(shared3)(c(3)) + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(4)) should not be empty } it should "lock immediately, or fail immediately" in dbLockTestCase(2) { c => - backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty val start = System.currentTimeMillis() - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty (System.currentTimeMillis() - start) should be < 500L } it should "fail to unlock lock held by others" in dbLockTestCase(2) { c => - val lock = backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)).get - backend.release(lock)(c(2)) shouldBe false + val lock = dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)).get + dbLock.release(lock)(c(2)) shouldBe false } it should "fail to unlock lock which is not held by anyone" in dbLockTestCase(1) { c => - backend.release(Lock(backend.lock(1), LockMode.Shared))(c(1)) shouldBe false + dbLock.release(Lock(dbLock.lock(1), LockMode.Shared))(c(1)) shouldBe false } it should "fail if attempt to use backend-foreign lock-id for locking" in dbLockTestCase(1) { c => - Try(backend.tryAcquire(new LockId {}, LockMode.Shared)(c(1))).isFailure shouldBe true + Try(dbLock.tryAcquire(new LockId {}, LockMode.Shared)(c(1))).isFailure shouldBe true } it should "fail if attempt to use backend-foreign lock-id for un-locking" in dbLockTestCase(1) { c => - Try(backend.release(Lock(new LockId {}, LockMode.Shared))(c(1))).isFailure shouldBe true + Try(dbLock.release(Lock(new LockId {}, LockMode.Shared))(c(1))).isFailure shouldBe true } it should "lock successfully exclusive different locks" in dbLockTestCase(1) { c => - backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty - backend.tryAcquire(backend.lock(2), LockMode.Exclusive)(c(1)) should not be empty - backend.tryAcquire(backend.lock(3), LockMode.Exclusive)(c(1)) should not be empty - backend.tryAcquire(backend.lock(4), LockMode.Exclusive)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(2), LockMode.Exclusive)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(3), LockMode.Exclusive)(c(1)) should not be empty + dbLock.tryAcquire(dbLock.lock(4), LockMode.Exclusive)(c(1)) should not be empty } private val timeout = Timeout(Span(10, Seconds)) @@ -151,18 +152,28 @@ private[backend] trait StorageBackendTestsDBLock private def dbLockTestCase( numOfConnectionsNeeded: Int )(test: List[Connection] => Any): Future[Assertion] = { - val dataSource = backend.createDataSource(jdbcUrl) - if (backend.dbLockSupported) { + if (dbLock.dbLockSupported) { // prepending with null so we can refer to connections 1 based in tests - val connections = null :: List.fill(numOfConnectionsNeeded)(dataSource.getConnection) + val connections = null :: 
List.fill(numOfConnectionsNeeded)(getConnection) val result = Try(test(connections)) connections.foreach(c => Try(c.close())) Future.fromTry(result).map(_ => 1 shouldBe 1) } else { info( - s"This test makes sense only for StorageBackend which supports DB-Locks. For ${backend.getClass.getName} StorageBackend this test is disabled." + s"This test makes sense only for StorageBackend which supports DB-Locks. For ${dbLock.getClass.getName} StorageBackend this test is disabled." ) Future.successful(1 shouldBe 1) } } } + +trait StorageBackendTestsDBLockForSuite + extends StorageBackendTestsDBLock + with StorageBackendProvider { + this: AsyncFlatSpec => + + override def dbLock: DBLockStorageBackend = backend + + override def getConnection: Connection = + backend.createDataSource(jdbcUrl)(LoggingContext.ForTesting).getConnection +} diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/HaCoordinatorSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/HaCoordinatorSpec.scala new file mode 100644 index 00000000000..81c39bbb559 --- /dev/null +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/HaCoordinatorSpec.scala @@ -0,0 +1,856 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.indexer.ha + +import java.sql.Connection +import java.util.Timer +import java.util.concurrent.atomic.AtomicInteger + +import akka.stream.KillSwitch +import akka.testkit.TestProbe +import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll +import com.daml.logging.{ContextualizedLogger, LoggingContext} +import com.daml.platform.store.backend.DBLockStorageBackend +import org.scalatest.concurrent.Eventually +import org.scalatest.flatspec.AsyncFlatSpec +import org.scalatest.matchers.should.Matchers + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Random, Try} + +class HaCoordinatorSpec + extends AsyncFlatSpec + with Matchers + with AkkaBeforeAndAfterAll + with Eventually { + implicit val ec: ExecutionContext = + system.dispatcher // we need this to not use the default EC which is coming from AsyncTestSuite, and which is serial + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + private val logger = ContextualizedLogger.get(this.getClass) + private val timer = new Timer(true) + + private val mainLockAcquireRetryMillis = 20L + private val workerLockAcquireRetryMillis = 20L + private val mainLockCheckerPeriodMillis = 20L + private val timeoutToleranceMillis = + 500L // unfortunately this needs to be a insanely big tolerance, not to render the test flaky. 
under normal circumstances this should pass with +5 millis + + private val mainLockId = 10 + private val main = TestLockId(mainLockId) + + private val workerLockId = 20 + private val worker = TestLockId(workerLockId) + + behavior of "databaseLockBasedHaCoordinator graceful shutdown" + + it should "successfully propagate successful end of execution" in { + val protectedSetup = setup() + import protectedSetup._ + + for { + _ <- connectionInitializerFuture + _ = { + info("As HACoordinator is initialized") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle is not completed") + completeExecutionInitialization() + info("As protected execution initialization is finished") + executionCompletedPromise.success(()) + info("As execution is completed") + } + _ <- protectedHandle.completed + } yield { + info("Protected Handle is completed successfully") + 1 shouldBe 1 + } + } + + it should "successfully propagate failed end of execution" in { + val protectedSetup = setup() + import protectedSetup._ + + for { + _ <- connectionInitializerFuture + _ = { + info("As HACoordinator is initialized") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle is not completed") + completeExecutionInitialization() + info("As protected execution initialization is finished") + executionCompletedPromise.failure(new Exception("failed execution")) + info("As execution completes with failure") + } + ex <- protectedHandle.completed.failed + } yield { + info("Protected Handle is completed with failure") + ex.getMessage shouldBe "failed execution" + } + } + + it should "propagate graceful shutdown to the protected execution" in { + val protectedSetup = setup() + import protectedSetup._ + + for { + connectionInitializer <- connectionInitializerFuture + _ = { + info("As HACoordinator is initialized") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle is not completed") + connectionInitializer.initialize(new TestConnection) + info("Connection initializer works") + completeExecutionInitialization() + info("As protected execution initialization is finished") + connectionInitializer.initialize(new TestConnection) + info("Connection initializer works") + protectedHandle.killSwitch.shutdown() + info("And as graceful shutdown started") + } + _ <- executionShutdownFuture + _ = { + info("Shutdown is observed at execution") + connectionInitializer.initialize(new TestConnection) + info("Connection initializer works") + executionAbortedFuture.isCompleted shouldBe false + info("Abort is not observed at execution") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle not completed") + Thread.sleep(200) + info("As waiting 200 millis") + protectedHandle.completed.isCompleted shouldBe false + info( + "Protected Handle is still not completed (hence it is waiting for execution to finish as the first step of the teardown process)" + ) + executionCompletedPromise.success(()) + info("As execution is completed") + } + _ <- protectedHandle.completed + } yield { + info("Protected Handle is completed successfully") + Try(connectionInitializer.initialize(new TestConnection)).isFailure shouldBe true + info("Connection initializer does not work anymore") + 1 shouldBe 1 + } + } + + it should "propagate graceful shutdown to the protected execution if shutdown initiated before execution initialization is finished" in { + val protectedSetup = setup() + import protectedSetup._ + + for { + connectionInitializer <- connectionInitializerFuture + _ = { + 
info("As HACoordinator is initialized") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle is not completed") + connectionInitializer.initialize(new TestConnection) + info("Connection initializer works") + protectedHandle.killSwitch.shutdown() + info("As graceful shutdown started") + completeExecutionInitialization() + info("And As protected execution initialization is finished") + connectionInitializer.initialize(new TestConnection) + info( + "Connection initializer still works (release process first releases the execution, and only then the main connection and the poller)" + ) + } + _ <- executionShutdownFuture + _ = { + info("Shutdown is observed at execution") + connectionInitializer.initialize(new TestConnection) + info("Connection initializer works") + executionAbortedFuture.isCompleted shouldBe false + info("Abort is not observed at execution") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle not completed") + executionCompletedPromise.success(()) + info("As execution is completed") + } + _ <- protectedHandle.completed + } yield { + info("Protected Handle is completed successfully") + Try(connectionInitializer.initialize(new TestConnection)).isFailure shouldBe true + info("Connection initializer does not work anymore") + 1 shouldBe 1 + } + } + + it should "swallow failures if graceful shutdown is underway" in { + val protectedSetup = setup() + import protectedSetup._ + + for { + connectionInitializer <- connectionInitializerFuture + _ = { + info("As HACoordinator is initialized") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle is not completed") + connectionInitializer.initialize(new TestConnection) + info("Connection initializer works") + completeExecutionInitialization() + info("As protected execution initialization is finished") + connectionInitializer.initialize(new TestConnection) + info("Connection initializer works") + protectedHandle.killSwitch.shutdown() + info("And as graceful shutdown started") + } + _ <- executionShutdownFuture + _ = { + info("Shutdown is observed at execution") + connectionInitializer.initialize(new TestConnection) + info("Connection initializer works") + executionAbortedFuture.isCompleted shouldBe false + info("Abort is not observed at execution") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle not completed") + executionCompletedPromise.failure(new Exception("some exception")) + info("As execution is completes with failure") + } + _ <- protectedHandle.completed + } yield { + info("Protected Handle is completed successfully") + Try(connectionInitializer.initialize(new TestConnection)).isFailure shouldBe true + info("Connection initializer does not work anymore") + 1 shouldBe 1 + } + } + + behavior of "databaseLockBasedHaCoordinator initialization" + + it should "fail if getting connection fails" in { + val protectedSetup = + setup(connectionFactory = () => throw new Exception("as getting connection")) + import protectedSetup._ + + for { + result <- protectedHandle.completed.failed + } yield { + info("Protected Handle is completed with failure") + result.getMessage shouldBe "as getting connection" + } + } + + it should "wait if main lock cannot be acquired" in { + val dbLock = new TestDBLockStorageBackend + val blockingConnection = new TestConnection + val blockingLock = + dbLock.tryAcquire(main, DBLockStorageBackend.LockMode.Exclusive)(blockingConnection).get + info("As acquiring the main lock from the outside") + val 
protectedSetup = setup(dbLock = dbLock) + import protectedSetup._ + info("As acquiring the main lock from the outside") + Thread.sleep(200) + info("And as waiting for 200 millis") + connectionInitializerFuture.isCompleted shouldBe false + protectedHandle.completed.isCompleted shouldBe false + info("Initialization should be waiting") + dbLock.release(blockingLock)(blockingConnection) shouldBe true + info("As releasing the blocking lock") + + for { + _ <- connectionInitializerFuture + _ = { + info("Initialisation should complete successfully") + completeExecutionInitialization() // cleanup + executionCompletedPromise.success(()) // cleanup + } + _ <- protectedHandle.completed + } yield { + 1 shouldBe 1 + } + } + + it should "wait for main lock can be interrupted by graceful shutdown" in { + val dbLock = new TestDBLockStorageBackend + val blockingConnection = new TestConnection + dbLock.tryAcquire(main, DBLockStorageBackend.LockMode.Exclusive)(blockingConnection).get + info("As acquiring the main lock from the outside") + val protectedSetup = setup(dbLock = dbLock) + import protectedSetup._ + Thread.sleep(200) + info("And as waiting for 200 millis") + connectionInitializerFuture.isCompleted shouldBe false + protectedHandle.completed.isCompleted shouldBe false + info("Initialization should be waiting") + protectedHandle.killSwitch.shutdown() + info("As graceful shutdown started") + + for { + _ <- protectedHandle.completed + } yield { + info("Protected Handle is completed successfully") + connectionInitializerFuture.isCompleted shouldBe false + } + } + + it should "wait if worker lock cannot be acquired due to exclusive blocking" in { + val dbLock = new TestDBLockStorageBackend + val blockingConnection = new TestConnection + val blockingLock = + dbLock.tryAcquire(worker, DBLockStorageBackend.LockMode.Exclusive)(blockingConnection).get + info("As acquiring the worker lock from the outside") + val protectedSetup = setup(dbLock = dbLock) + import protectedSetup._ + Thread.sleep(200) + info("And as waiting for 200 millis") + connectionInitializerFuture.isCompleted shouldBe false + protectedHandle.completed.isCompleted shouldBe false + info("Initialization should be waiting") + dbLock.release(blockingLock)(blockingConnection) shouldBe true + info("As releasing the blocking lock") + + for { + _ <- connectionInitializerFuture + _ = { + info("Initialisation should complete successfully") + completeExecutionInitialization() // cleanup + executionCompletedPromise.success(()) // cleanup + } + _ <- protectedHandle.completed + } yield { + 1 shouldBe 1 + } + } + + it should "wait if worker lock cannot be acquired due to shared blocking" in { + val dbLock = new TestDBLockStorageBackend + val blockingConnection = new TestConnection + val blockingLock = + dbLock.tryAcquire(worker, DBLockStorageBackend.LockMode.Shared)(blockingConnection).get + info("As acquiring the worker lock from the outside") + val protectedSetup = setup(dbLock = dbLock) + import protectedSetup._ + + Thread.sleep(200) + info("And as waiting for 200 millis") + connectionInitializerFuture.isCompleted shouldBe false + protectedHandle.completed.isCompleted shouldBe false + info("Initialization should be waiting") + dbLock.release(blockingLock)(blockingConnection) shouldBe true + info("As releasing the blocking lock") + + for { + _ <- connectionInitializerFuture + _ = { + info("Initialisation should complete successfully") + completeExecutionInitialization() // cleanup + executionCompletedPromise.success(()) // cleanup + } + _ <- 
protectedHandle.completed + } yield { + 1 shouldBe 1 + } + } + + it should "wait for worker lock can be interrupted by graceful shutdown" in { + val dbLock = new TestDBLockStorageBackend + val blockingConnection = new TestConnection + dbLock.tryAcquire(worker, DBLockStorageBackend.LockMode.Shared)(blockingConnection).get + info("As acquiring the worker lock from the outside") + val protectedSetup = setup(dbLock = dbLock) + import protectedSetup._ + + Thread.sleep(200) + info("And as waiting for 200 millis") + connectionInitializerFuture.isCompleted shouldBe false + protectedHandle.completed.isCompleted shouldBe false + info("Initialization should be waiting") + protectedHandle.killSwitch.shutdown() + info("As graceful shutdown starts") + + for { + _ <- protectedHandle.completed + } yield { + info("Protected Handle completes successfully") + connectionInitializerFuture.isCompleted shouldBe false + } + } + + it should "fail if worker lock cannot be acquired in time due to shared blocking" in { + val dbLock = new TestDBLockStorageBackend + val blockingConnection = new TestConnection + dbLock.tryAcquire(worker, DBLockStorageBackend.LockMode.Shared)(blockingConnection).get + info("As acquiring the worker lock from the outside") + val protectedSetup = setup( + dbLock = dbLock, + workerLockAcquireMaxRetry = 2, + ) + import protectedSetup._ + Thread.sleep(200) + info("And as waiting for 200 millis") + for { + failure <- protectedHandle.completed.failed + } yield { + info("Initialisation should complete with failure") + failure.getMessage shouldBe "Cannot acquire lock TestLockId(20) in lock-mode Exclusive: lock busy" + } + } + + it should "fail if execution initialization fails" in { + val protectedSetup = setup() + import protectedSetup._ + + for { + _ <- connectionInitializerFuture + _ = { + info("As execution initialization starts") + failDuringExecutionInitialization(new Exception("failed as initializing")) + info("Initialization fails") + } + failure <- protectedHandle.completed.failed + } yield { + info("Protected execution fails with failure") + failure.getMessage shouldBe "failed as initializing" + } + } + + behavior of "databaseLockBasedHaCoordinator main connection polling" + + it should "successfully prevent further worker connection-spawning, and trigger shutdown in execution, if main lock cannot be acquired anymore, triggered by connection-spawning" in { + val protectedSetup = setup() + import protectedSetup._ + + for { + connectionInitializer <- connectionInitializerFuture + _ = { + info("As HACoordinator is initialized") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle is not completed") + completeExecutionInitialization() + info("As protected execution initialization is finished") + connectionInitializer.initialize(new TestConnection) + info("Connection initializer works") + Thread.sleep(200) + info("As waiting 200 millis") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle is still not completed") + connectionInitializer.initialize(new TestConnection) + info("Connection initializer still works") + executionAbortedFuture.isCompleted shouldBe false + info("Execution is not aborted yet") + dbLock.cutExclusiveLockHoldingConnection(mainLockId) + info("As main connection is severed") + Try(connectionInitializer.initialize(new TestConnection)).isFailure shouldBe true + info("Connection initializer not working anymore") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle is still not completed") + } + 
abortException <- executionAbortedFuture + _ = { + info("Execution is aborted") + abortException.getMessage shouldBe "check failed, killSwitch aborted" + executionCompletedPromise.failure( + new Exception("execution failed due to abort coming from outside") + ) + info("As execution fails") + } + failure <- protectedHandle.completed.failed + } yield { + info("Protected Handle is completed with failure") + failure.getMessage shouldBe "check failed, killSwitch aborted" + info("And completion failure is populated by check-failure") + 1 shouldBe 1 + } + } + + it should "successfully prevent further worker connection-spawning, and trigger shutdown in execution, if main lock cannot be acquired anymore, triggered by timeout" in { + val protectedSetup = setup() + import protectedSetup._ + + for { + connectionInitializer <- connectionInitializerFuture + mainConnectionSeveredAtNanos = { + info("As HACoordinator is initialized") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle is not completed") + completeExecutionInitialization() + info("As protected execution initialization is finished") + connectionInitializer.initialize(new TestConnection) + info("Connection initializer works") + Thread.sleep(200) + info("As waiting 200 millis") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle is still not completed") + connectionInitializer.initialize(new TestConnection) + info("Connection initializer still works") + protectedSetup.executionAbortedFuture.isCompleted shouldBe false + info("Execution is not aborted yet") + dbLock.cutExclusiveLockHoldingConnection(mainLockId) + info("As main connection is severed") + System.nanoTime() + } + abortException <- executionAbortedFuture + _ = { + info("Execution is aborted") + abortException.getMessage shouldBe "check failed, killSwitch aborted" + (System.nanoTime() - mainConnectionSeveredAtNanos) should be < (( + mainLockAcquireRetryMillis + + timeoutToleranceMillis + ) * 1000L * 1000L) + info("Within polling time-bounds") + Try(connectionInitializer.initialize(new TestConnection)).isFailure shouldBe true + info("Connection initializer not working anymore") + protectedHandle.completed.isCompleted shouldBe false + info("Protected Handle is still not completed") + executionCompletedPromise.failure( + new Exception("execution failed due to abort coming from outside") + ) + info("As execution fails") + } + failure <- protectedHandle.completed.failed + } yield { + info("Protected Handle is completed with failure") + failure.getMessage shouldBe "check failed, killSwitch aborted" + info("And completion failure is populated by check-failure") + 1 shouldBe 1 + } + } + + behavior of "databaseLockBasedHaCoordinator in multi-node setup" + + it should "successfully protect execution, and respect upper bound threshold as switching over in a 5 node setup, without worker locking" in { + val dbLock = new TestDBLockStorageBackend + + var nodes: Set[ProtectedSetup] = Set.empty + val nodeStartedExecutionProbe = TestProbe() + val nodeStoppedExecutionProbe = TestProbe() + val nodeHaltedProbe = TestProbe() + + def addNode(): Unit = synchronized { + val node = setup(dbLock = dbLock) + node.connectionInitializerFuture + .foreach { _ => + logger.info(s"execution started") + node.completeExecutionInitialization() + nodeStartedExecutionProbe.send(nodeStartedExecutionProbe.ref, "started") + } + node.executionShutdownFuture + .foreach { _ => + node.executionCompletedPromise.success(()) + 
nodeStoppedExecutionProbe.send(nodeStoppedExecutionProbe.ref, "stopped") + } + node.executionAbortedFuture + .foreach { t => + node.executionCompletedPromise.failure(t) + nodeStoppedExecutionProbe.send(nodeStoppedExecutionProbe.ref, "stopped") + } + node.protectedHandle.completed + .onComplete { _ => + removeNode(node) + nodeHaltedProbe.send(nodeHaltedProbe.ref, "halted") + } + nodes = nodes + node + info("As a node added") + } + + def removeNode(node: ProtectedSetup): Unit = synchronized { + nodes = nodes - node + } + + def verifyCleanSlate(expectedNumberOfNodes: Int): Unit = synchronized { + if (expectedNumberOfNodes == 0) { + nodes.size shouldBe 0 + } else { + nodes.size shouldBe expectedNumberOfNodes + nodes.exists(_.protectedHandle.completed.isCompleted) shouldBe false + nodes.count(_.connectionInitializerFuture.isCompleted) shouldBe 1 + val activeNode = nodes.find(_.connectionInitializerFuture.isCompleted).get + activeNode.executionAbortedFuture.isCompleted shouldBe false + activeNode.executionShutdownFuture.isCompleted shouldBe false + } + nodeStartedExecutionProbe.expectNoMessage(FiniteDuration(0, "seconds")) + nodeStoppedExecutionProbe.expectNoMessage(FiniteDuration(0, "seconds")) + nodeHaltedProbe.expectNoMessage(FiniteDuration(0, "seconds")) + info("Cluster is in expected shape") + } + + def wait(): Unit = { + val waitMillis: Long = Random.nextInt(100).toLong + Thread.sleep(waitMillis) + info(s"As waiting $waitMillis millis") + } + + addNode() + nodeStartedExecutionProbe.expectMsg("started") + info("The first node started execution") + verifyCleanSlate(1) + addNode() + verifyCleanSlate(2) + addNode() + verifyCleanSlate(3) + addNode() + verifyCleanSlate(4) + addNode() + verifyCleanSlate(5) + info("As adding 5 nodes") + wait() + verifyCleanSlate(5) + info("Cluster stabilized") + + for (_ <- 1 to 30) { + wait() + verifyCleanSlate(5) + dbLock.cutExclusiveLockHoldingConnection(mainLockId) + logger.info(s"main lock force-released") + val mainConnCutNanos = System.nanoTime() + info( + "As active node loses main connection (and main index lock is available for acquisition again)" + ) + nodeStartedExecutionProbe.expectMsg("started") + (System.nanoTime() - mainConnCutNanos) should be < (( + mainLockAcquireRetryMillis + + timeoutToleranceMillis + ) * 1000L * 1000L) + info("Some other node started execution within time bounds") + nodeStoppedExecutionProbe.expectMsg("stopped") + nodeHaltedProbe.expectMsg("halted") + info("And originally active node stopped") + verifyCleanSlate(4) + addNode() + verifyCleanSlate(5) + } + + def tearDown(): Unit = { + dbLock.cutExclusiveLockHoldingConnection(mainLockId) + nodeStartedExecutionProbe.expectMsg("started") + nodeStoppedExecutionProbe.expectMsg("stopped") + nodeHaltedProbe.expectMsg("halted") + () + } + + tearDown() + verifyCleanSlate(4) + tearDown() + verifyCleanSlate(3) + tearDown() + verifyCleanSlate(2) + tearDown() + verifyCleanSlate(1) + dbLock.cutExclusiveLockHoldingConnection(mainLockId) + nodeStoppedExecutionProbe.expectMsg("stopped") + nodeHaltedProbe.expectMsg("halted") + verifyCleanSlate(0) + + Future.successful(1 shouldBe 1) + } + + it should "successfully protect execution, and respect upper bound threshold as switching over in a 5 node setup, with worker connection locking: losing only indexer lock" in { + val dbLock = new TestDBLockStorageBackend + + val keepUsingWorkerAfterShutdownMillis = 100L + + var nodes: Set[ProtectedSetup] = Set.empty + val nodeStartedExecutionProbe = TestProbe() + val nodeStoppedExecutionProbe = TestProbe() + 
+    val nodeHaltedProbe = TestProbe()
+    val concurrentWorkers = new AtomicInteger(0)
+
+    def addNode(): Unit = synchronized {
+      val node = setup(dbLock = dbLock)
+      val workerConnection = new TestConnection
+      node.connectionInitializerFuture
+        .foreach { connectionInitializer =>
+          logger.info(s"execution started")
+          node.completeExecutionInitialization()
+          connectionInitializer.initialize(workerConnection)
+          concurrentWorkers.incrementAndGet()
+          nodeStartedExecutionProbe.send(nodeStartedExecutionProbe.ref, "started")
+        }
+      node.executionShutdownFuture
+        .foreach { _ =>
+          Thread.sleep(keepUsingWorkerAfterShutdownMillis)
+          concurrentWorkers.decrementAndGet()
+          dbLock.release(DBLockStorageBackend.Lock(worker, DBLockStorageBackend.LockMode.Shared))(
+            workerConnection
+          )
+          node.executionCompletedPromise.success(())
+          nodeStoppedExecutionProbe.send(nodeStoppedExecutionProbe.ref, "stopped")
+        }
+      node.executionAbortedFuture
+        .foreach { t =>
+          Thread.sleep(keepUsingWorkerAfterShutdownMillis)
+          concurrentWorkers.decrementAndGet()
+          dbLock.release(DBLockStorageBackend.Lock(worker, DBLockStorageBackend.LockMode.Shared))(
+            workerConnection
+          )
+          node.executionCompletedPromise.failure(t)
+          nodeStoppedExecutionProbe.send(nodeStoppedExecutionProbe.ref, "stopped")
+        }
+      node.protectedHandle.completed
+        .onComplete { _ =>
+          removeNode(node)
+          nodeHaltedProbe.send(nodeHaltedProbe.ref, "halted")
+        }
+      nodes = nodes + node
+      info("As a node is added")
+    }
+
+    def removeNode(node: ProtectedSetup): Unit = synchronized {
+      nodes = nodes - node
+    }
+
+    def verifyCleanSlate(expectedNumberOfNodes: Int): Unit = synchronized {
+      if (expectedNumberOfNodes == 0) {
+        nodes.size shouldBe 0
+      } else {
+        concurrentWorkers.get() shouldBe 1
+        nodes.size shouldBe expectedNumberOfNodes
+        nodes.exists(_.protectedHandle.completed.isCompleted) shouldBe false
+        nodes.count(_.connectionInitializerFuture.isCompleted) shouldBe 1
+        val activeNode = nodes.find(_.connectionInitializerFuture.isCompleted).get
+        activeNode.executionAbortedFuture.isCompleted shouldBe false
+        activeNode.executionShutdownFuture.isCompleted shouldBe false
+      }
+      nodeStartedExecutionProbe.expectNoMessage(FiniteDuration(0, "seconds"))
+      nodeStoppedExecutionProbe.expectNoMessage(FiniteDuration(0, "seconds"))
+      nodeHaltedProbe.expectNoMessage(FiniteDuration(0, "seconds"))
+      info("Cluster is in expected shape")
+    }
+
+    def wait(): Unit = {
+      val waitMillis: Long = Random.nextInt(100).toLong
+      Thread.sleep(waitMillis)
+      info(s"As waiting $waitMillis millis")
+    }
+
+    addNode()
+    nodeStartedExecutionProbe.expectMsg("started")
+    info("The first node started execution")
+    verifyCleanSlate(1)
+    addNode()
+    verifyCleanSlate(2)
+    addNode()
+    verifyCleanSlate(3)
+    addNode()
+    verifyCleanSlate(4)
+    addNode()
+    verifyCleanSlate(5)
+    info("As adding 5 nodes")
+    wait()
+    verifyCleanSlate(5)
+    info("Cluster stabilized")
+
+    for (_ <- 1 to 30) {
+      wait()
+      verifyCleanSlate(5)
+      dbLock.cutExclusiveLockHoldingConnection(mainLockId)
+      logger.info(s"main lock force-released")
+      val mainConnCutNanos = System.nanoTime()
+      info("As active node loses main connection (and in time drops the worker connection as well)")
+      nodeStartedExecutionProbe.expectMsg("started")
+      (System.nanoTime() - mainConnCutNanos) should be < ((
+        mainLockCheckerPeriodMillis + // first active node has to realize that it lost the lock
+          keepUsingWorkerAfterShutdownMillis + // then it is shutting down, but it will take this long to release the worker lock as well
+          workerLockAcquireRetryMillis + // by this time the new active node has already acquired the main lock and is polling for the worker lock, so this is the maximum additional time we need to wait
+          timeoutToleranceMillis
+      ) * 1000L * 1000L)
+      info("Some other node started execution within time bounds")
+      nodeStoppedExecutionProbe.expectMsg("stopped")
+      nodeHaltedProbe.expectMsg("halted")
+      info("And originally active node stopped")
+      verifyCleanSlate(4)
+      addNode()
+      verifyCleanSlate(5)
+    }
+
+    def tearDown(): Unit = {
+      dbLock.cutExclusiveLockHoldingConnection(mainLockId)
+      nodeStartedExecutionProbe.expectMsg("started")
+      nodeStoppedExecutionProbe.expectMsg("stopped")
+      nodeHaltedProbe.expectMsg("halted")
+      ()
+    }
+
+    tearDown()
+    verifyCleanSlate(4)
+    tearDown()
+    verifyCleanSlate(3)
+    tearDown()
+    verifyCleanSlate(2)
+    tearDown()
+    verifyCleanSlate(1)
+    dbLock.cutExclusiveLockHoldingConnection(mainLockId)
+    nodeStoppedExecutionProbe.expectMsg("stopped")
+    nodeHaltedProbe.expectMsg("halted")
+    verifyCleanSlate(0)
+
+    Future.successful(1 shouldBe 1)
+  }
+
+  private def setup(
+      connectionFactory: () => Connection = () => new TestConnection,
+      workerLockAcquireMaxRetry: Long = 100,
+      dbLock: TestDBLockStorageBackend = new TestDBLockStorageBackend,
+  ): ProtectedSetup = {
+    val connectionInitializerPromise = Promise[ConnectionInitializer]()
+    val executionHandlePromise = Promise[Unit]()
+
+    val shutdownPromise = Promise[Unit]()
+    val abortPromise = Promise[Throwable]()
+    val completedPromise = Promise[Unit]()
+
+    val protectedHandle = HaCoordinator
+      .databaseLockBasedHaCoordinator(
+        connectionFactory = connectionFactory,
+        storageBackend = dbLock,
+        executionContext = system.dispatcher,
+        timer = timer,
+        haConfig = HaConfig(
+          enable = true,
+          mainLockAcquireRetryMillis = mainLockAcquireRetryMillis,
+          workerLockAcquireRetryMillis = workerLockAcquireRetryMillis,
+          workerLockAcquireMaxRetry = workerLockAcquireMaxRetry,
+          mainLockCheckerPeriodMillis = mainLockCheckerPeriodMillis,
+          indexerLockId = 10,
+          indexerWorkerLockId = 20,
+        ),
+      )
+      .protectedExecution { connectionInitializer =>
+        connectionInitializerPromise.success(connectionInitializer)
+        executionHandlePromise.future.map(_ =>
+          Handle(
+            killSwitch = new KillSwitch {
+              override def shutdown(): Unit = shutdownPromise.success(())
+              override def abort(ex: Throwable): Unit = abortPromise.success(ex)
+            },
+            completed = completedPromise.future,
+          )
+        )
+      }
+
+    ProtectedSetup(
+      protectedHandle = protectedHandle,
+      connectionInitializerFuture = connectionInitializerPromise.future,
+      executionHandlePromise = executionHandlePromise,
+      executionShutdownFuture = shutdownPromise.future,
+      executionAbortedFuture = abortPromise.future,
+      executionCompletedPromise = completedPromise,
+      dbLock = dbLock,
+    )
+  }
+
+  case class ProtectedSetup(
+      protectedHandle: Handle, // the protected Handle to observe and interact with
+      connectionInitializerFuture: Future[
+        ConnectionInitializer
+      ], // observe ConnectionInitializer, this completes as HA arrives to the stage when execution initialization starts
+      executionHandlePromise: Promise[Unit], // trigger end of execution initialization
+      executionShutdownFuture: Future[Unit], // observe shutdown in execution
+      executionAbortedFuture: Future[Throwable], // observe abort in execution
+      executionCompletedPromise: Promise[Unit], // trigger completion of execution
+      dbLock: TestDBLockStorageBackend, // the lock backend
+  ) {
+
+    /** trigger end of execution initialization */
+    def completeExecutionInitialization(): Unit = {
+      executionHandlePromise.success(())
+    }
+
+    /** simulate a failure during execution initialization */
+    def failDuringExecutionInitialization(cause: Throwable): Unit = {
+      executionHandlePromise.failure(cause)
+    }
+  }
+}
diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/TestDBLockStorageBackendSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/TestDBLockStorageBackendSpec.scala
new file mode 100644
index 00000000000..ca8571ffe71
--- /dev/null
+++ b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/ha/TestDBLockStorageBackendSpec.scala
@@ -0,0 +1,26 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.indexer.ha
+
+import java.sql.Connection
+
+import com.daml.platform.store.backend.{DBLockStorageBackend, StorageBackendTestsDBLock}
+import org.scalatest.BeforeAndAfter
+import org.scalatest.flatspec.AsyncFlatSpec
+
+class TestDBLockStorageBackendSpec
+    extends AsyncFlatSpec
+    with StorageBackendTestsDBLock
+    with BeforeAndAfter {
+
+  private var _dbLock: TestDBLockStorageBackend = _
+
+  before {
+    _dbLock = new TestDBLockStorageBackend
+  }
+
+  override def dbLock: DBLockStorageBackend = _dbLock
+
+  override def getConnection: Connection = new TestConnection
+}
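Note for reviewers (illustrative only, not part of the diff): the ProtectedSetup harness added to HaCoordinatorSpec above is typically driven through one happy-path protected-execution lifecycle as sketched below. The sketch assumes it runs inside HaCoordinatorSpec, where setup() and the spec's implicit ExecutionContext are in scope, and it uses only the hooks visible in this patch.

  // Hypothetical usage sketch of the ProtectedSetup harness (not part of the patch).
  val protectedSetup = setup() // spawns an HaCoordinator over a fresh TestDBLockStorageBackend
  for {
    connectionInitializer <- protectedSetup.connectionInitializerFuture // completes once HA reaches execution initialization
    _ = connectionInitializer.initialize(new TestConnection) // initialize a worker connection under HA protection
    _ = protectedSetup.completeExecutionInitialization() // trigger end of execution initialization
    _ = protectedSetup.executionCompletedPromise.success(()) // trigger normal completion of the execution
    _ <- protectedSetup.protectedHandle.completed // the protected Handle completes as HA tears down
  } yield succeed

The executionShutdownFuture and executionAbortedFuture hooks are what the multi-node tests above react to when simulating graceful and forced handover, respectively.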