Fix Parallel Indexer initialization issue [DPP-542] (#10889)

RCA: if at parallel indexer initialization some error happening, then a promise never completes, which causes an initialization future never complete
Expected: failure should be propagated, and recovering indexer should retry 10 seconds later
Actual: failure not propagated, a zombie future freezes initialization, preventing retries
Impact: this is a corner case - if no problems at indexer initialization, the issues is not surfacing

* Extracts critical logic into helper function initializeHandle
* Adds unit tests for initializeHandle
* Fixes issue by completing the promise in all cases

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Marton Nagy 2021-09-15 13:12:17 +02:00 committed by GitHub
parent b3e4975795
commit 0c32e3baff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 382 additions and 46 deletions

View File

@ -75,56 +75,81 @@ object ParallelIndexerFactory {
)
else
ResourceOwner.successful(NoopHaCoordinator)
} yield toIndexer { resourceContext =>
implicit val rc: ResourceContext = resourceContext
} yield toIndexer { implicit resourceContext =>
implicit val ec: ExecutionContext = resourceContext.executionContext
haCoordinator.protectedExecution { connectionInitializer =>
val killSwitchPromise = Promise[KillSwitch]()
val completionFuture = DbDispatcher
.owner(
// this is the DataSource which will be wrapped by HikariCP, and which will drive the ingestion
// therefore this needs to be configured with the connection-init-hook, what we get from HaCoordinator
dataSource = storageBackend.createDataSource(
jdbcUrl = jdbcUrl,
dataSourceConfig = dataSourceConfig,
connectionInitHook = Some(connectionInitializer.initialize),
),
serverRole = ServerRole.Indexer,
connectionPoolSize = ingestionParallelism + 1, // + 1 for the tailing ledger_end updates
connectionTimeout = FiniteDuration(
250,
"millis",
), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc)
metrics = metrics,
)
.use { dbDispatcher =>
initializeParallelIngestion(
haCoordinator.protectedExecution(connectionInitializer =>
initializeHandle(
DbDispatcher
.owner(
// this is the DataSource which will be wrapped by HikariCP, and which will drive the ingestion
// therefore this needs to be configured with the connection-init-hook, what we get from HaCoordinator
dataSource = storageBackend.createDataSource(
jdbcUrl = jdbcUrl,
dataSourceConfig = dataSourceConfig,
connectionInitHook = Some(connectionInitializer.initialize),
),
serverRole = ServerRole.Indexer,
connectionPoolSize =
ingestionParallelism + 1, // + 1 for the tailing ledger_end updates
connectionTimeout = FiniteDuration(
250,
"millis",
), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc)
metrics = metrics,
)
) { dbDispatcher =>
initializeParallelIngestion(
dbDispatcher = dbDispatcher,
readService = readService,
ec = ec,
mat = mat,
).map(
parallelIndexerSubscription(
inputMapperExecutor = inputMapperExecutor,
batcherExecutor = batcherExecutor,
dbDispatcher = dbDispatcher,
readService = readService,
ec = ec,
mat = mat,
).map(
parallelIndexerSubscription(
inputMapperExecutor = inputMapperExecutor,
batcherExecutor = batcherExecutor,
dbDispatcher = dbDispatcher,
materializer = mat,
)
).andThen {
// the tricky bit:
// the future in the completion handler will be this one
// but the future for signaling for the HaCoordinator, that the protected execution is initialized, needs to complete precisely here
case Success(handle) => killSwitchPromise.success(handle.killSwitch)
case Failure(ex) => killSwitchPromise.failure(ex)
}.flatMap(_.completed)
}
killSwitchPromise.future
.map(Handle(completionFuture.map(_ => ()), _))
}
materializer = mat,
)
)
}
)
}
/** Helper function to combine a ResourceOwner and an initialization function to initialize a Handle.
*
* @param owner A ResourceOwner which needs to be used to spawn a resource needed by initHandle
* @param initHandle Asynchronous initialization function to create a Handle
* @return A Future of a Handle where Future encapsulates initialization (as completed initialization completed)
*/
def initializeHandle[T](
owner: ResourceOwner[T]
)(initHandle: T => Future[Handle])(implicit rc: ResourceContext): Future[Handle] = {
implicit val ec: ExecutionContext = rc.executionContext
val killSwitchPromise = Promise[KillSwitch]()
val completed = owner
.use(resource =>
initHandle(resource)
.andThen {
// the tricky bit:
// the future in the completion handler will be this one
// but the future for signaling completion of initialization (the Future of the result), needs to complete precisely here
case Success(handle) => killSwitchPromise.success(handle.killSwitch)
}
.flatMap(_.completed)
)
.andThen {
// if error happens:
// - at Resource initialization (inside ResourceOwner.acquire()): result should complete with a Failure
// - at initHandle: result should complete with a Failure
// - at the execution spawned by initHandle (represented by the result Handle's complete): result should be with a success
// In the last case it is already finished the promise with a success, and this tryFailure will not succeed (returning false).
// In the other two cases the promise was not completed, and we complete here successfully with a failure.
case Failure(ex) => killSwitchPromise.tryFailure(ex)
}
killSwitchPromise.future
.map(Handle(completed, _))
}
def toIndexer(subscription: ResourceContext => Handle): Indexer =
new Indexer {
override def acquire()(implicit context: ResourceContext): Resource[Future[Unit]] = {

View File

@ -0,0 +1,311 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.indexer.parallel
import akka.stream.KillSwitch
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.platform.indexer.ha.Handle
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import scala.concurrent.{ExecutionContext, Future, Promise}
class ParallelIndexerFactorySpec extends AsyncFlatSpec with Matchers with AkkaBeforeAndAfterAll {
// AsyncFlatSpec is with serial execution context
private implicit val ec: ExecutionContext = system.dispatcher
behavior of "initializeHandle"
it should "correctly chain initializations and teardown-steps in the happy path" in {
val t = test
import t._
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleStarted.isCompleted shouldBe false
waitALittle()
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleStarted.isCompleted shouldBe false
resourceInitPromise.success("happy")
val completePromise = Promise[Unit]()
for {
s <- initHandleStarted
_ = {
waitALittle()
s shouldBe "happy"
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleFinished.success(Handle(completePromise.future, SomeKillSwitch))
}
handle <- initialized
_ = {
waitALittle()
resourceReleasing.isCompleted shouldBe false
handle.completed.isCompleted shouldBe false
handle.killSwitch shouldBe SomeKillSwitch
completePromise.success(())
}
_ <- resourceReleasing
_ = {
waitALittle()
handle.completed.isCompleted shouldBe false
resourceReleased.success(())
}
_ <- handle.completed
} yield {
1 shouldBe 1
}
}
it should "propagate error from releasing resource" in {
val t = test
import t._
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleStarted.isCompleted shouldBe false
waitALittle()
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleStarted.isCompleted shouldBe false
resourceInitPromise.success("happy")
val completePromise = Promise[Unit]()
for {
s <- initHandleStarted
_ = {
waitALittle()
s shouldBe "happy"
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleFinished.success(Handle(completePromise.future, SomeKillSwitch))
}
handle <- initialized
_ = {
waitALittle()
resourceReleasing.isCompleted shouldBe false
handle.completed.isCompleted shouldBe false
handle.killSwitch shouldBe SomeKillSwitch
completePromise.success(())
}
_ <- resourceReleasing
_ = {
waitALittle()
handle.completed.isCompleted shouldBe false
resourceReleased.failure(new Exception("releasing resource failed"))
}
failure <- handle.completed.failed
} yield {
failure.getMessage shouldBe "releasing resource failed"
}
}
it should "propagate failure from resource initialization" in {
val t = test
import t._
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleStarted.isCompleted shouldBe false
waitALittle()
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleStarted.isCompleted shouldBe false
resourceInitPromise.failure(new Exception("resource init failed"))
for {
failure <- initialized.failed
} yield {
waitALittle()
failure.getMessage shouldBe "resource init failed"
initHandleStarted.isCompleted shouldBe false
resourceReleasing.isCompleted shouldBe false
}
}
it should "propagate failure from handle initialization, complete only after releasing resource" in {
val t = test
import t._
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleStarted.isCompleted shouldBe false
waitALittle()
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleStarted.isCompleted shouldBe false
resourceInitPromise.success("happy")
for {
s <- initHandleStarted
_ = {
waitALittle()
s shouldBe "happy"
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleFinished.failure(new Exception("handle initialization failed"))
}
_ <- resourceReleasing
_ = {
waitALittle()
initialized.isCompleted shouldBe false
resourceReleased.success(())
}
failure <- initialized.failed
} yield {
failure.getMessage shouldBe "handle initialization failed"
}
}
it should "propagate failure from handle initialization, complete only after releasing resource, even if releasing failed" in {
val t = test
import t._
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleStarted.isCompleted shouldBe false
waitALittle()
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleStarted.isCompleted shouldBe false
resourceInitPromise.success("happy")
for {
s <- initHandleStarted
_ = {
waitALittle()
s shouldBe "happy"
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleFinished.failure(new Exception("handle initialization failed"))
}
_ <- resourceReleasing
_ = {
waitALittle()
initialized.isCompleted shouldBe false
resourceReleased.failure(new Exception("releasing resource failed"))
}
failure <- initialized.failed
} yield {
failure.getMessage shouldBe "releasing resource failed"
}
}
it should "propagate failure from completion, but only after releasing resource finished" in {
val t = test
import t._
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleStarted.isCompleted shouldBe false
waitALittle()
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleStarted.isCompleted shouldBe false
resourceInitPromise.success("happy")
val completePromise = Promise[Unit]()
for {
s <- initHandleStarted
_ = {
waitALittle()
s shouldBe "happy"
resourceReleasing.isCompleted shouldBe false
initialized.isCompleted shouldBe false
initHandleFinished.success(Handle(completePromise.future, SomeKillSwitch))
}
handle <- initialized
_ = {
waitALittle()
resourceReleasing.isCompleted shouldBe false
handle.completed.isCompleted shouldBe false
handle.killSwitch shouldBe SomeKillSwitch
completePromise.failure(new Exception("completion failed"))
}
_ <- resourceReleasing
_ = {
waitALittle()
handle.completed.isCompleted shouldBe false
resourceReleased.success(())
}
failure <- handle.completed.failed
} yield {
failure.getMessage shouldBe "completion failed"
}
}
def test: TestHandle = {
val resourceInitPromise = Promise[String]()
val resourceReleasing = Promise[Unit]()
val resourceReleased = Promise[Unit]()
val initHandleStarted = Promise[String]()
val initHandleFinished = Promise[Handle]()
val result = ParallelIndexerFactory.initializeHandle(
new ResourceOwner[String] {
override def acquire()(implicit context: ResourceContext): Resource[String] =
Resource(
resourceInitPromise.future
) { _ =>
resourceReleasing.success(())
resourceReleased.future
}
}
) { s =>
initHandleStarted.success(s)
initHandleFinished.future
}(ResourceContext(implicitly))
TestHandle(
resourceInitPromise = resourceInitPromise,
initHandleStarted = initHandleStarted.future,
initHandleFinished = initHandleFinished,
initialized = result,
resourceReleasing = resourceReleasing.future,
resourceReleased = resourceReleased,
)
}
// Motivation: if we are expecting a stabilized state of the async system, but it would be not stable yet, then let's wait a little bit, so we give a chance to the system to stabilize, so we can observe our expectations fail
private def waitALittle(): Unit = Thread.sleep(10)
case class TestHandle(
resourceInitPromise: Promise[String],
initHandleStarted: Future[String],
initHandleFinished: Promise[Handle],
initialized: Future[Handle],
resourceReleasing: Future[Unit],
resourceReleased: Promise[Unit],
)
object SomeKillSwitch extends KillSwitch {
override def shutdown(): Unit = ()
override def abort(ex: Throwable): Unit = ()
}
}