Fix akka-streams shutdown at LedgerConfigurationSubscriptionFromIndex (#11462)

changelog_begin
changelog_end
This commit is contained in:
Marton Nagy 2021-10-29 13:57:13 +02:00 committed by GitHub
parent e89da7c1ed
commit 379996348f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -57,11 +57,7 @@ private[apiserver] final class LedgerConfigurationSubscriptionFromIndex(
.map { startingConfiguration =>
new Subscription(startingConfiguration, configurationLoadTimeout)
}(context.executionContext)
)(subscription =>
Future {
subscription.stop()
}(context.executionContext)
)
)(subscription => subscription.stop())
}
private final class Subscription(
@ -101,7 +97,7 @@ private[apiserver] final class LedgerConfigurationSubscriptionFromIndex(
(new AtomicReference[State](state), scheduledTimeout)
}
private val killSwitch = RestartSource
private val (killSwitch, completionFuture) = RestartSource
.withBackoff(
RestartSettings(
minBackoff = 1.seconds,
@ -131,7 +127,7 @@ private[apiserver] final class LedgerConfigurationSubscriptionFromIndex(
}
}
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
.toMat(Sink.ignore)(Keep.left[UniqueKillSwitch, Future[Done]])
.toMat(Sink.ignore)(Keep.both[UniqueKillSwitch, Future[Done]])
.run()(materializer)
override def ready: Future[Unit] = readyPromise.future
@ -139,9 +135,10 @@ private[apiserver] final class LedgerConfigurationSubscriptionFromIndex(
override def latestConfiguration(): Option[Configuration] =
currentState.get.latestConfiguration
def stop(): Unit = {
def stop(): Future[Unit] = {
scheduledTimeout.cancel()
killSwitch.shutdown()
completionFuture.map(_ => ())(servicesExecutionContext)
}
}
}