participant-integration-api: Split LedgerConfigProvider into two, and add test cases. [KVL-1046] (#10455)

* participant-integration-api: Split `LedgerConfigProvider` into two.

We now have a read-only `CurrentLedgerConfiguration`, and a write-only
`LedgerConfigProvisioner`.

CHANGELOG_BEGIN
CHANGELOG_END

* participant-integration-api: Improve logging in LedgerConfigProvisioner.

Making use of the logging context.

* participant-integration-api: Extract a CurrentLedgerConfiguration trait.

This highlighted a few issues in the tests where we were unnecessarily
calling methods.

* participant-integration-api: Split LedgerConfigProviderSpec.

We can test the two components individually now.

* participant-integration-api: Add a test for streaming the config.

* participant-integration-api: Add tests for provisioning configuration.

* participant-integration-api: Move configuration code into a new package.

* participant-integration-api: Fake ledger config in tests.

Now that there's an interface, we can use it without worrying about
streaming from the index.

* participant-integration-api: Use `List`, not `Seq`, in test fixtures.

Scala 2.12 thinks `Seq.apply` yields a possibly-mutable sequence, which
is incompatible with `Source.apply`.

* participant-integration-api: Explain how `LedgerConfigProvider` works.
This commit is contained in:
Samir Talwar 2021-08-02 18:07:35 +02:00 committed by GitHub
parent 5157ad6df3
commit db7728ad4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 638 additions and 419 deletions

View File

@ -21,6 +21,7 @@ import com.daml.lf.data.Ref
import com.daml.lf.engine._
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.apiserver.configuration.{CurrentLedgerConfiguration, LedgerConfigProvider}
import com.daml.platform.apiserver.execution.{
LedgerTimeAwareCommandExecutor,
StoreBackedCommandExecutor,
@ -107,7 +108,7 @@ private[daml] object ApiServices {
for {
ledgerId <- Resource.fromFuture(indexService.getLedgerId())
ledgerConfigProvider <- LedgerConfigProvider
currentLedgerConfiguration <- LedgerConfigProvider
.owner(
indexService,
optWriteService,
@ -116,7 +117,7 @@ private[daml] object ApiServices {
)(materializer, loggingContext)
.acquire()
services <- Resource(
Future(createServices(ledgerId, ledgerConfigProvider)(servicesExecutionContext))
Future(createServices(ledgerId, currentLedgerConfiguration)(servicesExecutionContext))
)(services =>
Future {
services.foreach {
@ -130,7 +131,7 @@ private[daml] object ApiServices {
private def createServices(
ledgerId: LedgerId,
ledgerConfigProvider: LedgerConfigProvider,
currentLedgerConfiguration: CurrentLedgerConfiguration,
)(implicit executionContext: ExecutionContext): List[BindableService] = {
val apiTransactionService =
ApiTransactionService.create(ledgerId, transactionsService, metrics)
@ -160,7 +161,7 @@ private[daml] object ApiServices {
val writeServiceBackedApiServices =
intitializeWriteServiceBackedApiServices(
ledgerId,
ledgerConfigProvider,
currentLedgerConfiguration,
apiCompletionService,
apiTransactionService,
)
@ -186,7 +187,7 @@ private[daml] object ApiServices {
private def intitializeWriteServiceBackedApiServices(
ledgerId: LedgerId,
ledgerConfigProvider: LedgerConfigProvider,
currentLedgerConfiguration: CurrentLedgerConfiguration,
apiCompletionService: GrpcCommandCompletionService,
apiTransactionService: GrpcTransactionService,
)(implicit executionContext: ExecutionContext): List[BindableService] = {
@ -214,7 +215,7 @@ private[daml] object ApiServices {
partyManagementService,
timeProvider,
timeProviderType,
ledgerConfigProvider,
currentLedgerConfiguration,
seedService,
commandExecutor,
ApiSubmissionService.Configuration(
@ -243,7 +244,7 @@ private[daml] object ApiServices {
apiTransactionService.getFlatTransactionById,
),
timeProvider,
ledgerConfigProvider,
currentLedgerConfiguration,
metrics,
)

View File

@ -0,0 +1,15 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.configuration
import com.daml.ledger.configuration.Configuration
/** Makes the current ledger configuration available in a centralized place. */
trait CurrentLedgerConfiguration {
/** The latest configuration found so far. There may be a delay between an update to the ledger
* configuration and that configuration becoming available through this method.
*/
def latestConfiguration: Option[Configuration]
}

View File

@ -1,45 +1,37 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.services
package com.daml.platform.apiserver.configuration
import java.util.UUID
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.atomic.AtomicReference
import akka.stream.scaladsl.{Keep, RestartSource, Sink}
import akka.stream.{KillSwitches, Materializer, RestartSettings, UniqueKillSwitch}
import akka.{Done, NotUsed}
import com.daml.api.util.TimeProvider
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.LedgerOffset
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.index.v2.IndexConfigManagementService
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.ResourceOwner
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.configuration.LedgerConfiguration
import com.daml.telemetry.{DefaultTelemetry, SpanKind, SpanName}
import scala.compat.java8.FutureConverters
import scala.concurrent.duration.{DurationInt, DurationLong}
import scala.concurrent.{ExecutionContext, Future, Promise}
/** Subscribes to ledger configuration updates coming from the index,
* and makes the latest ledger configuration available to consumers.
/** Subscribes to ledger configuration updates coming from the index, and makes the latest ledger
* configuration available to consumers.
*
* This class helps avoiding code duplication and limiting the number of
* database lookups, as multiple services and validators require the latest ledger config.
* This class helps avoiding code duplication and limiting the number of database lookups, as
* multiple services and validators require the latest ledger config.
*/
private[apiserver] final class LedgerConfigProvider private (
private[apiserver] final class IndexStreamingCurrentLedgerConfiguration private (
index: IndexConfigManagementService,
optWriteService: Option[state.WriteConfigService],
timeProvider: TimeProvider,
config: LedgerConfiguration,
ledgerConfiguration: LedgerConfiguration,
materializer: Materializer,
)(implicit loggingContext: LoggingContext)
extends AutoCloseable {
extends CurrentLedgerConfiguration
with AutoCloseable {
private[this] val logger = ContextualizedLogger.get(this.getClass)
@ -47,7 +39,6 @@ private[apiserver] final class LedgerConfigProvider private (
private[this] type StateType = (Option[LedgerOffset.Absolute], Option[Configuration])
private[this] val latestConfigurationState: AtomicReference[StateType] =
new AtomicReference(None -> None)
private[this] val closed: AtomicBoolean = new AtomicBoolean(false)
private[this] val killSwitch: AtomicReference[Option[UniqueKillSwitch]] =
new AtomicReference(None)
private[this] val readyPromise: Promise[Unit] = Promise()
@ -58,25 +49,16 @@ private[apiserver] final class LedgerConfigProvider private (
// - Submit the initial config if none is found after a delay
startLoading()
materializer.scheduleOnce(
config.configurationLoadTimeout.toNanos.nanos,
ledgerConfiguration.configurationLoadTimeout.toNanos.nanos,
() => {
if (readyPromise.trySuccess(())) {
logger.warn(
s"No ledger configuration found after ${config.configurationLoadTimeout}. The ledger API server will now start but all services that depend on the ledger configuration will return UNAVAILABLE until at least one ledger configuration is found."
s"No ledger configuration found after ${ledgerConfiguration.configurationLoadTimeout}. The ledger API server will now start but all services that depend on the ledger configuration will return UNAVAILABLE until at least one ledger configuration is found."
)
}
()
},
)
optWriteService.foreach { writeService =>
materializer.scheduleOnce(
config.initialConfigurationSubmitDelay.toNanos.nanos,
() => {
if (latestConfiguration.isEmpty && !closed.get) submitInitialConfig(writeService)
()
},
)
}
// Looks up the latest ledger configuration, then subscribes to a
// stream of configuration changes.
@ -141,71 +123,31 @@ private[apiserver] final class LedgerConfigProvider private (
()
}
private[this] def submitInitialConfig(writeService: state.WriteConfigService): Future[Unit] = {
implicit val executionContext: ExecutionContext = materializer.executionContext
// There are several reasons why the change could be rejected:
// - The participant is not authorized to set the configuration
// - There already is a configuration, it just didn't appear in the index yet
// This method therefore does not try to re-submit the initial configuration in case of failure.
val submissionId = Ref.SubmissionId.assertFromString(UUID.randomUUID.toString)
logger.info(s"No ledger configuration found, submitting an initial configuration $submissionId")
DefaultTelemetry.runFutureInSpan(
SpanName.LedgerConfigProviderInitialConfig,
SpanKind.Internal,
) { implicit telemetryContext =>
FutureConverters
.toScala(
writeService.submitConfiguration(
Timestamp.assertFromInstant(timeProvider.getCurrentTime.plusSeconds(60)),
submissionId,
config.initialConfiguration,
)
)
.map {
case state.SubmissionResult.Acknowledged =>
logger.info(s"Initial configuration submission $submissionId was successful")
case result: state.SubmissionResult.SynchronousError =>
logger.warn(
s"Initial configuration submission $submissionId failed. Code: ${result.status.getCode}, Reason: ${result.description}"
)
}
}
}
/** The latest configuration found so far.
* This may not be the currently active ledger configuration, e.g., if the index is lagging behind the ledger.
*/
def latestConfiguration: Option[Configuration] = latestConfigurationState.get._2
/** Completes:
* - when some ledger configuration was found
* - after [[com.daml.platform.configuration.LedgerConfiguration.configurationLoadTimeout]]
* , whichever happens first.
/** This future will resolve successfully:
*
* - when a ledger configuration is found, or
* - after [[LedgerConfiguration.configurationLoadTimeout]].
*
* Whichever is first.
*/
def ready: Future[Unit] = readyPromise.future
override def latestConfiguration: Option[Configuration] = latestConfigurationState.get._2
override def close(): Unit = {
closed.set(true)
killSwitch.get.foreach(k => k.shutdown())
}
}
private[apiserver] object LedgerConfigProvider {
private[apiserver] object IndexStreamingCurrentLedgerConfiguration {
def owner(
index: IndexConfigManagementService,
optWriteService: Option[state.WriteConfigService],
timeProvider: TimeProvider,
config: LedgerConfiguration,
ledgerConfiguration: LedgerConfiguration,
)(implicit
materializer: Materializer,
loggingContext: LoggingContext,
): ResourceOwner[LedgerConfigProvider] =
for {
provider <- ResourceOwner.forCloseable(() =>
new LedgerConfigProvider(index, optWriteService, timeProvider, config, materializer)
)
_ <- ResourceOwner.forFuture(() =>
provider.ready.map(_ => provider)(materializer.executionContext)
)
} yield provider
): ResourceOwner[IndexStreamingCurrentLedgerConfiguration] =
ResourceOwner.forCloseable(() =>
new IndexStreamingCurrentLedgerConfiguration(index, ledgerConfiguration, materializer)
)
}

View File

@ -0,0 +1,45 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.configuration
import akka.stream.Materializer
import com.daml.api.util.TimeProvider
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.participant.state.index.v2.IndexConfigManagementService
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.ResourceOwner
import com.daml.logging.LoggingContext
import com.daml.platform.configuration.LedgerConfiguration
object LedgerConfigProvider {
def owner(
index: IndexConfigManagementService,
optWriteService: Option[state.WriteConfigService],
timeProvider: TimeProvider,
ledgerConfiguration: LedgerConfiguration,
)(implicit
materializer: Materializer,
loggingContext: LoggingContext,
): ResourceOwner[CurrentLedgerConfiguration] =
for {
// First, we acquire the mechanism for looking up the current ledger configuration.
currentLedgerConfiguration <-
IndexStreamingCurrentLedgerConfiguration.owner(index, ledgerConfiguration)
// Next, we provision the configuration if one does not already exist on the ledger.
_ <- optWriteService match {
case None => ResourceOwner.unit
case Some(writeService) =>
LedgerConfigProvisioner.owner(
currentLedgerConfiguration = currentLedgerConfiguration,
writeService = writeService,
timeProvider = timeProvider,
submissionIdGenerator = SubmissionIdGenerator.Random,
ledgerConfiguration = ledgerConfiguration,
)
}
// Finally, we wait until either an existing configuration or the provisioned configuration
// appears on the index.
_ <- ResourceOwner.forFuture(() => currentLedgerConfiguration.ready)
} yield currentLedgerConfiguration
}

View File

@ -0,0 +1,108 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.configuration
import java.util.concurrent.atomic.AtomicBoolean
import akka.stream.Materializer
import com.daml.api.util.TimeProvider
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.ResourceOwner
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.configuration.LedgerConfiguration
import com.daml.telemetry.{DefaultTelemetry, SpanKind, SpanName}
import scala.compat.java8.FutureConverters
import scala.concurrent.Future
import scala.concurrent.duration.DurationLong
/** Writes a default ledger configuration to the ledger, after a configurable delay. The
* configuration is only written if the ledger does not already have a configuration.
*
* Used by the participant to initialize a new ledger.
*/
private[apiserver] final class LedgerConfigProvisioner private (
currentLedgerConfiguration: CurrentLedgerConfiguration,
writeService: state.WriteConfigService,
timeProvider: TimeProvider,
submissionIdGenerator: SubmissionIdGenerator,
config: LedgerConfiguration,
materializer: Materializer,
)(implicit loggingContext: LoggingContext)
extends AutoCloseable {
private val logger = ContextualizedLogger.get(getClass)
private val closed: AtomicBoolean = new AtomicBoolean(false)
materializer.scheduleOnce(
config.initialConfigurationSubmitDelay.toNanos.nanos,
() => {
if (currentLedgerConfiguration.latestConfiguration.isEmpty && !closed.get)
submitInitialConfig(writeService)
()
},
)
// There are several reasons why the change could be rejected:
// - The participant is not authorized to set the configuration
// - There already is a configuration, it just didn't appear in the index yet
// This method therefore does not try to re-submit the initial configuration in case of failure.
private def submitInitialConfig(writeService: state.WriteConfigService): Future[Unit] = {
val submissionId = submissionIdGenerator.generate()
withEnrichedLoggingContext("submissionId" -> submissionId) { implicit loggingContext =>
logger.info(s"No ledger configuration found, submitting an initial configuration.")
DefaultTelemetry.runFutureInSpan(
SpanName.LedgerConfigProviderInitialConfig,
SpanKind.Internal,
) { implicit telemetryContext =>
FutureConverters
.toScala(
writeService.submitConfiguration(
Timestamp.assertFromInstant(timeProvider.getCurrentTime.plusSeconds(60)),
submissionId,
config.initialConfiguration,
)
)
.map {
case state.SubmissionResult.Acknowledged =>
logger.info(s"Initial configuration submission was successful.")
case result: state.SubmissionResult.SynchronousError =>
withEnrichedLoggingContext("error" -> result) { implicit loggingContext =>
logger.warn(s"Initial configuration submission failed.")
}
}(materializer.executionContext)
}
}
}
override def close(): Unit = {
closed.set(true)
}
}
object LedgerConfigProvisioner {
def owner(
currentLedgerConfiguration: CurrentLedgerConfiguration,
writeService: state.WriteConfigService,
timeProvider: TimeProvider,
submissionIdGenerator: SubmissionIdGenerator,
ledgerConfiguration: LedgerConfiguration,
)(implicit
materializer: Materializer,
loggingContext: LoggingContext,
): ResourceOwner[LedgerConfigProvisioner] =
ResourceOwner.forCloseable(() =>
new LedgerConfigProvisioner(
currentLedgerConfiguration,
writeService,
timeProvider,
submissionIdGenerator,
ledgerConfiguration,
materializer,
)
)
}

View File

@ -32,6 +32,7 @@ import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.apiserver.configuration.CurrentLedgerConfiguration
import com.daml.platform.apiserver.services.ApiCommandService._
import com.daml.platform.apiserver.services.tracking.{TrackerImpl, TrackerMap}
import com.daml.platform.server.api.ApiException
@ -50,7 +51,7 @@ import scala.util.Try
private[apiserver] final class ApiCommandService private (
services: LocalServices,
configuration: ApiCommandService.Configuration,
ledgerConfigProvider: LedgerConfigProvider,
currentLedgerConfiguration: CurrentLedgerConfiguration,
metrics: Metrics,
)(implicit
materializer: Materializer,
@ -86,7 +87,7 @@ private[apiserver] final class ApiCommandService private (
logging.readAsStrings(request.getCommands.readAs),
) { implicit loggingContext =>
if (running) {
ledgerConfigProvider.latestConfiguration.fold[Future[Completion]](
currentLedgerConfiguration.latestConfiguration.fold[Future[Completion]](
Future.failed(ErrorFactories.missingLedgerConfig())
)(ledgerConfig => track(request, ledgerConfig))
} else {
@ -196,7 +197,7 @@ private[apiserver] object ApiCommandService {
configuration: Configuration,
services: LocalServices,
timeProvider: TimeProvider,
ledgerConfigProvider: LedgerConfigProvider,
currentLedgerConfiguration: CurrentLedgerConfiguration,
metrics: Metrics,
)(implicit
materializer: Materializer,
@ -204,12 +205,12 @@ private[apiserver] object ApiCommandService {
loggingContext: LoggingContext,
): CommandServiceGrpc.CommandService with GrpcApiService =
new GrpcCommandService(
new ApiCommandService(services, configuration, ledgerConfigProvider, metrics),
new ApiCommandService(services, configuration, currentLedgerConfiguration, metrics),
ledgerId = configuration.ledgerId,
currentLedgerTime = () => timeProvider.getCurrentTime,
currentUtcTime = () => Instant.now,
maxDeduplicationTime = () =>
ledgerConfigProvider.latestConfiguration.map(_.maxDeduplicationTime),
currentLedgerConfiguration.latestConfiguration.map(_.maxDeduplicationTime),
generateSubmissionId = SubmissionIdGenerator.Random,
)

View File

@ -23,6 +23,7 @@ import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.apiserver.SeedService
import com.daml.platform.apiserver.configuration.CurrentLedgerConfiguration
import com.daml.platform.apiserver.execution.{CommandExecutionResult, CommandExecutor}
import com.daml.platform.server.api.services.domain.CommandSubmissionService
import com.daml.platform.server.api.services.grpc.GrpcCommandSubmissionService
@ -47,7 +48,7 @@ private[apiserver] object ApiSubmissionService {
partyManagementService: IndexPartyManagementService,
timeProvider: TimeProvider,
timeProviderType: TimeProviderType,
ledgerConfigProvider: LedgerConfigProvider,
currentLedgerConfiguration: CurrentLedgerConfiguration,
seedService: SeedService,
commandExecutor: CommandExecutor,
configuration: ApiSubmissionService.Configuration,
@ -63,7 +64,7 @@ private[apiserver] object ApiSubmissionService {
partyManagementService,
timeProvider,
timeProviderType,
ledgerConfigProvider,
currentLedgerConfiguration,
seedService,
commandExecutor,
configuration,
@ -73,7 +74,7 @@ private[apiserver] object ApiSubmissionService {
currentLedgerTime = () => timeProvider.getCurrentTime,
currentUtcTime = () => Instant.now,
maxDeduplicationTime = () =>
ledgerConfigProvider.latestConfiguration.map(_.maxDeduplicationTime),
currentLedgerConfiguration.latestConfiguration.map(_.maxDeduplicationTime),
submissionIdGenerator = SubmissionIdGenerator.Random,
metrics = metrics,
)
@ -90,7 +91,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
partyManagementService: IndexPartyManagementService,
timeProvider: TimeProvider,
timeProviderType: TimeProviderType,
ledgerConfigProvider: LedgerConfigProvider,
currentLedgerConfiguration: CurrentLedgerConfiguration,
seedService: SeedService,
commandExecutor: CommandExecutor,
configuration: ApiSubmissionService.Configuration,
@ -110,7 +111,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
withEnrichedLoggingContext(logging.commands(request.commands)) { implicit loggingContext =>
logger.info("Submitting transaction")
logger.trace(s"Commands: ${request.commands.commands.commands}")
ledgerConfigProvider.latestConfiguration
currentLedgerConfiguration.latestConfiguration
.map(deduplicateAndRecordOnLedger(seedService.nextSeed(), request.commands, _))
.getOrElse(Future.failed(ErrorFactories.missingLedgerConfig()))
.andThen(logger.logErrorsOnCall[Unit])

View File

@ -0,0 +1,128 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.configuration
import java.time.Duration
import akka.stream.scaladsl.Source
import com.daml.ledger.api.domain.{ConfigurationEntry, LedgerOffset}
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.index.v2.IndexConfigManagementService
import com.daml.ledger.resources.ResourceContext
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import com.daml.platform.apiserver.configuration.IndexStreamingCurrentLedgerConfigurationSpec._
import com.daml.platform.configuration.LedgerConfiguration
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import scala.concurrent.Future
final class IndexStreamingCurrentLedgerConfigurationSpec
extends AsyncWordSpec
with Matchers
with Eventually
with AkkaBeforeAndAfterAll
with MockitoSugar
with ArgumentMatchersSugar {
private implicit val resourceContext: ResourceContext = ResourceContext(executionContext)
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
"the current ledger configuration" should {
"look up the latest configuration from the index on startup" in {
val currentConfiguration =
Configuration(7, LedgerTimeModel.reasonableDefault, Duration.ofDays(1))
val ledgerConfiguration = LedgerConfiguration(
initialConfiguration = Configuration(0, LedgerTimeModel.reasonableDefault, Duration.ZERO),
initialConfigurationSubmitDelay = Duration.ZERO,
configurationLoadTimeout = Duration.ofSeconds(5),
)
val index = mock[IndexConfigManagementService]
when(index.lookupConfiguration())
.thenReturn(Future.successful(Some(offset("0001") -> currentConfiguration)))
IndexStreamingCurrentLedgerConfiguration
.owner(index, ledgerConfiguration)
.use { currentLedgerConfiguration =>
currentLedgerConfiguration.ready.map { _ =>
currentLedgerConfiguration.latestConfiguration should be(Some(currentConfiguration))
succeed
}
}
}
"stream the latest configuration from the index" in {
val configurations = List(
offset("000a") -> Configuration(
generation = 3,
timeModel = LedgerTimeModel.reasonableDefault.copy(maxSkew = Duration.ofMinutes(1)),
maxDeduplicationTime = Duration.ofDays(1),
),
offset("0023") -> Configuration(
generation = 4,
timeModel = LedgerTimeModel.reasonableDefault.copy(maxSkew = Duration.ofMinutes(2)),
maxDeduplicationTime = Duration.ofDays(1),
),
offset("01ef") -> Configuration(
generation = 5,
timeModel = LedgerTimeModel.reasonableDefault.copy(maxSkew = Duration.ofMinutes(2)),
maxDeduplicationTime = Duration.ofHours(6),
),
)
val configurationEntries = configurations.zipWithIndex.map {
case ((offset, configuration), index) =>
offset -> ConfigurationEntry.Accepted(s"submission ID #$index", configuration)
}
val ledgerConfiguration = LedgerConfiguration(
initialConfiguration = Configuration(0, LedgerTimeModel.reasonableDefault, Duration.ZERO),
initialConfigurationSubmitDelay = Duration.ZERO,
configurationLoadTimeout = Duration.ofSeconds(5),
)
val index = mock[IndexConfigManagementService]
when(index.lookupConfiguration()).thenReturn(Future.successful(None))
when(index.configurationEntries(None))
.thenReturn(Source(configurationEntries).concat(Source.never))
IndexStreamingCurrentLedgerConfiguration
.owner(index, ledgerConfiguration)
.use { currentLedgerConfiguration =>
currentLedgerConfiguration.ready.map { _ =>
eventually {
currentLedgerConfiguration.latestConfiguration should be(Some(configurations.last._2))
}
succeed
}
}
}
"give up waiting if the configuration takes too long to appear" in {
val index = mock[IndexConfigManagementService]
when(index.lookupConfiguration()).thenReturn(Future.successful(None))
when(index.configurationEntries(None)).thenReturn(Source.never)
val ledgerConfiguration = LedgerConfiguration(
initialConfiguration = Configuration(0, LedgerTimeModel.reasonableDefault, Duration.ZERO),
initialConfigurationSubmitDelay = Duration.ZERO,
configurationLoadTimeout = Duration.ofMillis(500),
)
IndexStreamingCurrentLedgerConfiguration
.owner(index, ledgerConfiguration)
.use { ledgerConfigProvider =>
ledgerConfigProvider.latestConfiguration should be(None)
}
}
}
}
object IndexStreamingCurrentLedgerConfigurationSpec {
private def offset(value: String): LedgerOffset.Absolute =
LedgerOffset.Absolute(Ref.LedgerString.assertFromString(value))
}

View File

@ -0,0 +1,164 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.configuration
import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicReference
import com.daml.api.util.TimeProvider
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.ResourceContext
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.SubmissionId
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
import com.daml.platform.configuration.LedgerConfiguration
import com.daml.telemetry.TelemetryContext
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import scala.concurrent.duration.DurationInt
final class LedgerConfigProvisionerSpec
extends AsyncWordSpec
with Matchers
with Eventually
with AkkaBeforeAndAfterAll
with MockitoSugar
with ArgumentMatchersSugar {
private implicit val resourceContext: ResourceContext = ResourceContext(executionContext)
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
"Ledger Config Provider" should {
"write a ledger configuration to the index if one is not provided" in {
val configurationToSubmit =
Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofDays(1))
val ledgerConfiguration = LedgerConfiguration(
configurationToSubmit,
initialConfigurationSubmitDelay = Duration.ofMillis(100),
configurationLoadTimeout = Duration.ZERO,
)
val submissionId = Ref.SubmissionId.assertFromString("the submission ID")
val currentLedgerConfiguration = new CurrentLedgerConfiguration {
override def latestConfiguration: Option[Configuration] = None
}
val writeService = mock[state.WriteConfigService]
val timeProvider = TimeProvider.Constant(Instant.EPOCH)
val submissionIdGenerator = new SubmissionIdGenerator {
override def generate(): SubmissionId = submissionId
}
LedgerConfigProvisioner
.owner(
currentLedgerConfiguration = currentLedgerConfiguration,
writeService = writeService,
timeProvider = timeProvider,
submissionIdGenerator = submissionIdGenerator,
ledgerConfiguration = ledgerConfiguration,
)
.use { _ =>
eventually(PatienceConfiguration.Timeout(1.second)) {
verify(writeService).submitConfiguration(
eqTo(Timestamp.assertFromInstant(timeProvider.getCurrentTime.plusSeconds(60))),
eqTo(submissionId),
eqTo(configurationToSubmit),
)(any[TelemetryContext])
}
succeed
}
}
"not write a configuration if one is provided" in {
val currentConfiguration =
Configuration(6, LedgerTimeModel.reasonableDefault, Duration.ofHours(12))
val ledgerConfiguration = LedgerConfiguration(
initialConfiguration =
Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofDays(1)),
initialConfigurationSubmitDelay = Duration.ofMillis(100),
configurationLoadTimeout = Duration.ZERO,
)
val currentLedgerConfiguration = new CurrentLedgerConfiguration {
override def latestConfiguration: Option[Configuration] = Some(currentConfiguration)
}
val writeService = mock[state.WriteConfigService]
val timeProvider = TimeProvider.Constant(Instant.EPOCH)
val submissionIdGenerator = new SubmissionIdGenerator {
override def generate(): SubmissionId = {
fail("We should never generate a submission.")
}
}
LedgerConfigProvisioner
.owner(
currentLedgerConfiguration = currentLedgerConfiguration,
writeService = writeService,
timeProvider = timeProvider,
submissionIdGenerator = submissionIdGenerator,
ledgerConfiguration = ledgerConfiguration,
)
.use { _ =>
verify(writeService, after(1.second.toMillis.toInt).never())
.submitConfiguration(
any[Timestamp],
any[Ref.SubmissionId],
any[Configuration],
)(any[TelemetryContext])
succeed
}
}
}
"not write a configuration if one is provided within the time window" in {
val eventualConfiguration =
Configuration(8, LedgerTimeModel.reasonableDefault, Duration.ofDays(3))
val ledgerConfiguration = LedgerConfiguration(
initialConfiguration =
Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofDays(1)),
initialConfigurationSubmitDelay = Duration.ofSeconds(1),
configurationLoadTimeout = Duration.ZERO,
)
val currentConfiguration = new AtomicReference[Option[Configuration]](None)
val currentLedgerConfiguration = new CurrentLedgerConfiguration {
override def latestConfiguration: Option[Configuration] = currentConfiguration.get
}
val writeService = mock[state.WriteConfigService]
val timeProvider = TimeProvider.Constant(Instant.EPOCH)
val submissionIdGenerator = new SubmissionIdGenerator {
override def generate(): SubmissionId = {
fail("We should never generate a submission.")
}
}
LedgerConfigProvisioner
.owner(
currentLedgerConfiguration = currentLedgerConfiguration,
writeService = writeService,
timeProvider = timeProvider,
submissionIdGenerator = submissionIdGenerator,
ledgerConfiguration = ledgerConfiguration,
)
.use { _ =>
materializer.scheduleOnce(
100.millis,
{ () => currentConfiguration.set(Some(eventualConfiguration)) },
)
verify(writeService, after(1.second.toMillis.toInt).never())
.submitConfiguration(
any[Timestamp],
any[Ref.SubmissionId],
any[Configuration],
)(any[TelemetryContext])
succeed
}
}
}

View File

@ -8,31 +8,19 @@ import java.util.UUID
import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.codahale.metrics.MetricRegistry
import com.daml.api.util.TimeProvider
import com.daml.ledger.api.DomainMocks
import com.google.rpc.status.{Status => RpcStatus}
import com.daml.ledger.api.domain.{
CommandId,
Commands,
LedgerId,
LedgerOffset,
PartyDetails,
SubmissionId,
}
import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails, SubmissionId}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.index.v2.{
CommandDeduplicationNew,
IndexConfigManagementService,
IndexPartyManagementService,
IndexSubmissionService,
}
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.{ResourceOwner, TestResourceContext}
import com.daml.ledger.resources.TestResourceContext
import com.daml.lf
import com.daml.lf.command.{Commands => LfCommands}
import com.daml.lf.crypto.Hash
@ -47,10 +35,12 @@ import com.daml.lf.value.Value
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.apiserver.SeedService
import com.daml.platform.apiserver.configuration.CurrentLedgerConfiguration
import com.daml.platform.apiserver.execution.CommandExecutor
import com.daml.platform.configuration.LedgerConfiguration
import com.daml.platform.apiserver.services.ApiSubmissionServiceSpec._
import com.daml.platform.store.ErrorCause
import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext}
import com.google.rpc.status.{Status => RpcStatus}
import io.grpc.Status
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.Inside
@ -100,7 +90,6 @@ class ApiSubmissionServiceSpec
it should "allocate missing informees" in {
val partyManagementService = mock[IndexPartyManagementService]
val writeService = mock[state.WriteService]
when(partyManagementService.getParties(any[Seq[Ref.Party]])(any[LoggingContext]))
.thenAnswer[Seq[Ref.Party]] { parties =>
Future.successful(
@ -110,7 +99,6 @@ class ApiSubmissionServiceSpec
.toList
)
}
when(
writeService.allocateParty(
any[Option[Ref.Party]],
@ -119,38 +107,33 @@ class ApiSubmissionServiceSpec
)(any[TelemetryContext])
).thenReturn(completedFuture(state.SubmissionResult.Acknowledged))
submissionService(
writeService,
partyManagementService,
implicitPartyAllocation = true,
).use { service =>
for {
results <- service.allocateMissingInformees(transaction)
} yield {
results should have size 100
all(results) should be(state.SubmissionResult.Acknowledged)
missingParties.foreach { party =>
verify(writeService).allocateParty(
eqTo(Some(Ref.Party.assertFromString(party))),
eqTo(Some(party)),
any[Ref.SubmissionId],
)(any[TelemetryContext])
}
verifyNoMoreInteractions(writeService)
succeed
val service =
newSubmissionService(writeService, partyManagementService, implicitPartyAllocation = true)
for {
results <- service.allocateMissingInformees(transaction)
} yield {
results should have size 100
all(results) should be(state.SubmissionResult.Acknowledged)
missingParties.foreach { party =>
verify(writeService).allocateParty(
eqTo(Some(Ref.Party.assertFromString(party))),
eqTo(Some(party)),
any[Ref.SubmissionId],
)(any[TelemetryContext])
}
verifyNoMoreInteractions(writeService)
succeed
}
}
it should "not allocate if all parties are already known" in {
val partyManagementService = mock[IndexPartyManagementService]
val writeService = mock[state.WriteService]
when(partyManagementService.getParties(any[Seq[Ref.Party]])(any[LoggingContext]))
.thenAnswer[Seq[Ref.Party]] { parties =>
Future.successful(parties.view.map(PartyDetails(_, Option.empty, isLocal = true)).toList)
}
when(
writeService.allocateParty(
any[Option[Ref.Party]],
@ -159,44 +142,41 @@ class ApiSubmissionServiceSpec
)(any[TelemetryContext])
).thenReturn(completedFuture(state.SubmissionResult.Acknowledged))
submissionService(
writeService,
partyManagementService,
implicitPartyAllocation = true,
).use { service =>
for {
result <- service.allocateMissingInformees(transaction)
} yield {
result shouldBe Seq.empty[state.SubmissionResult]
verify(writeService, never).allocateParty(
any[Option[Ref.Party]],
any[Option[String]],
any[Ref.SubmissionId],
)(any[TelemetryContext])
succeed
}
val service =
newSubmissionService(writeService, partyManagementService, implicitPartyAllocation = true)
for {
result <- service.allocateMissingInformees(transaction)
} yield {
result shouldBe Seq.empty[state.SubmissionResult]
verify(writeService, never).allocateParty(
any[Option[Ref.Party]],
any[Option[String]],
any[Ref.SubmissionId],
)(any[TelemetryContext])
succeed
}
}
it should "not allocate missing informees if implicit party allocation is disabled" in {
val partyManagementService = mock[IndexPartyManagementService]
val writeService = mock[state.WriteService]
submissionService(
mock[state.WriteService],
mock[IndexPartyManagementService],
val service = newSubmissionService(
writeService,
partyManagementService,
implicitPartyAllocation = false,
).use { service =>
for {
result <- service.allocateMissingInformees(transaction)
} yield {
result shouldBe Seq.empty[state.SubmissionResult]
verify(writeService, never).allocateParty(
any[Option[Ref.Party]],
any[Option[String]],
any[Ref.SubmissionId],
)(any[TelemetryContext])
succeed
}
)
for {
result <- service.allocateMissingInformees(transaction)
} yield {
result shouldBe Seq.empty[state.SubmissionResult]
verify(writeService, never).allocateParty(
any[Option[Ref.Party]],
any[Option[String]],
any[Ref.SubmissionId],
)(any[TelemetryContext])
succeed
}
}
@ -231,16 +211,16 @@ class ApiSubmissionServiceSpec
)
val transaction = builder.buildSubmitted()
submissionService(
val service = newSubmissionService(
writeService,
partyManagementService,
implicitPartyAllocation = true,
).use { service =>
for {
result <- service.allocateMissingInformees(transaction)
} yield {
result shouldBe Seq(submissionFailure)
}
)
for {
result <- service.allocateMissingInformees(transaction)
} yield {
result shouldBe Seq(submissionFailure)
}
}
@ -302,64 +282,71 @@ class ApiSubmissionServiceSpec
val commandId = new AtomicInteger()
val mockCommandExecutor = mock[CommandExecutor]
submissionService(
val service = newSubmissionService(
writeService,
partyManagementService,
implicitPartyAllocation = true,
commandExecutor = mockCommandExecutor,
).use { service =>
Future
.sequence(errorsToStatuses.map { case (error, code) =>
val submitRequest = SubmitRequest(
Commands(
ledgerId = LedgerId("ledger-id"),
workflowId = None,
applicationId = DomainMocks.applicationId,
commandId = CommandId(
Ref.CommandId.assertFromString(s"commandId-${commandId.incrementAndGet()}")
),
submissionId =
SubmissionId(Ref.SubmissionId.assertFromString(UUID.randomUUID().toString)),
actAs = Set.empty,
readAs = Set.empty,
submittedAt = Instant.MIN,
deduplicationDuration = Duration.ZERO,
commands = LfCommands(ImmArray.empty, Timestamp.MinValue, ""),
)
)
Future
.sequence(errorsToStatuses.map { case (error, code) =>
val submitRequest = SubmitRequest(
Commands(
ledgerId = LedgerId("ledger-id"),
workflowId = None,
applicationId = DomainMocks.applicationId,
commandId = CommandId(
Ref.CommandId.assertFromString(s"commandId-${commandId.incrementAndGet()}")
),
submissionId =
SubmissionId(Ref.SubmissionId.assertFromString(UUID.randomUUID().toString)),
actAs = Set.empty,
readAs = Set.empty,
submittedAt = Instant.MIN,
deduplicationDuration = Duration.ZERO,
commands = LfCommands(ImmArray.empty, Timestamp.MinValue, ""),
)
when(
mockCommandExecutor.execute(
eqTo(submitRequest.commands),
any[Hash],
any[Configuration],
)(any[ExecutionContext], any[LoggingContext])
).thenReturn(Future.successful(Left(error)))
)
when(
mockCommandExecutor.execute(
eqTo(submitRequest.commands),
any[Hash],
any[Configuration],
)(any[ExecutionContext], any[LoggingContext])
).thenReturn(Future.successful(Left(error)))
service.submit(submitRequest).transform(result => Success(code -> result))
})
}.map { results =>
results.foreach { case (code, result) =>
inside(result) { case Failure(exception) =>
exception.getMessage should startWith(code.getCode.toString)
service.submit(submitRequest).transform(result => Success(code -> result))
})
.map { results =>
results.foreach { case (code, result) =>
inside(result) { case Failure(exception) =>
exception.getMessage should startWith(code.getCode.toString)
}
}
succeed
}
succeed
}
}
}
private def submissionService(
object ApiSubmissionServiceSpec {
import ArgumentMatchersSugar._
import MockitoSugar._
private def newSubmissionService(
writeService: state.WriteService,
partyManagementService: IndexPartyManagementService,
implicitPartyAllocation: Boolean,
commandExecutor: CommandExecutor = null,
)(implicit materializer: Materializer): ResourceOwner[ApiSubmissionService] = {
val configManagementService = mock[IndexConfigManagementService]
val offset = LedgerOffset.Absolute(Ref.LedgerString.assertFromString("offset"))
val configuration = Configuration(0L, LedgerTimeModel.reasonableDefault, Duration.ZERO)
when(configManagementService.lookupConfiguration())
.thenReturn(Future.successful(Some((offset, configuration))))
when(configManagementService.configurationEntries(Some(offset)))
.thenReturn(Source.empty)
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): ApiSubmissionService = {
val currentLedgerConfiguration = new CurrentLedgerConfiguration {
override def latestConfiguration: Option[Configuration] =
Some(Configuration(0L, LedgerTimeModel.reasonableDefault, Duration.ZERO))
}
val indexSubmissionService = mock[IndexSubmissionService]
when(
@ -377,30 +364,17 @@ class ApiSubmissionServiceSpec
)(any[LoggingContext])
).thenReturn(Future.unit)
LedgerConfigProvider
.owner(
configManagementService,
optWriteService = None,
timeProvider = TimeProvider.Constant(Instant.EPOCH),
config = LedgerConfiguration(
initialConfiguration = configuration,
initialConfigurationSubmitDelay = Duration.ZERO,
configurationLoadTimeout = Duration.ZERO,
),
)
.map(configProvider =>
new ApiSubmissionService(
writeService = writeService,
submissionService = indexSubmissionService,
partyManagementService = partyManagementService,
timeProvider = null,
timeProviderType = null,
ledgerConfigProvider = configProvider,
seedService = SeedService.WeakRandom,
commandExecutor = commandExecutor,
configuration = ApiSubmissionService.Configuration(implicitPartyAllocation),
metrics = new Metrics(new MetricRegistry),
)
)
new ApiSubmissionService(
writeService = writeService,
submissionService = indexSubmissionService,
partyManagementService = partyManagementService,
timeProvider = null,
timeProviderType = null,
currentLedgerConfiguration = currentLedgerConfiguration,
seedService = SeedService.WeakRandom,
commandExecutor = commandExecutor,
configuration = ApiSubmissionService.Configuration(implicitPartyAllocation),
metrics = new Metrics(new MetricRegistry),
)
}
}

View File

@ -1,172 +0,0 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.services
import java.time.{Duration, Instant}
import java.util.concurrent.{CompletableFuture, CompletionStage}
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.{Materializer, OverflowStrategy}
import com.daml.api.util.TimeProvider
import com.daml.ledger.api.domain.{ConfigurationEntry, LedgerOffset}
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.index.v2.IndexConfigManagementService
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.ResourceContext
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
import com.daml.platform.apiserver.services.LedgerConfigProviderSpec._
import com.daml.platform.configuration.LedgerConfiguration
import com.daml.telemetry.TelemetryContext
import org.mockito.MockitoSugar
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
final class LedgerConfigProviderSpec
extends AsyncWordSpec
with Matchers
with AkkaBeforeAndAfterAll
with MockitoSugar {
private implicit val resourceContext: ResourceContext = ResourceContext(executionContext)
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
"Ledger Config Provider" should {
"read an existing ledger configuration from the index" in {
val index = mock[IndexConfigManagementService]
val writeService = mock[state.WriteConfigService]
val configuration = configurationWith(generation = 7)
when(index.lookupConfiguration())
.thenReturn(Future.successful(Some(offset("0001") -> configuration)))
LedgerConfigProvider
.owner(
index,
optWriteService = Some(writeService),
timeProvider = someTimeProvider,
config = LedgerConfiguration(
initialConfiguration = configurationWith(generation = 3),
initialConfigurationSubmitDelay = Duration.ofSeconds(1),
configurationLoadTimeout = Duration.ofSeconds(5),
),
)
.use { ledgerConfigProvider =>
ledgerConfigProvider.ready
.map { _ =>
verifyZeroInteractions(writeService)
ledgerConfigProvider.latestConfiguration should be(Some(configuration))
}
.andThen { case _ =>
ledgerConfigProvider.close()
}
}
}
"write a ledger configuration to the index if one is not provided" in {
val index = mock[IndexConfigManagementService]
when(index.lookupConfiguration()).thenReturn(Future.successful(None))
val writeService = new FakeWriteConfigService
when(index.configurationEntries(None)).thenReturn(writeService.configurationSource)
val configurationToSubmit = configurationWith(generation = 1)
LedgerConfigProvider
.owner(
index,
optWriteService = Some(writeService),
timeProvider = someTimeProvider,
config = LedgerConfiguration(
configurationToSubmit,
initialConfigurationSubmitDelay = Duration.ofMillis(100),
configurationLoadTimeout = Duration.ofSeconds(5),
),
)
.use { ledgerConfigProvider =>
ledgerConfigProvider.ready
.map { _ =>
ledgerConfigProvider.latestConfiguration should be(Some(configurationToSubmit))
}
.andThen { case _ =>
ledgerConfigProvider.close()
}
}
}
"if the write takes too long, give up waiting" in {
val index = mock[IndexConfigManagementService]
when(index.lookupConfiguration()).thenReturn(Future.successful(None))
val writeService = new FakeWriteConfigService(delay = 5.seconds)
when(index.configurationEntries(None)).thenReturn(writeService.configurationSource)
val configurationToSubmit = configurationWith(generation = 1)
LedgerConfigProvider
.owner(
index,
optWriteService = Some(writeService),
timeProvider = someTimeProvider,
config = LedgerConfiguration(
configurationToSubmit,
initialConfigurationSubmitDelay = Duration.ZERO,
configurationLoadTimeout = Duration.ofMillis(500),
),
)
.use { ledgerConfigProvider =>
ledgerConfigProvider.ready
.map { _ =>
ledgerConfigProvider.latestConfiguration should be(None)
}
.andThen { case _ =>
ledgerConfigProvider.close()
}
}
}
}
}
object LedgerConfigProviderSpec {
private type ConfigurationSourceEntry = (LedgerOffset.Absolute, ConfigurationEntry)
private val someTimeProvider = TimeProvider.Constant(Instant.EPOCH)
private def offset(value: String): LedgerOffset.Absolute = {
LedgerOffset.Absolute(Ref.LedgerString.assertFromString(value))
}
private def configurationWith(generation: Long): Configuration = {
Configuration(generation, LedgerTimeModel.reasonableDefault, Duration.ofDays(1))
}
private final class FakeWriteConfigService(
delay: FiniteDuration = scala.concurrent.duration.Duration.Zero
)(implicit materializer: Materializer)
extends state.WriteConfigService {
private var currentOffset = 0
private val (queue, source) = Source
.queue[ConfigurationSourceEntry](bufferSize = 8, OverflowStrategy.backpressure)
.preMaterialize()
val configurationSource: Source[ConfigurationSourceEntry, NotUsed] = source
override def submitConfiguration(
maxRecordTime: Timestamp,
submissionId: Ref.SubmissionId,
config: Configuration,
)(implicit telemetryContext: TelemetryContext): CompletionStage[state.SubmissionResult] =
CompletableFuture.supplyAsync { () =>
Thread.sleep(delay.toMillis)
currentOffset += 1
queue.offer(
offset(currentOffset.toString) -> ConfigurationEntry.Accepted(submissionId, config)
)
state.SubmissionResult.Acknowledged
}
}
}

View File

@ -3,6 +3,7 @@
package com.daml.ledger.participant.state.v2
import com.daml.logging.entries.{LoggingValue, ToLoggingValue}
import io.grpc.{Status, StatusRuntimeException}
sealed abstract class SubmissionResult extends Product with Serializable {
@ -30,4 +31,15 @@ object SubmissionResult {
def exception: StatusRuntimeException = status.asRuntimeException
}
object SynchronousError {
implicit val `SynchronousError to LoggingValue`: ToLoggingValue[SynchronousError] =
error =>
LoggingValue.Nested.fromEntries(
"status" -> LoggingValue.Nested.fromEntries(
"code" -> error.grpcError.code,
"message" -> error.grpcError.message,
)
)
}
}