Dpp 494 unit testing ha coordinator (#10862)

* Unit testing: HA Coordinator

* Preparation: switch to fix-thread-pool in AkkaBeforeAndAfterAll to have more stable test runtimes

CHANGELOG_BEGIN
CHANGELOG_END

* Preparation: switch to timer from akka-pattern  and Scheduler in waiting-futures, for higher precision timing

CHANGELOG_BEGIN
CHANGELOG_END

* Preparation: prevent race in PollingChecker (corner-case uncovered via sustained testing)

CHANGELOG_BEGIN
CHANGELOG_END

* Preparation - HaCoordinator refactorings: switch from DataSource to connection factory functions to clean dependencies, fixing some typos

CHANGELOG_BEGIN
CHANGELOG_END

* Add test infrastructure for locking

CHANGELOG_BEGIN
CHANGELOG_END

* Reuse StorageBackendTestsDBLock to test TestDBLockStorageBackend

CHANGELOG_BEGIN
CHANGELOG_END

* Add unit test suite for HaCoordinator

CHANGELOG_BEGIN
CHANGELOG_END

* Fix random generation to respect scala 2.12

CHANGELOG_BEGIN
CHANGELOG_END

* Adds test cases for graceful-shutdown during initialization

CHANGELOG_BEGIN
CHANGELOG_END

* Minor changes based on review

CHANGELOG_BEGIN
CHANGELOG_END

* Some comment rewording based on review

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Marton Nagy 2021-09-16 17:08:45 +02:00 committed by GitHub
parent 30f74adc99
commit 308f938512
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1244 additions and 108 deletions

View File

@ -23,14 +23,15 @@ trait AkkaBeforeAndAfterAll extends BeforeAndAfterAll {
private implicit lazy val executionContext: ExecutionContext =
ExecutionContext.fromExecutorService(
Executors.newCachedThreadPool(
Executors.newFixedThreadPool(
16,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(s"$actorSystemName-thread-pool-worker-%d")
.setUncaughtExceptionHandler((thread, e) =>
logger.error(s"got an uncaught exception on thread: ${thread.getName}", e)
)
.build()
.build(),
)
)

View File

@ -4,13 +4,12 @@
package com.daml.platform.indexer.ha
import java.sql.Connection
import java.util.Timer
import akka.actor.Scheduler
import akka.stream.KillSwitch
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.store.backend.DBLockStorageBackend.{Lock, LockId, LockMode}
import com.daml.platform.store.backend.DBLockStorageBackend
import javax.sql.DataSource
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
@ -55,9 +54,9 @@ trait HaCoordinator {
}
case class HaConfig(
mainLockAquireRetryMillis: Long = 500,
workerLockAquireRetryMillis: Long = 500,
workerLockAquireMaxRetry: Long = 1000,
mainLockAcquireRetryMillis: Long = 500,
workerLockAcquireRetryMillis: Long = 500,
workerLockAcquireMaxRetry: Long = 1000,
mainLockCheckerPeriodMillis: Long = 1000,
indexerLockId: Int = 0x646d6c00, // note 0x646d6c equals ASCII encoded "dml"
indexerWorkerLockId: Int = 0x646d6c01,
@ -75,22 +74,22 @@ object HaCoordinator {
* - provides a ConnectionInitializer function which is mandatory to execute on all worker connections during execution
* - will spawn a polling-daemon to observe continuous presence of the main lock
*
* @param dataSource to spawn the main connection which keeps the Indexer Main Lock
* @param connectionFactory to spawn the main connection which keeps the Indexer Main Lock
* @param storageBackend is the database-independent abstraction of session/connection level database locking
* @param executionContext which is use to execute initialisation, will do blocking/IO work, so dedicated execution context is recommended
*/
def databaseLockBasedHaCoordinator(
dataSource: DataSource,
connectionFactory: () => Connection,
storageBackend: DBLockStorageBackend,
executionContext: ExecutionContext,
scheduler: Scheduler,
timer: Timer,
haConfig: HaConfig,
)(implicit loggingContext: LoggingContext): HaCoordinator = {
implicit val ec: ExecutionContext = executionContext
val indexerLockId = storageBackend.lock(haConfig.indexerLockId)
val indexerWorkerLockId = storageBackend.lock(haConfig.indexerWorkerLockId)
val preemptableSequence = PreemptableSequence(scheduler)
val preemptableSequence = PreemptableSequence(timer)
new HaCoordinator {
override def protectedExecution(
@ -114,18 +113,18 @@ object HaCoordinator {
import sequenceHelper._
logger.info("Starting databaseLockBasedHaCoordinator")
for {
mainConnection <- go[Connection](dataSource.getConnection)
mainConnection <- go[Connection](connectionFactory())
_ = logger.info("Step 1: creating main-connection - DONE")
_ = registerRelease {
logger.info("Releasing main connection...")
mainConnection.close()
logger.info("Released main connection")
}
_ <- retry(haConfig.mainLockAquireRetryMillis)(acquireMainLock(mainConnection))
_ <- retry(haConfig.mainLockAcquireRetryMillis)(acquireMainLock(mainConnection))
_ = logger.info("Step 2: acquire exclusive Indexer Main Lock on main-connection - DONE")
exclusiveWorkerLock <- retry[Lock](
haConfig.workerLockAquireRetryMillis,
haConfig.workerLockAquireMaxRetry,
haConfig.workerLockAcquireRetryMillis,
haConfig.workerLockAcquireMaxRetry,
)(
acquireLock(mainConnection, indexerWorkerLockId, LockMode.Exclusive)
)

View File

@ -51,6 +51,9 @@ class PollingChecker(
def check(): Unit = synchronized {
logger.debug(s"Checking...")
Try(checkBody) match {
case _ if closed =>
throw new Exception("Checker is already closed")
case Success(_) =>
logger.debug(s"Check successful.")
@ -61,5 +64,10 @@ class PollingChecker(
}
}
def close(): Unit = timer.cancel()
private var closed: Boolean = false
def close(): Unit = synchronized {
closed = true
timer.cancel()
}
}

View File

@ -3,10 +3,10 @@
package com.daml.platform.indexer.ha
import akka.actor.Scheduler
import java.util.{Timer, TimerTask}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
@ -88,7 +88,7 @@ object PreemptableSequence {
* - and encapsulate synchronous work in futures (this could be possibly blocking)
* Because of the possible blocking nature a dedicated pool is recommended.
*/
def apply(scheduler: Scheduler)(implicit
def apply(timer: Timer)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): PreemptableSequence = { sequence =>
@ -98,8 +98,16 @@ object PreemptableSequence {
var releaseStack: List[() => Future[Unit]] = Nil
val helper: SequenceHelper = new SequenceHelper {
private def waitFor(delayMillis: Long): Future[Unit] =
goF(akka.pattern.after(FiniteDuration(delayMillis, "millis"), scheduler)(Future.unit))
private def waitFor(delayMillis: Long): Future[Unit] = {
val p = Promise[Unit]()
timer.schedule(
new TimerTask {
override def run(): Unit = p.success(())
},
delayMillis,
)
goF(p.future)
}
override def registerRelease(release: => Unit): Unit = synchronized {
logger.info(s"Registered release function")

View File

@ -3,6 +3,7 @@
package com.daml.platform.indexer.parallel
import java.util.Timer
import java.util.concurrent.Executors
import akka.stream.{KillSwitch, Materializer}
@ -52,28 +53,29 @@ object ParallelIndexerFactory {
Some(metrics.daml.parallelIndexer.batching.executor -> metrics.registry),
)
haCoordinator <-
if (storageBackend.dbLockSupported && haConfig.enable)
ResourceOwner
.forExecutorService(() =>
ExecutionContext.fromExecutorService(
Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder().setNameFormat(s"ha-coordinator-%d").build,
if (storageBackend.dbLockSupported && haConfig.enable) {
for {
executionContext <- ResourceOwner
.forExecutorService(() =>
ExecutionContext.fromExecutorService(
Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder().setNameFormat(s"ha-coordinator-%d").build,
)
)
)
)
.map(
HaCoordinator.databaseLockBasedHaCoordinator(
// this DataSource will be used to spawn the main connection where we keep the Indexer Main Lock
// The life-cycle of such connections matches the life-cycle of a protectedExecution
dataSource = storageBackend.createDataSource(jdbcUrl),
storageBackend = storageBackend,
_,
scheduler = mat.system.scheduler,
haConfig = haConfig,
)
)
else
timer <- ResourceOwner.forTimer(() => new Timer)
// this DataSource will be used to spawn the main connection where we keep the Indexer Main Lock
// The life-cycle of such connections matches the life-cycle of a protectedExecution
dataSource = storageBackend.createDataSource(jdbcUrl)
} yield HaCoordinator.databaseLockBasedHaCoordinator(
connectionFactory = () => dataSource.getConnection,
storageBackend = storageBackend,
executionContext = executionContext,
timer = timer,
haConfig = haConfig,
)
} else
ResourceOwner.successful(NoopHaCoordinator)
} yield toIndexer { implicit resourceContext =>
implicit val ec: ExecutionContext = resourceContext.executionContext

View File

@ -0,0 +1,226 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.indexer.ha
import java.sql.{
Blob,
CallableStatement,
Clob,
Connection,
DatabaseMetaData,
NClob,
PreparedStatement,
SQLWarning,
SQLXML,
Savepoint,
Statement,
Struct,
}
import java.util.Properties
import java.util.concurrent.Executor
import java.{sql, util}
import com.daml.platform.store.backend.DBLockStorageBackend
class TestDBLockStorageBackend extends DBLockStorageBackend {
private var locks: Map[DBLockStorageBackend.Lock, Set[Connection]] = Map.empty
override def tryAcquire(
lockId: DBLockStorageBackend.LockId,
lockMode: DBLockStorageBackend.LockMode,
)(connection: Connection): Option[DBLockStorageBackend.Lock] = synchronized {
if (connection.isClosed) throw new Exception("trying to acquire on a closed connection")
if (!lockId.isInstanceOf[TestLockId]) throw new Exception("foreign lockId")
removeClosedConnectionRelatedLocks()
val lock = DBLockStorageBackend.Lock(lockId, lockMode)
def doLock(): Option[DBLockStorageBackend.Lock] = {
locks = locks + (lock -> (locks.getOrElse(lock, Set.empty) + connection))
Some(lock)
}
lockMode match {
case DBLockStorageBackend.LockMode.Exclusive =>
(
locks.get(lock),
locks.get(lock.copy(lockMode = DBLockStorageBackend.LockMode.Shared)),
) match {
case (None, None) => doLock()
case (Some(connections), None) if connections == Set(connection) => doLock()
case _ => None // if any shared lock held, we cannot lock exclusively
}
case DBLockStorageBackend.LockMode.Shared =>
(
locks.get(lock),
locks.get(lock.copy(lockMode = DBLockStorageBackend.LockMode.Exclusive)),
) match {
case (_, None) => doLock()
case _ => None // if any exclusive lock held, we cannot lock shared
}
}
}
override def release(lock: DBLockStorageBackend.Lock)(connection: Connection): Boolean =
synchronized {
if (connection.isClosed) throw new Exception("trying to release on a closed connection")
if (!lock.lockId.isInstanceOf[TestLockId]) throw new Exception("foreign lockId")
removeClosedConnectionRelatedLocks()
locks.get(lock) match {
case None => false
case Some(connections) if connections contains connection =>
if (connections.size == 1) locks = locks - lock
else locks = locks + (lock -> (connections - connection))
true
case _ => false
}
}
private def removeClosedConnectionRelatedLocks(): Unit =
locks = locks.collect {
case (lock, conns) if conns.exists(!_.isClosed) => (lock, conns.filter(!_.isClosed))
}
override def lock(id: Int): DBLockStorageBackend.LockId =
TestLockId(id)
def cutExclusiveLockHoldingConnection(lockId: Int): Unit = synchronized {
val mainExclusiveIndexerLock =
DBLockStorageBackend.Lock(TestLockId(lockId), DBLockStorageBackend.LockMode.Exclusive)
locks
.getOrElse(mainExclusiveIndexerLock, Set.empty)
.foreach(_.close())
locks = locks - mainExclusiveIndexerLock
}
override def dbLockSupported: Boolean = true
}
case class TestLockId(id: Int) extends DBLockStorageBackend.LockId
class TestConnection extends Connection {
override def createStatement(): Statement = throw new UnsupportedOperationException
override def prepareStatement(s: String): PreparedStatement =
throw new UnsupportedOperationException
override def prepareCall(s: String): CallableStatement = throw new UnsupportedOperationException
override def nativeSQL(s: String): String = throw new UnsupportedOperationException
override def setAutoCommit(b: Boolean): Unit = throw new UnsupportedOperationException
override def getAutoCommit: Boolean = throw new UnsupportedOperationException
override def commit(): Unit = throw new UnsupportedOperationException
override def rollback(): Unit = throw new UnsupportedOperationException
private var _closed: Boolean = false
override def close(): Unit = synchronized {
_closed = true
}
override def isClosed: Boolean = synchronized(_closed)
override def getMetaData: DatabaseMetaData = throw new UnsupportedOperationException
override def setReadOnly(b: Boolean): Unit = throw new UnsupportedOperationException
override def isReadOnly: Boolean = throw new UnsupportedOperationException
override def setCatalog(s: String): Unit = throw new UnsupportedOperationException
override def getCatalog: String = throw new UnsupportedOperationException
override def setTransactionIsolation(i: Int): Unit = throw new UnsupportedOperationException
override def getTransactionIsolation: Int = throw new UnsupportedOperationException
override def getWarnings: SQLWarning = throw new UnsupportedOperationException
override def clearWarnings(): Unit = throw new UnsupportedOperationException
override def createStatement(i: Int, i1: Int): Statement = throw new UnsupportedOperationException
override def prepareStatement(s: String, i: Int, i1: Int): PreparedStatement =
throw new UnsupportedOperationException
override def prepareCall(s: String, i: Int, i1: Int): CallableStatement =
throw new UnsupportedOperationException
override def getTypeMap: util.Map[String, Class[_]] = throw new UnsupportedOperationException
override def setTypeMap(map: util.Map[String, Class[_]]): Unit =
throw new UnsupportedOperationException
override def setHoldability(i: Int): Unit = throw new UnsupportedOperationException
override def getHoldability: Int = throw new UnsupportedOperationException
override def setSavepoint(): Savepoint = throw new UnsupportedOperationException
override def setSavepoint(s: String): Savepoint = throw new UnsupportedOperationException
override def rollback(savepoint: Savepoint): Unit = throw new UnsupportedOperationException
override def releaseSavepoint(savepoint: Savepoint): Unit =
throw new UnsupportedOperationException
override def createStatement(i: Int, i1: Int, i2: Int): Statement =
throw new UnsupportedOperationException
override def prepareStatement(s: String, i: Int, i1: Int, i2: Int): PreparedStatement =
throw new UnsupportedOperationException
override def prepareCall(s: String, i: Int, i1: Int, i2: Int): CallableStatement =
throw new UnsupportedOperationException
override def prepareStatement(s: String, i: Int): PreparedStatement =
throw new UnsupportedOperationException
override def prepareStatement(s: String, ints: Array[Int]): PreparedStatement =
throw new UnsupportedOperationException
override def prepareStatement(s: String, strings: Array[String]): PreparedStatement =
throw new UnsupportedOperationException
override def createClob(): Clob = throw new UnsupportedOperationException
override def createBlob(): Blob = throw new UnsupportedOperationException
override def createNClob(): NClob = throw new UnsupportedOperationException
override def createSQLXML(): SQLXML = throw new UnsupportedOperationException
override def isValid(i: Int): Boolean = throw new UnsupportedOperationException
override def setClientInfo(s: String, s1: String): Unit = throw new UnsupportedOperationException
override def setClientInfo(properties: Properties): Unit = throw new UnsupportedOperationException
override def getClientInfo(s: String): String = throw new UnsupportedOperationException
override def getClientInfo: Properties = throw new UnsupportedOperationException
override def createArrayOf(s: String, objects: Array[AnyRef]): sql.Array =
throw new UnsupportedOperationException
override def createStruct(s: String, objects: Array[AnyRef]): Struct =
throw new UnsupportedOperationException
override def setSchema(s: String): Unit = throw new UnsupportedOperationException
override def getSchema: String = throw new UnsupportedOperationException
override def abort(executor: Executor): Unit = throw new UnsupportedOperationException
override def setNetworkTimeout(executor: Executor, i: Int): Unit =
throw new UnsupportedOperationException
override def getNetworkTimeout: Int = throw new UnsupportedOperationException
override def unwrap[T](aClass: Class[T]): T = throw new UnsupportedOperationException
override def isWrapperFor(aClass: Class[_]): Boolean = throw new UnsupportedOperationException
}

View File

@ -6,14 +6,13 @@ package com.daml.platform.store.backend
import org.scalatest.flatspec.AsyncFlatSpec
trait StorageBackendSuite
extends StorageBackendSpec
with StorageBackendTestsInitialization
extends StorageBackendTestsInitialization
with StorageBackendTestsInitializeIngestion
with StorageBackendTestsIngestion
with StorageBackendTestsCompletions
with StorageBackendTestsReset
with StorageBackendTestsPruning
with StorageBackendTestsDBLock
with StorageBackendTestsDBLockForSuite
with StorageBackendTestsTimestamps {
this: AsyncFlatSpec =>
}

View File

@ -5,6 +5,7 @@ package com.daml.platform.store.backend
import java.sql.Connection
import com.daml.logging.LoggingContext
import com.daml.platform.store.backend.DBLockStorageBackend.{Lock, LockId, LockMode}
import org.scalatest.Assertion
import org.scalatest.concurrent.Eventually
@ -16,134 +17,134 @@ import org.scalatest.time.{Seconds, Span}
import scala.concurrent.Future
import scala.util.Try
private[backend] trait StorageBackendTestsDBLock
extends Matchers
with StorageBackendSpec
with Eventually {
private[platform] trait StorageBackendTestsDBLock extends Matchers with Eventually {
this: AsyncFlatSpec =>
protected def dbLock: DBLockStorageBackend
protected def getConnection: Connection
behavior of "DBLockStorageBackend"
it should "allow to acquire the same shared lock many times" in dbLockTestCase(1) { c =>
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty
}
it should "allow to acquire the same exclusive lock many times" in dbLockTestCase(1) { c =>
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty
}
it should "allow shared locking" in dbLockTestCase(2) { c =>
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(2)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(2)) should not be empty
}
it should "allow shared locking many times" in dbLockTestCase(5) { c =>
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(2)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(3)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(4)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(5)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(2)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(3)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(4)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(5)) should not be empty
}
it should "not allow exclusive when locked by shared" in dbLockTestCase(2) { c =>
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
}
it should "not allow exclusive when locked by exclusive" in dbLockTestCase(2) { c =>
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
}
it should "not allow shared when locked by exclusive" in dbLockTestCase(2) { c =>
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(2)) shouldBe empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(2)) shouldBe empty
}
it should "unlock successfully a shared lock" in dbLockTestCase(2) { c =>
val lock = backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)).get
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
backend.release(lock)(c(1)) shouldBe true
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) should not be empty
val lock = dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)).get
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
dbLock.release(lock)(c(1)) shouldBe true
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) should not be empty
}
it should "release successfully a shared lock if connection closed" in dbLockTestCase(2) { c =>
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)).get
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)).get
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
c(1).close()
eventually(timeout)(
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) should not be empty
)
}
it should "unlock successfully an exclusive lock" in dbLockTestCase(2) { c =>
val lock = backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)).get
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
backend.release(lock)(c(1)) shouldBe true
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) should not be empty
val lock = dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)).get
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
dbLock.release(lock)(c(1)) shouldBe true
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) should not be empty
}
it should "release successfully an exclusive lock if connection closed" in dbLockTestCase(2) {
c =>
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)).get
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)).get
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
c(1).close()
eventually(timeout)(
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) should not be empty
)
}
it should "be able to lock exclusive, if all shared locks are released" in dbLockTestCase(4) {
c =>
val shared1 = backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)).get
val shared2 = backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(2)).get
val shared3 = backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(3)).get
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(4)) shouldBe empty
val shared1 = dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)).get
val shared2 = dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(2)).get
val shared3 = dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(3)).get
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(4)) shouldBe empty
backend.release(shared1)(c(1))
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(4)) shouldBe empty
backend.release(shared2)(c(2))
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(4)) shouldBe empty
backend.release(shared3)(c(3))
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(4)) should not be empty
dbLock.release(shared1)(c(1))
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(4)) shouldBe empty
dbLock.release(shared2)(c(2))
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(4)) shouldBe empty
dbLock.release(shared3)(c(3))
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(4)) should not be empty
}
it should "lock immediately, or fail immediately" in dbLockTestCase(2) { c =>
backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)) should not be empty
val start = System.currentTimeMillis()
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(2)) shouldBe empty
(System.currentTimeMillis() - start) should be < 500L
}
it should "fail to unlock lock held by others" in dbLockTestCase(2) { c =>
val lock = backend.tryAcquire(backend.lock(1), LockMode.Shared)(c(1)).get
backend.release(lock)(c(2)) shouldBe false
val lock = dbLock.tryAcquire(dbLock.lock(1), LockMode.Shared)(c(1)).get
dbLock.release(lock)(c(2)) shouldBe false
}
it should "fail to unlock lock which is not held by anyone" in dbLockTestCase(1) { c =>
backend.release(Lock(backend.lock(1), LockMode.Shared))(c(1)) shouldBe false
dbLock.release(Lock(dbLock.lock(1), LockMode.Shared))(c(1)) shouldBe false
}
it should "fail if attempt to use backend-foreign lock-id for locking" in dbLockTestCase(1) { c =>
Try(backend.tryAcquire(new LockId {}, LockMode.Shared)(c(1))).isFailure shouldBe true
Try(dbLock.tryAcquire(new LockId {}, LockMode.Shared)(c(1))).isFailure shouldBe true
}
it should "fail if attempt to use backend-foreign lock-id for un-locking" in dbLockTestCase(1) {
c =>
Try(backend.release(Lock(new LockId {}, LockMode.Shared))(c(1))).isFailure shouldBe true
Try(dbLock.release(Lock(new LockId {}, LockMode.Shared))(c(1))).isFailure shouldBe true
}
it should "lock successfully exclusive different locks" in dbLockTestCase(1) { c =>
backend.tryAcquire(backend.lock(1), LockMode.Exclusive)(c(1)) should not be empty
backend.tryAcquire(backend.lock(2), LockMode.Exclusive)(c(1)) should not be empty
backend.tryAcquire(backend.lock(3), LockMode.Exclusive)(c(1)) should not be empty
backend.tryAcquire(backend.lock(4), LockMode.Exclusive)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(1), LockMode.Exclusive)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(2), LockMode.Exclusive)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(3), LockMode.Exclusive)(c(1)) should not be empty
dbLock.tryAcquire(dbLock.lock(4), LockMode.Exclusive)(c(1)) should not be empty
}
private val timeout = Timeout(Span(10, Seconds))
@ -151,18 +152,28 @@ private[backend] trait StorageBackendTestsDBLock
private def dbLockTestCase(
numOfConnectionsNeeded: Int
)(test: List[Connection] => Any): Future[Assertion] = {
val dataSource = backend.createDataSource(jdbcUrl)
if (backend.dbLockSupported) {
if (dbLock.dbLockSupported) {
// prepending with null so we can refer to connections 1 based in tests
val connections = null :: List.fill(numOfConnectionsNeeded)(dataSource.getConnection)
val connections = null :: List.fill(numOfConnectionsNeeded)(getConnection)
val result = Try(test(connections))
connections.foreach(c => Try(c.close()))
Future.fromTry(result).map(_ => 1 shouldBe 1)
} else {
info(
s"This test makes sense only for StorageBackend which supports DB-Locks. For ${backend.getClass.getName} StorageBackend this test is disabled."
s"This test makes sense only for StorageBackend which supports DB-Locks. For ${dbLock.getClass.getName} StorageBackend this test is disabled."
)
Future.successful(1 shouldBe 1)
}
}
}
trait StorageBackendTestsDBLockForSuite
extends StorageBackendTestsDBLock
with StorageBackendProvider {
this: AsyncFlatSpec =>
override def dbLock: DBLockStorageBackend = backend
override def getConnection: Connection =
backend.createDataSource(jdbcUrl)(LoggingContext.ForTesting).getConnection
}

View File

@ -0,0 +1,856 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.indexer.ha
import java.sql.Connection
import java.util.Timer
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.KillSwitch
import akka.testkit.TestProbe
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.store.backend.DBLockStorageBackend
import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Random, Try}
class HaCoordinatorSpec
extends AsyncFlatSpec
with Matchers
with AkkaBeforeAndAfterAll
with Eventually {
implicit val ec: ExecutionContext =
system.dispatcher // we need this to not use the default EC which is coming from AsyncTestSuite, and which is serial
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
private val logger = ContextualizedLogger.get(this.getClass)
private val timer = new Timer(true)
private val mainLockAcquireRetryMillis = 20L
private val workerLockAcquireRetryMillis = 20L
private val mainLockCheckerPeriodMillis = 20L
private val timeoutToleranceMillis =
500L // unfortunately this needs to be a insanely big tolerance, not to render the test flaky. under normal circumstances this should pass with +5 millis
private val mainLockId = 10
private val main = TestLockId(mainLockId)
private val workerLockId = 20
private val worker = TestLockId(workerLockId)
behavior of "databaseLockBasedHaCoordinator graceful shutdown"
it should "successfully propagate successful end of execution" in {
val protectedSetup = setup()
import protectedSetup._
for {
_ <- connectionInitializerFuture
_ = {
info("As HACoordinator is initialized")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is not completed")
completeExecutionInitialization()
info("As protected execution initialization is finished")
executionCompletedPromise.success(())
info("As execution is completed")
}
_ <- protectedHandle.completed
} yield {
info("Protected Handle is completed successfully")
1 shouldBe 1
}
}
it should "successfully propagate failed end of execution" in {
val protectedSetup = setup()
import protectedSetup._
for {
_ <- connectionInitializerFuture
_ = {
info("As HACoordinator is initialized")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is not completed")
completeExecutionInitialization()
info("As protected execution initialization is finished")
executionCompletedPromise.failure(new Exception("failed execution"))
info("As execution completes with failure")
}
ex <- protectedHandle.completed.failed
} yield {
info("Protected Handle is completed with failure")
ex.getMessage shouldBe "failed execution"
}
}
it should "propagate graceful shutdown to the protected execution" in {
val protectedSetup = setup()
import protectedSetup._
for {
connectionInitializer <- connectionInitializerFuture
_ = {
info("As HACoordinator is initialized")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is not completed")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
completeExecutionInitialization()
info("As protected execution initialization is finished")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
protectedHandle.killSwitch.shutdown()
info("And as graceful shutdown started")
}
_ <- executionShutdownFuture
_ = {
info("Shutdown is observed at execution")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
executionAbortedFuture.isCompleted shouldBe false
info("Abort is not observed at execution")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle not completed")
Thread.sleep(200)
info("As waiting 200 millis")
protectedHandle.completed.isCompleted shouldBe false
info(
"Protected Handle is still not completed (hence it is waiting for execution to finish as the first step of the teardown process)"
)
executionCompletedPromise.success(())
info("As execution is completed")
}
_ <- protectedHandle.completed
} yield {
info("Protected Handle is completed successfully")
Try(connectionInitializer.initialize(new TestConnection)).isFailure shouldBe true
info("Connection initializer does not work anymore")
1 shouldBe 1
}
}
it should "propagate graceful shutdown to the protected execution if shutdown initiated before execution initialization is finished" in {
val protectedSetup = setup()
import protectedSetup._
for {
connectionInitializer <- connectionInitializerFuture
_ = {
info("As HACoordinator is initialized")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is not completed")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
protectedHandle.killSwitch.shutdown()
info("As graceful shutdown started")
completeExecutionInitialization()
info("And As protected execution initialization is finished")
connectionInitializer.initialize(new TestConnection)
info(
"Connection initializer still works (release process first releases the execution, and only then the main connection and the poller)"
)
}
_ <- executionShutdownFuture
_ = {
info("Shutdown is observed at execution")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
executionAbortedFuture.isCompleted shouldBe false
info("Abort is not observed at execution")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle not completed")
executionCompletedPromise.success(())
info("As execution is completed")
}
_ <- protectedHandle.completed
} yield {
info("Protected Handle is completed successfully")
Try(connectionInitializer.initialize(new TestConnection)).isFailure shouldBe true
info("Connection initializer does not work anymore")
1 shouldBe 1
}
}
it should "swallow failures if graceful shutdown is underway" in {
val protectedSetup = setup()
import protectedSetup._
for {
connectionInitializer <- connectionInitializerFuture
_ = {
info("As HACoordinator is initialized")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is not completed")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
completeExecutionInitialization()
info("As protected execution initialization is finished")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
protectedHandle.killSwitch.shutdown()
info("And as graceful shutdown started")
}
_ <- executionShutdownFuture
_ = {
info("Shutdown is observed at execution")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
executionAbortedFuture.isCompleted shouldBe false
info("Abort is not observed at execution")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle not completed")
executionCompletedPromise.failure(new Exception("some exception"))
info("As execution is completes with failure")
}
_ <- protectedHandle.completed
} yield {
info("Protected Handle is completed successfully")
Try(connectionInitializer.initialize(new TestConnection)).isFailure shouldBe true
info("Connection initializer does not work anymore")
1 shouldBe 1
}
}
behavior of "databaseLockBasedHaCoordinator initialization"
it should "fail if getting connection fails" in {
val protectedSetup =
setup(connectionFactory = () => throw new Exception("as getting connection"))
import protectedSetup._
for {
result <- protectedHandle.completed.failed
} yield {
info("Protected Handle is completed with failure")
result.getMessage shouldBe "as getting connection"
}
}
it should "wait if main lock cannot be acquired" in {
val dbLock = new TestDBLockStorageBackend
val blockingConnection = new TestConnection
val blockingLock =
dbLock.tryAcquire(main, DBLockStorageBackend.LockMode.Exclusive)(blockingConnection).get
info("As acquiring the main lock from the outside")
val protectedSetup = setup(dbLock = dbLock)
import protectedSetup._
info("As acquiring the main lock from the outside")
Thread.sleep(200)
info("And as waiting for 200 millis")
connectionInitializerFuture.isCompleted shouldBe false
protectedHandle.completed.isCompleted shouldBe false
info("Initialization should be waiting")
dbLock.release(blockingLock)(blockingConnection) shouldBe true
info("As releasing the blocking lock")
for {
_ <- connectionInitializerFuture
_ = {
info("Initialisation should completed successfully")
completeExecutionInitialization() // cleanup
executionCompletedPromise.success(()) // cleanup
}
_ <- protectedHandle.completed
} yield {
1 shouldBe 1
}
}
it should "wait for main lock can be interrupted by graceful shutdown" in {
val dbLock = new TestDBLockStorageBackend
val blockingConnection = new TestConnection
dbLock.tryAcquire(main, DBLockStorageBackend.LockMode.Exclusive)(blockingConnection).get
info("As acquiring the main lock from the outside")
val protectedSetup = setup(dbLock = dbLock)
import protectedSetup._
Thread.sleep(200)
info("And as waiting for 200 millis")
connectionInitializerFuture.isCompleted shouldBe false
protectedHandle.completed.isCompleted shouldBe false
info("Initialization should be waiting")
protectedHandle.killSwitch.shutdown()
info("As graceful shutdown started")
for {
_ <- protectedHandle.completed
} yield {
info("Protected Handle is completed successfully")
connectionInitializerFuture.isCompleted shouldBe false
}
}
it should "wait if worker lock cannot be acquired due to exclusive blocking" in {
val dbLock = new TestDBLockStorageBackend
val blockingConnection = new TestConnection
val blockingLock =
dbLock.tryAcquire(worker, DBLockStorageBackend.LockMode.Exclusive)(blockingConnection).get
info("As acquiring the worker lock from the outside")
val protectedSetup = setup(dbLock = dbLock)
import protectedSetup._
Thread.sleep(200)
info("And as waiting for 200 millis")
connectionInitializerFuture.isCompleted shouldBe false
protectedHandle.completed.isCompleted shouldBe false
info("Initialization should be waiting")
dbLock.release(blockingLock)(blockingConnection) shouldBe true
info("As releasing the blocking lock")
for {
_ <- connectionInitializerFuture
_ = {
info("Initialisation should completed successfully")
completeExecutionInitialization() // cleanup
executionCompletedPromise.success(()) // cleanup
}
_ <- protectedHandle.completed
} yield {
1 shouldBe 1
}
}
it should "wait if worker lock cannot be acquired due to shared blocking" in {
val dbLock = new TestDBLockStorageBackend
val blockingConnection = new TestConnection
val blockingLock =
dbLock.tryAcquire(worker, DBLockStorageBackend.LockMode.Shared)(blockingConnection).get
info("As acquiring the worker lock from the outside")
val protectedSetup = setup(dbLock = dbLock)
import protectedSetup._
Thread.sleep(200)
info("And as waiting for 200 millis")
connectionInitializerFuture.isCompleted shouldBe false
protectedHandle.completed.isCompleted shouldBe false
info("Initialization should be waiting")
dbLock.release(blockingLock)(blockingConnection) shouldBe true
info("As releasing the blocking lock")
for {
_ <- connectionInitializerFuture
_ = {
info("Initialisation should completed successfully")
completeExecutionInitialization() // cleanup
executionCompletedPromise.success(()) // cleanup
}
_ <- protectedHandle.completed
} yield {
1 shouldBe 1
}
}
it should "wait for worker lock can be interrupted by graceful shutdown" in {
val dbLock = new TestDBLockStorageBackend
val blockingConnection = new TestConnection
dbLock.tryAcquire(worker, DBLockStorageBackend.LockMode.Shared)(blockingConnection).get
info("As acquiring the worker lock from the outside")
val protectedSetup = setup(dbLock = dbLock)
import protectedSetup._
Thread.sleep(200)
info("And as waiting for 200 millis")
connectionInitializerFuture.isCompleted shouldBe false
protectedHandle.completed.isCompleted shouldBe false
info("Initialization should be waiting")
protectedHandle.killSwitch.shutdown()
info("As graceful shutdown starts")
for {
_ <- protectedHandle.completed
} yield {
info("Protected Handle completes successfully")
connectionInitializerFuture.isCompleted shouldBe false
}
}
it should "fail if worker lock cannot be acquired in time due to shared blocking" in {
val dbLock = new TestDBLockStorageBackend
val blockingConnection = new TestConnection
dbLock.tryAcquire(worker, DBLockStorageBackend.LockMode.Shared)(blockingConnection).get
info("As acquiring the worker lock from the outside")
val protectedSetup = setup(
dbLock = dbLock,
workerLockAcquireMaxRetry = 2,
)
import protectedSetup._
Thread.sleep(200)
info("And as waiting for 200 millis")
for {
failure <- protectedHandle.completed.failed
} yield {
info("Initialisation should completed with failure")
failure.getMessage shouldBe "Cannot acquire lock TestLockId(20) in lock-mode Exclusive: lock busy"
}
}
it should "fail if execution initialization fails" in {
val protectedSetup = setup()
import protectedSetup._
for {
_ <- connectionInitializerFuture
_ = {
info("As execution initialization starts")
failDuringExecutionInitialization(new Exception("failed as initializing"))
info("Initialization fails")
}
failure <- protectedHandle.completed.failed
} yield {
info("Protected execution fails with failure")
failure.getMessage shouldBe "failed as initializing"
}
}
behavior of "databaseLockBasedHaCoordinator main connection polling"
it should "successfully prevent further worker connection-spawning, and trigger shutdown in execution, if main lock cannot be acquired anymore, triggered by connection-spawning" in {
val protectedSetup = setup()
import protectedSetup._
for {
connectionInitializer <- connectionInitializerFuture
_ = {
info("As HACoordinator is initialized")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is not completed")
completeExecutionInitialization()
info("As protected execution initialization is finished")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
Thread.sleep(200)
info("As waiting 200 millis")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is still not completed")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer still works")
executionAbortedFuture.isCompleted shouldBe false
info("Execution is not aborted yet")
dbLock.cutExclusiveLockHoldingConnection(mainLockId)
info("As main connection is severed")
Try(connectionInitializer.initialize(new TestConnection)).isFailure shouldBe true
info("Connection initializer not working anymore")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is still not completed")
}
abortException <- executionAbortedFuture
_ = {
info("Execution is aborted")
abortException.getMessage shouldBe "check failed, killSwitch aborted"
executionCompletedPromise.failure(
new Exception("execution failed due to abort coming from outside")
)
info("As execution fails")
}
failure <- protectedHandle.completed.failed
} yield {
info("Protected Handle is completed with failure")
failure.getMessage shouldBe "check failed, killSwitch aborted"
info("And completion failure is populated by check-failure")
1 shouldBe 1
}
}
it should "successfully prevent further worker connection-spawning, and trigger shutdown in execution, if main lock cannot be acquired anymore, triggered by timeout" in {
val protectedSetup = setup()
import protectedSetup._
for {
connectionInitializer <- connectionInitializerFuture
mainConnectionSeveredAtNanos = {
info("As HACoordinator is initialized")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is not completed")
completeExecutionInitialization()
info("As protected execution initialization is finished")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
Thread.sleep(200)
info("As waiting 200 millis")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is still not completed")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer still works")
protectedSetup.executionAbortedFuture.isCompleted shouldBe false
info("Execution is not aborted yet")
dbLock.cutExclusiveLockHoldingConnection(mainLockId)
info("As main connection is severed")
System.nanoTime()
}
abortException <- executionAbortedFuture
_ = {
info("Execution is aborted")
abortException.getMessage shouldBe "check failed, killSwitch aborted"
(System.nanoTime() - mainConnectionSeveredAtNanos) should be < ((
mainLockAcquireRetryMillis +
timeoutToleranceMillis
) * 1000L * 1000L)
info("Within polling time-bounds")
Try(connectionInitializer.initialize(new TestConnection)).isFailure shouldBe true
info("Connection initializer not working anymore")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is still not completed")
executionCompletedPromise.failure(
new Exception("execution failed due to abort coming from outside")
)
info("As execution fails")
}
failure <- protectedHandle.completed.failed
} yield {
info("Protected Handle is completed with failure")
failure.getMessage shouldBe "check failed, killSwitch aborted"
info("And completion failure is populated by check-failure")
1 shouldBe 1
}
}
behavior of "databaseLockBasedHaCoordinator in multi-node setup"
it should "successfully protect execution, and respect upper bound threshold as switching over in a 5 node setup, without worker locking" in {
val dbLock = new TestDBLockStorageBackend
var nodes: Set[ProtectedSetup] = Set.empty
val nodeStartedExecutionProbe = TestProbe()
val nodeStoppedExecutionProbe = TestProbe()
val nodeHaltedProbe = TestProbe()
def addNode(): Unit = synchronized {
val node = setup(dbLock = dbLock)
node.connectionInitializerFuture
.foreach { _ =>
logger.info(s"execution started")
node.completeExecutionInitialization()
nodeStartedExecutionProbe.send(nodeStartedExecutionProbe.ref, "started")
}
node.executionShutdownFuture
.foreach { _ =>
node.executionCompletedPromise.success(())
nodeStoppedExecutionProbe.send(nodeStoppedExecutionProbe.ref, "stopped")
}
node.executionAbortedFuture
.foreach { t =>
node.executionCompletedPromise.failure(t)
nodeStoppedExecutionProbe.send(nodeStoppedExecutionProbe.ref, "stopped")
}
node.protectedHandle.completed
.onComplete { _ =>
removeNode(node)
nodeHaltedProbe.send(nodeHaltedProbe.ref, "halted")
}
nodes = nodes + node
info("As a node added")
}
def removeNode(node: ProtectedSetup): Unit = synchronized {
nodes = nodes - node
}
def verifyCleanSlate(expectedNumberOfNodes: Int): Unit = synchronized {
if (expectedNumberOfNodes == 0) {
nodes.size shouldBe 0
} else {
nodes.size shouldBe expectedNumberOfNodes
nodes.exists(_.protectedHandle.completed.isCompleted) shouldBe false
nodes.count(_.connectionInitializerFuture.isCompleted) shouldBe 1
val activeNode = nodes.find(_.connectionInitializerFuture.isCompleted).get
activeNode.executionAbortedFuture.isCompleted shouldBe false
activeNode.executionShutdownFuture.isCompleted shouldBe false
}
nodeStartedExecutionProbe.expectNoMessage(FiniteDuration(0, "seconds"))
nodeStoppedExecutionProbe.expectNoMessage(FiniteDuration(0, "seconds"))
nodeHaltedProbe.expectNoMessage(FiniteDuration(0, "seconds"))
info("Cluster is in expected shape")
}
def wait(): Unit = {
val waitMillis: Long = Random.nextInt(100).toLong
Thread.sleep(waitMillis)
info(s"As waiting $waitMillis millis")
}
addNode()
nodeStartedExecutionProbe.expectMsg("started")
info("The first node started execution")
verifyCleanSlate(1)
addNode()
verifyCleanSlate(2)
addNode()
verifyCleanSlate(3)
addNode()
verifyCleanSlate(4)
addNode()
verifyCleanSlate(5)
info("As adding 5 nodes")
wait()
verifyCleanSlate(5)
info("Cluster stabilized")
for (_ <- 1 to 30) {
wait()
verifyCleanSlate(5)
dbLock.cutExclusiveLockHoldingConnection(mainLockId)
logger.info(s"main lock force-released")
val mainConnCutNanos = System.nanoTime()
info(
"As active node looses main connection (and main index lock is available for acquisition again)"
)
nodeStartedExecutionProbe.expectMsg("started")
(System.nanoTime() - mainConnCutNanos) should be < ((
mainLockAcquireRetryMillis +
timeoutToleranceMillis
) * 1000L * 1000L)
info("Some other node started execution within time bounds")
nodeStoppedExecutionProbe.expectMsg("stopped")
nodeHaltedProbe.expectMsg("halted")
info("And originally active node stopped")
verifyCleanSlate(4)
addNode()
verifyCleanSlate(5)
}
def tearDown(): Unit = {
dbLock.cutExclusiveLockHoldingConnection(mainLockId)
nodeStartedExecutionProbe.expectMsg("started")
nodeStoppedExecutionProbe.expectMsg("stopped")
nodeHaltedProbe.expectMsg("halted")
()
}
tearDown()
verifyCleanSlate(4)
tearDown()
verifyCleanSlate(3)
tearDown()
verifyCleanSlate(2)
tearDown()
verifyCleanSlate(1)
dbLock.cutExclusiveLockHoldingConnection(mainLockId)
nodeStoppedExecutionProbe.expectMsg("stopped")
nodeHaltedProbe.expectMsg("halted")
verifyCleanSlate(0)
Future.successful(1 shouldBe 1)
}
it should "successfully protect execution, and respect upper bound threshold as switching over in a 5 node setup, with worker connection locking: loosing only indexer lock" in {
val dbLock = new TestDBLockStorageBackend
val keepUsingWorkerAfterShutdownMillis = 100L
var nodes: Set[ProtectedSetup] = Set.empty
val nodeStartedExecutionProbe = TestProbe()
val nodeStoppedExecutionProbe = TestProbe()
val nodeHaltedProbe = TestProbe()
val concurrentWorkers = new AtomicInteger(0)
def addNode(): Unit = synchronized {
val node = setup(dbLock = dbLock)
val workerConnection = new TestConnection
node.connectionInitializerFuture
.foreach { connectionInitializer =>
logger.info(s"execution started")
node.completeExecutionInitialization()
connectionInitializer.initialize(workerConnection)
concurrentWorkers.incrementAndGet()
nodeStartedExecutionProbe.send(nodeStartedExecutionProbe.ref, "started")
}
node.executionShutdownFuture
.foreach { _ =>
Thread.sleep(keepUsingWorkerAfterShutdownMillis)
concurrentWorkers.decrementAndGet()
dbLock.release(DBLockStorageBackend.Lock(worker, DBLockStorageBackend.LockMode.Shared))(
workerConnection
)
node.executionCompletedPromise.success(())
nodeStoppedExecutionProbe.send(nodeStoppedExecutionProbe.ref, "stopped")
}
node.executionAbortedFuture
.foreach { t =>
Thread.sleep(keepUsingWorkerAfterShutdownMillis)
concurrentWorkers.decrementAndGet()
dbLock.release(DBLockStorageBackend.Lock(worker, DBLockStorageBackend.LockMode.Shared))(
workerConnection
)
node.executionCompletedPromise.failure(t)
nodeStoppedExecutionProbe.send(nodeStoppedExecutionProbe.ref, "stopped")
}
node.protectedHandle.completed
.onComplete { _ =>
removeNode(node)
nodeHaltedProbe.send(nodeHaltedProbe.ref, "halted")
}
nodes = nodes + node
info("As a node added")
}
def removeNode(node: ProtectedSetup): Unit = synchronized {
nodes = nodes - node
}
def verifyCleanSlate(expectedNumberOfNodes: Int): Unit = synchronized {
if (expectedNumberOfNodes == 0) {
nodes.size shouldBe 0
} else {
concurrentWorkers.get() shouldBe 1
nodes.size shouldBe expectedNumberOfNodes
nodes.exists(_.protectedHandle.completed.isCompleted) shouldBe false
nodes.count(_.connectionInitializerFuture.isCompleted) shouldBe 1
val activeNode = nodes.find(_.connectionInitializerFuture.isCompleted).get
activeNode.executionAbortedFuture.isCompleted shouldBe false
activeNode.executionShutdownFuture.isCompleted shouldBe false
}
nodeStartedExecutionProbe.expectNoMessage(FiniteDuration(0, "seconds"))
nodeStoppedExecutionProbe.expectNoMessage(FiniteDuration(0, "seconds"))
nodeHaltedProbe.expectNoMessage(FiniteDuration(0, "seconds"))
info("Cluster is in expected shape")
}
def wait(): Unit = {
val waitMillis: Long = Random.nextInt(100).toLong
Thread.sleep(waitMillis)
info(s"As waiting $waitMillis millis")
}
addNode()
nodeStartedExecutionProbe.expectMsg("started")
info("The first node started execution")
verifyCleanSlate(1)
addNode()
verifyCleanSlate(2)
addNode()
verifyCleanSlate(3)
addNode()
verifyCleanSlate(4)
addNode()
verifyCleanSlate(5)
info("As adding 5 nodes")
wait()
verifyCleanSlate(5)
info("Cluster stabilized")
for (_ <- 1 to 30) {
wait()
verifyCleanSlate(5)
dbLock.cutExclusiveLockHoldingConnection(mainLockId)
logger.info(s"main lock force-released")
val mainConnCutNanos = System.nanoTime()
info("As active node looses main connection (and with time drops worker connection as well)")
nodeStartedExecutionProbe.expectMsg("started")
(System.nanoTime() - mainConnCutNanos) should be < ((
mainLockCheckerPeriodMillis + // first active node has to realize that it lost the lock
keepUsingWorkerAfterShutdownMillis + // then it is shutting down, but it will take this long to release the worker lock as well
workerLockAcquireRetryMillis + // by the time of here the new active node already acquired the main lock, and it is polling for the worker lock, so maximum so much time we need to wait
timeoutToleranceMillis
) * 1000L * 1000L)
info("Some other node started execution within time bounds")
nodeStoppedExecutionProbe.expectMsg("stopped")
nodeHaltedProbe.expectMsg("halted")
info("And originally active node stopped")
verifyCleanSlate(4)
addNode()
verifyCleanSlate(5)
}
def tearDown(): Unit = {
dbLock.cutExclusiveLockHoldingConnection(mainLockId)
nodeStartedExecutionProbe.expectMsg("started")
nodeStoppedExecutionProbe.expectMsg("stopped")
nodeHaltedProbe.expectMsg("halted")
()
}
tearDown()
verifyCleanSlate(4)
tearDown()
verifyCleanSlate(3)
tearDown()
verifyCleanSlate(2)
tearDown()
verifyCleanSlate(1)
dbLock.cutExclusiveLockHoldingConnection(mainLockId)
nodeStoppedExecutionProbe.expectMsg("stopped")
nodeHaltedProbe.expectMsg("halted")
verifyCleanSlate(0)
Future.successful(1 shouldBe 1)
}
private def setup(
connectionFactory: () => Connection = () => new TestConnection,
workerLockAcquireMaxRetry: Long = 100,
dbLock: TestDBLockStorageBackend = new TestDBLockStorageBackend,
): ProtectedSetup = {
val connectionInitializerPromise = Promise[ConnectionInitializer]()
val executionHandlePromise = Promise[Unit]()
val shutdownPromise = Promise[Unit]()
val abortPromise = Promise[Throwable]()
val completedPromise = Promise[Unit]()
val protectedHandle = HaCoordinator
.databaseLockBasedHaCoordinator(
connectionFactory = connectionFactory,
storageBackend = dbLock,
executionContext = system.dispatcher,
timer = timer,
haConfig = HaConfig(
enable = true,
mainLockAcquireRetryMillis = mainLockAcquireRetryMillis,
workerLockAcquireRetryMillis = workerLockAcquireRetryMillis,
workerLockAcquireMaxRetry = workerLockAcquireMaxRetry,
mainLockCheckerPeriodMillis = mainLockCheckerPeriodMillis,
indexerLockId = 10,
indexerWorkerLockId = 20,
),
)
.protectedExecution { connectionInitializer =>
connectionInitializerPromise.success(connectionInitializer)
executionHandlePromise.future.map(_ =>
Handle(
killSwitch = new KillSwitch {
override def shutdown(): Unit = shutdownPromise.success(())
override def abort(ex: Throwable): Unit = abortPromise.success(ex)
},
completed = completedPromise.future,
)
)
}
ProtectedSetup(
protectedHandle = protectedHandle,
connectionInitializerFuture = connectionInitializerPromise.future,
executionHandlePromise = executionHandlePromise,
executionShutdownFuture = shutdownPromise.future,
executionAbortedFuture = abortPromise.future,
executionCompletedPromise = completedPromise,
dbLock = dbLock,
)
}
case class ProtectedSetup(
protectedHandle: Handle, // the protected Handle to observe and interact with
connectionInitializerFuture: Future[
ConnectionInitializer
], // observe ConnectionInitializer, this completes as HA arrives to the stage when execution initialization starts
executionHandlePromise: Promise[Unit], // trigger end of execution initialization
executionShutdownFuture: Future[Unit], // observe shutdown in execution
executionAbortedFuture: Future[Throwable], // observe abort in execution
executionCompletedPromise: Promise[Unit], // trigger completion of execution
dbLock: TestDBLockStorageBackend, // the lock backend
) {
/** trigger end of execution initialization */
def completeExecutionInitialization(): Unit = {
executionHandlePromise.success(())
}
/** simulate a failure during execution initialization */
def failDuringExecutionInitialization(cause: Throwable): Unit = {
executionHandlePromise.failure(cause)
}
}
}

View File

@ -0,0 +1,26 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.indexer.ha
import java.sql.Connection
import com.daml.platform.store.backend.{DBLockStorageBackend, StorageBackendTestsDBLock}
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AsyncFlatSpec
class TestDBLockStorageBackendSpec
extends AsyncFlatSpec
with StorageBackendTestsDBLock
with BeforeAndAfter {
private var _dbLock: TestDBLockStorageBackend = _
before {
_dbLock = new TestDBLockStorageBackend
}
override def dbLock: DBLockStorageBackend = _dbLock
override def getConnection: Connection = new TestConnection
}