Always use the latest ledger config (#5669)

CHANGELOG_BEGIN
- [Sandbox] The ledger API server will now always use the most recent ledger configuration.
  Until a ledger configuration is read from the ledger, command submissions will fail with the UNAVAILABLE error.
CHANGELOG_END

In kvutils, the first ledger configuration change needs
to have a generation one higher than the one returned
by getLedgerInitialConditions().

Remove initial config writing from sandbox as it's now written by the ledger API server
This commit is contained in:
Robert Autenrieth 2020-05-06 12:12:23 +02:00 committed by GitHub
parent c21768c616
commit 7e448d810c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 487 additions and 244 deletions

View File

@ -35,7 +35,7 @@ final class CommandsValidator(ledgerId: LedgerId) {
commands: ProtoCommands,
currentLedgerTime: Instant,
currentUtcTime: Instant,
maxDeduplicationTime: Duration): Either[StatusRuntimeException, domain.Commands] =
maxDeduplicationTime: Option[Duration]): Either[StatusRuntimeException, domain.Commands] =
for {
cmdLegerId <- requireLedgerString(commands.ledgerId, "ledger_id")
ledgerId <- matchLedgerId(ledgerId)(LedgerId(cmdLegerId))

View File

@ -17,7 +17,8 @@ class SubmitAndWaitRequestValidator(commandsValidator: CommandsValidator) {
req: SubmitAndWaitRequest,
currentLedgerTime: Instant,
currentUtcTime: Instant,
maxDeduplicationTime: Duration): Either[StatusRuntimeException, submission.SubmitRequest] =
maxDeduplicationTime: Option[Duration])
: Either[StatusRuntimeException, submission.SubmitRequest] =
for {
commands <- requirePresence(req.commands, "commands")
validatedCommands <- commandsValidator.validateCommands(

View File

@ -17,7 +17,8 @@ class SubmitRequestValidator(commandsValidator: CommandsValidator) {
req: SubmitRequest,
currentLedgerTime: Instant,
currentUtcTime: Instant,
maxDeduplicationTime: Duration): Either[StatusRuntimeException, submission.SubmitRequest] =
maxDeduplicationTime: Option[Duration])
: Either[StatusRuntimeException, submission.SubmitRequest] =
for {
commands <- requirePresence(req.commands, "commands")
validatedCommands <- commandsValidator.validateCommands(

View File

@ -23,7 +23,7 @@ class GrpcCommandService(
val ledgerId: LedgerId,
currentLedgerTime: () => Instant,
currentUtcTime: () => Instant,
maxDeduplicationTime: () => Duration
maxDeduplicationTime: () => Option[Duration]
) extends CommandService
with GrpcApiService
with ProxyCloseable {

View File

@ -30,7 +30,7 @@ class GrpcCommandSubmissionService(
ledgerId: LedgerId,
currentLedgerTime: () => Instant,
currentUtcTime: () => Instant,
maxDeduplicationTime: () => Duration,
maxDeduplicationTime: () => Option[Duration],
metrics: Metrics,
) extends ApiCommandSubmissionService
with ProxyCloseable

View File

@ -43,6 +43,9 @@ trait ErrorFactories {
def unauthenticated(): StatusRuntimeException =
grpcError(Status.UNAUTHENTICATED)
def missingLedgerConfig(): StatusRuntimeException =
grpcError(Status.UNAVAILABLE.withDescription("The ledger configuration is not available."))
def resourceExhausted(description: String): StatusRuntimeException =
grpcError(Status.RESOURCE_EXHAUSTED.withDescription(description))

View File

@ -92,21 +92,24 @@ trait FieldValidations {
def validateDeduplicationTime(
durationO: Option[com.google.protobuf.duration.Duration],
maxDeduplicationTime: Duration,
fieldName: String): Either[StatusRuntimeException, Duration] = durationO match {
case None =>
Right(maxDeduplicationTime)
case Some(duration) =>
val result = Duration.ofSeconds(duration.seconds, duration.nanos.toLong)
if (result.isNegative)
Left(invalidField(fieldName, "Duration must be positive"))
else if (result.compareTo(maxDeduplicationTime) > 0)
Left(invalidField(
fieldName,
s"The given deduplication time of $result exceeds the maximum deduplication time of $maxDeduplicationTime"))
else
Right(result)
}
maxDeduplicationTimeO: Option[Duration],
fieldName: String): Either[StatusRuntimeException, Duration] =
maxDeduplicationTimeO.fold[Either[StatusRuntimeException, Duration]](
Left(missingLedgerConfig()))(maxDeduplicationTime =>
durationO match {
case None =>
Right(maxDeduplicationTime)
case Some(duration) =>
val result = Duration.ofSeconds(duration.seconds, duration.nanos.toLong)
if (result.isNegative)
Left(invalidField(fieldName, "Duration must be positive"))
else if (result.compareTo(maxDeduplicationTime) > 0)
Left(invalidField(
fieldName,
s"The given deduplication time of $result exceeds the maximum deduplication time of $maxDeduplicationTime"))
else
Right(result)
})
def validateIdentifier(identifier: Identifier): Either[StatusRuntimeException, Ref.Identifier] =
for {

View File

@ -19,7 +19,7 @@ import com.daml.ledger.api.v1.value.Value.Sum
import com.daml.ledger.api.v1.value.{List => ApiList, Map => ApiMap, Optional => ApiOptional, _}
import com.google.protobuf.duration.Duration
import com.google.protobuf.empty.Empty
import io.grpc.Status.Code.INVALID_ARGUMENT
import io.grpc.Status.Code.{INVALID_ARGUMENT, UNAVAILABLE}
import org.scalatest.WordSpec
import org.scalatest.prop.TableDrivenPropertyChecks
import scalaz.syntax.tag._
@ -124,7 +124,7 @@ class SubmitRequestValidatorTest
api.commands.withCommands(Seq.empty),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
"Missing field: commands"
)
@ -137,7 +137,7 @@ class SubmitRequestValidatorTest
api.commands.withLedgerId(""),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
"Missing field: ledger_id"
)
@ -148,7 +148,7 @@ class SubmitRequestValidatorTest
api.commands.withWorkflowId(""),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime) shouldEqual Right(
Some(internal.maxDeduplicationTime)) shouldEqual Right(
internal.emptyCommands.copy(
workflowId = None,
commands = internal.emptyCommands.commands.copy(commandsReference = "")))
@ -160,7 +160,7 @@ class SubmitRequestValidatorTest
api.commands.withApplicationId(""),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
"Missing field: application_id"
)
@ -172,7 +172,7 @@ class SubmitRequestValidatorTest
api.commands.withCommandId(""),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
"Missing field: command_id"
)
@ -185,7 +185,7 @@ class SubmitRequestValidatorTest
api.commands.withParty(""),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
"""Missing field: party"""
)
@ -198,7 +198,7 @@ class SubmitRequestValidatorTest
minLedgerTimeAbs = Some(TimestampConversion.fromInstant(minLedgerTimeAbs))),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime
Some(internal.maxDeduplicationTime)
) shouldEqual Right(withLedgerTime(internal.emptyCommands, minLedgerTimeAbs))
}
@ -209,7 +209,7 @@ class SubmitRequestValidatorTest
minLedgerTimeRel = Some(DurationConversion.toProto(internal.timeDelta))),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime
Some(internal.maxDeduplicationTime)
) shouldEqual Right(withLedgerTime(internal.emptyCommands, minLedgerTimeAbs))
}
@ -219,7 +219,7 @@ class SubmitRequestValidatorTest
api.commands.copy(deduplicationTime = Some(Duration.of(-1, 0))),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
"Invalid field deduplication_time: Duration must be positive"
)
@ -232,7 +232,7 @@ class SubmitRequestValidatorTest
api.commands.copy(deduplicationTime = Some(Duration.of(manySeconds, 0))),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
s"Invalid field deduplication_time: The given deduplication time of ${java.time.Duration
.ofSeconds(manySeconds)} exceeds the maximum deduplication time of ${internal.maxDeduplicationTime}"
@ -244,11 +244,20 @@ class SubmitRequestValidatorTest
api.commands.copy(deduplicationTime = None),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime) shouldEqual Right(
Some(internal.maxDeduplicationTime)) shouldEqual Right(
internal.emptyCommands.copy(
deduplicateUntil = internal.submittedAt.plus(internal.maxDeduplicationTime)))
}
"not allow missing ledger configuration" in {
requestMustFailWith(
commandsValidator
.validateCommands(api.commands, internal.ledgerTime, internal.submittedAt, None),
UNAVAILABLE,
"The ledger configuration is not available."
)
}
}
"validating contractId values" should {

View File

@ -18,6 +18,7 @@ import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.apiserver.ApiServerConfig
import com.daml.platform.configuration.LedgerConfiguration
import com.daml.resources.{ProgramResource, ResourceOwner}
import scopt.OptionParser
@ -66,6 +67,9 @@ object Main {
engine = engine,
)
override def ledgerConfig(config: Config[ExtraConfig]): LedgerConfiguration =
LedgerConfiguration.defaultLocalLedger
override val defaultExtraConfig: ExtraConfig = ExtraConfig.default
override final def extraConfigParser(parser: OptionParser[Config[ExtraConfig]]): Unit = {

View File

@ -3,6 +3,8 @@
package com.daml.ledger.on.sql
import java.time.Duration
import akka.stream.Materializer
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.kvutils.app.{
@ -16,6 +18,7 @@ import com.daml.ledger.participant.state.v1.SeedService
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.configuration.LedgerConfiguration
import com.daml.resources.{Resource, ResourceOwner}
import scopt.OptionParser
@ -26,6 +29,9 @@ object SqlLedgerFactory extends LedgerFactory[ReadWriteService, ExtraConfig] {
jdbcUrl = None,
)
override def ledgerConfig(config: Config[ExtraConfig]): LedgerConfiguration =
super.ledgerConfig(config).copy(initialConfigurationSubmitDelay = Duration.ZERO)
override def extraConfigParser(parser: OptionParser[Config[ExtraConfig]]): Unit = {
parser
.opt[String]("jdbc-url")

View File

@ -23,7 +23,7 @@ trait IndexConfigManagementService {
def lookupConfiguration(): Future[Option[(LedgerOffset.Absolute, Configuration)]]
/** Retrieve configuration entries. */
def configurationEntries(
startExclusive: Option[LedgerOffset.Absolute]): Source[ConfigurationEntry, NotUsed]
def configurationEntries(startExclusive: Option[LedgerOffset.Absolute])
: Source[(LedgerOffset.Absolute, ConfigurationEntry), NotUsed]
}

View File

@ -3,8 +3,6 @@
package com.daml.ledger.participant.state.kvutils.app
import java.time.Duration
import akka.stream.Materializer
import com.codahale.metrics.{MetricRegistry, SharedMetricRegistries}
import com.daml.ledger.api.auth.{AuthService, AuthServiceWildcard}
@ -17,8 +15,7 @@ import com.daml.platform.apiserver.{ApiServerConfig, TimeServiceBackend}
import com.daml.platform.configuration.{
CommandConfiguration,
LedgerConfiguration,
PartyConfiguration,
SubmissionConfiguration
PartyConfiguration
}
import com.daml.platform.indexer.{IndexerConfig, IndexerStartupMode}
import com.daml.resources.ResourceOwner
@ -73,13 +70,8 @@ trait ConfigProvider[ExtraConfig] {
def partyConfig(config: Config[ExtraConfig]): PartyConfiguration =
PartyConfiguration.default
def submissionConfig(config: Config[ExtraConfig]): SubmissionConfiguration =
SubmissionConfiguration.default
def ledgerConfig(config: Config[ExtraConfig]): LedgerConfiguration =
LedgerConfiguration.default.copy(
initialConfigurationSubmitDelay = Duration.ofSeconds(5)
)
LedgerConfiguration.defaultRemote
def timeServiceBackend(config: Config[ExtraConfig]): Option[TimeServiceBackend] = None

View File

@ -81,7 +81,6 @@ final class Runner[T <: ReadWriteService, Extra](
config = factory.apiServerConfig(participantConfig, config),
commandConfig = factory.commandConfig(participantConfig, config),
partyConfig = factory.partyConfig(config),
submissionConfig = factory.submissionConfig(config),
ledgerConfig = factory.ledgerConfig(config),
readService = readService,
writeService = writeService,

View File

@ -33,8 +33,7 @@ import com.daml.platform.apiserver.services._
import com.daml.platform.configuration.{
CommandConfiguration,
LedgerConfiguration,
PartyConfiguration,
SubmissionConfiguration
PartyConfiguration
}
import com.daml.platform.server.api.services.grpc.GrpcHealthService
import com.daml.platform.services.time.TimeProviderType
@ -74,7 +73,6 @@ object ApiServices {
ledgerConfiguration: LedgerConfiguration,
commandConfig: CommandConfiguration,
partyConfig: PartyConfiguration,
submissionConfig: SubmissionConfiguration,
optTimeServiceBackend: Option[TimeServiceBackend],
metrics: Metrics,
healthChecks: HealthChecks,
@ -97,17 +95,28 @@ object ApiServices {
override def acquire()(implicit executionContext: ExecutionContext): Resource[ApiServices] =
Resource(
indexService
.getLedgerId()
.map(ledgerId => createServices(ledgerId)(mat.system.dispatcher)))(services =>
Future {
services.foreach {
case closeable: AutoCloseable => closeable.close()
case _ => ()
for {
ledgerId <- identityService.getLedgerId()
ledgerConfigProvider = LedgerConfigProvider.create(
configManagementService,
writeService,
timeProvider,
ledgerConfiguration)
services = createServices(ledgerId, ledgerConfigProvider)(mat.system.dispatcher)
_ <- ledgerConfigProvider.ready
} yield (ledgerConfigProvider, services)
) {
case (ledgerConfigProvider, services) =>
Future {
services.foreach {
case closeable: AutoCloseable => closeable.close()
case _ => ()
}
ledgerConfigProvider.close()
}
}).map(ApiServicesBundle(_))
}.map(x => ApiServicesBundle(x._2))
private def createServices(ledgerId: LedgerId)(
private def createServices(ledgerId: LedgerId, ledgerConfigProvider: LedgerConfigProvider)(
implicit executionContext: ExecutionContext): List[BindableService] = {
val commandExecutor = new TimedCommandExecutor(
new LedgerTimeAwareCommandExecutor(
@ -130,13 +139,12 @@ object ApiServices {
writeService,
submissionService,
partyManagementService,
ledgerConfiguration.initialConfiguration.timeModel,
timeProvider,
timeProviderType,
ledgerConfigProvider,
seedService,
commandExecutor,
ApiSubmissionService.Configuration(
submissionConfig.maxDeduplicationTime,
partyConfig.implicitPartyAllocation,
),
metrics,
@ -168,7 +176,6 @@ object ApiServices {
commandConfig.maxCommandsInFlight,
commandConfig.limitMaxCommandsInFlight,
commandConfig.retentionPeriod,
submissionConfig.maxDeduplicationTime,
),
// Using local services skips the gRPC layer, improving performance.
ApiCommandService.LocalServices(
@ -179,6 +186,7 @@ object ApiServices {
apiTransactionService.getFlatTransactionById
),
timeProvider,
ledgerConfigProvider,
)
val apiActiveContractsService =

View File

@ -26,7 +26,6 @@ import com.daml.platform.configuration.{
LedgerConfiguration,
PartyConfiguration,
ServerRole,
SubmissionConfiguration
}
import com.daml.platform.index.JdbcIndex
import com.daml.platform.packages.InMemoryPackageStore
@ -45,7 +44,6 @@ final class StandaloneApiServer(
config: ApiServerConfig,
commandConfig: CommandConfiguration,
partyConfig: PartyConfiguration,
submissionConfig: SubmissionConfiguration,
ledgerConfig: LedgerConfiguration,
readService: ReadService,
writeService: WriteService,
@ -91,10 +89,6 @@ final class StandaloneApiServer(
"read" -> readService,
"write" -> writeService,
)
ledgerConfiguration = ledgerConfig.copy(
// TODO: Remove the initial ledger config from readService.getLedgerInitialConditions()
initialConfiguration = initialConditions.config,
)
executionSequencerFactory <- new ExecutionSequencerFactoryOwner()
apiServicesOwner = new ApiServices.Owner(
participantId = participantId,
@ -106,10 +100,9 @@ final class StandaloneApiServer(
timeProviderType =
timeServiceBackend.fold[TimeProviderType](TimeProviderType.WallClock)(_ =>
TimeProviderType.Static),
ledgerConfiguration = ledgerConfiguration,
ledgerConfiguration = ledgerConfig,
commandConfig = commandConfig,
partyConfig = partyConfig,
submissionConfig = submissionConfig,
optTimeServiceBackend = timeServiceBackend,
metrics = metrics,
healthChecks = healthChecks,

View File

@ -9,7 +9,14 @@ import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.{ApplicationId, CommandId, LedgerId, LedgerOffset, TransactionId}
import com.daml.ledger.api.domain.{
ApplicationId,
CommandId,
ConfigurationEntry,
LedgerId,
LedgerOffset,
TransactionId
}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
@ -158,7 +165,7 @@ final class TimedIndexService(delegate: IndexService, metrics: Metrics) extends
override def configurationEntries(
startExclusive: Option[LedgerOffset.Absolute]
): Source[domain.ConfigurationEntry, NotUsed] =
): Source[(LedgerOffset.Absolute, ConfigurationEntry), NotUsed] =
Timed.source(
metrics.daml.services.indexService.configurationEntries,
delegate.configurationEntries(startExclusive))

View File

@ -26,6 +26,7 @@ import com.daml.ledger.api.v1.transaction_service.{
GetTransactionResponse
}
import com.daml.ledger.client.services.commands.{CommandCompletionSource, CommandTrackerFlow}
import com.daml.ledger.participant.state.v1.{Configuration => LedgerConfiguration}
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.api.grpc.GrpcApiService
@ -33,6 +34,7 @@ import com.daml.platform.apiserver.services.ApiCommandService._
import com.daml.platform.apiserver.services.tracking.{TrackerImpl, TrackerMap}
import com.daml.platform.server.api.ApiException
import com.daml.platform.server.api.services.grpc.GrpcCommandService
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.util.Ctx
import com.daml.util.akkastreams.MaxInFlight
import com.google.protobuf.empty.Empty
@ -46,6 +48,7 @@ import scala.util.Try
final class ApiCommandService private (
services: LocalServices,
configuration: ApiCommandService.Configuration,
ledgerConfigProvider: LedgerConfigProvider,
)(
implicit grpcExecutionContext: ExecutionContext,
actorMaterializer: Materializer,
@ -76,7 +79,9 @@ final class ApiCommandService private (
logging.commandId(request.getCommands.commandId),
logging.party(request.getCommands.party)) { implicit logCtx =>
if (running) {
track(request)
ledgerConfigProvider.latestConfiguration.fold[Future[Completion]](
Future.failed(ErrorFactories.missingLedgerConfig()))(ledgerConfig =>
track(request, ledgerConfig))
} else {
Future.failed(
new ApiException(Status.UNAVAILABLE.withDescription("Service has been shut down.")))
@ -84,7 +89,9 @@ final class ApiCommandService private (
}
@SuppressWarnings(Array("org.wartremover.warts.Any"))
private def track(request: SubmitAndWaitRequest): Future[Completion] = {
private def track(
request: SubmitAndWaitRequest,
ledgerConfig: LedgerConfiguration): Future[Completion] = {
val appId = request.getCommands.applicationId
val submitter = TrackerMap.Key(application = appId, party = request.getCommands.party)
submissionTracker.track(submitter, request) {
@ -103,7 +110,7 @@ final class ApiCommandService private (
Some(offset)))
.mapConcat(CommandCompletionSource.toStreamElements),
ledgerEnd,
() => configuration.maxDeduplicationTime
() => ledgerConfig.maxDeduplicationTime
)
val trackingFlow =
if (configuration.limitMaxCommandsInFlight)
@ -157,6 +164,7 @@ object ApiCommandService {
configuration: Configuration,
services: LocalServices,
timeProvider: TimeProvider,
ledgerConfigProvider: LedgerConfigProvider,
)(
implicit grpcExecutionContext: ExecutionContext,
actorMaterializer: Materializer,
@ -164,11 +172,12 @@ object ApiCommandService {
logCtx: LoggingContext
): CommandServiceGrpc.CommandService with GrpcApiService =
new GrpcCommandService(
new ApiCommandService(services, configuration),
new ApiCommandService(services, configuration, ledgerConfigProvider),
ledgerId = configuration.ledgerId,
currentLedgerTime = () => timeProvider.getCurrentTime,
currentUtcTime = () => Instant.now,
maxDeduplicationTime = () => configuration.maxDeduplicationTime,
maxDeduplicationTime =
() => ledgerConfigProvider.latestConfiguration.map(_.maxDeduplicationTime),
)
final case class Configuration(
@ -177,8 +186,6 @@ object ApiCommandService {
maxCommandsInFlight: Int,
limitMaxCommandsInFlight: Boolean,
retentionPeriod: FiniteDuration,
// TODO(RA): this should be updated dynamically from the ledger configuration
maxDeduplicationTime: java.time.Duration,
)
final case class LocalServices(

View File

@ -19,7 +19,12 @@ import com.daml.ledger.participant.state.v1.SubmissionResult.{
NotSupported,
Overloaded
}
import com.daml.ledger.participant.state.v1.{SeedService, SubmissionResult, TimeModel, WriteService}
import com.daml.ledger.participant.state.v1.{
Configuration,
SeedService,
SubmissionResult,
WriteService
}
import com.daml.lf.crypto
import com.daml.lf.data.Ref.Party
import com.daml.lf.engine.{Error => LfError}
@ -53,9 +58,9 @@ object ApiSubmissionService {
writeService: WriteService,
submissionService: IndexSubmissionService,
partyManagementService: IndexPartyManagementService,
timeModel: TimeModel,
timeProvider: TimeProvider,
timeProviderType: TimeProviderType,
ledgerConfigProvider: LedgerConfigProvider,
seedService: Option[SeedService],
commandExecutor: CommandExecutor,
configuration: ApiSubmissionService.Configuration,
@ -71,9 +76,9 @@ object ApiSubmissionService {
writeService,
submissionService,
partyManagementService,
timeModel,
timeProvider,
timeProviderType,
ledgerConfigProvider,
seedService,
commandExecutor,
configuration,
@ -82,7 +87,8 @@ object ApiSubmissionService {
ledgerId = ledgerId,
currentLedgerTime = () => timeProvider.getCurrentTime,
currentUtcTime = () => Instant.now,
maxDeduplicationTime = () => configuration.maxDeduplicationTime,
maxDeduplicationTime =
() => ledgerConfigProvider.latestConfiguration.map(_.maxDeduplicationTime),
metrics = metrics,
)
@ -91,8 +97,6 @@ object ApiSubmissionService {
}
final case class Configuration(
// TODO(RA): this should be updated dynamically from the ledger configuration
maxDeduplicationTime: Duration,
implicitPartyAllocation: Boolean,
)
@ -103,9 +107,9 @@ final class ApiSubmissionService private (
writeService: WriteService,
submissionService: IndexSubmissionService,
partyManagementService: IndexPartyManagementService,
timeModel: TimeModel,
timeProvider: TimeProvider,
timeProviderType: TimeProviderType,
ledgerConfigProvider: LedgerConfigProvider,
seedService: Option[SeedService],
commandExecutor: CommandExecutor,
configuration: ApiSubmissionService.Configuration,
@ -117,8 +121,10 @@ final class ApiSubmissionService private (
private val logger = ContextualizedLogger.get(this.getClass)
private def deduplicateAndRecordOnLedger(seed: Option[crypto.Hash], commands: ApiCommands)(
implicit logCtx: LoggingContext): Future[Unit] = {
private def deduplicateAndRecordOnLedger(
seed: Option[crypto.Hash],
commands: ApiCommands,
ledgerConfig: Configuration)(implicit logCtx: LoggingContext): Future[Unit] = {
val submittedAt = commands.submittedAt
val deduplicateUntil = commands.deduplicateUntil
@ -126,7 +132,7 @@ final class ApiSubmissionService private (
.deduplicateCommand(commands.commandId, commands.submitter, submittedAt, deduplicateUntil)
.flatMap {
case CommandDeduplicationNew =>
recordOnLedger(seed, commands)
recordOnLedger(seed, commands, ledgerConfig)
.transform(mapSubmissionResult)
.recoverWith {
case error =>
@ -151,8 +157,12 @@ final class ApiSubmissionService private (
logger.trace(s"Received composite commands: $commands")
logger.debug(s"Received composite command let ${commands.commands.ledgerEffectiveTime}.")
deduplicateAndRecordOnLedger(seedService.map(_.nextSeed()), commands)
.andThen(logger.logErrorsOnCall[Unit])(DirectExecutionContext)
ledgerConfigProvider.latestConfiguration.fold[Future[Unit]](
Future.failed(ErrorFactories.missingLedgerConfig())
)(
ledgerConfig =>
deduplicateAndRecordOnLedger(seedService.map(_.nextSeed()), commands, ledgerConfig)
.andThen(logger.logErrorsOnCall[Unit])(DirectExecutionContext))
}
private def mapSubmissionResult(result: Try[SubmissionResult])(
@ -181,6 +191,7 @@ final class ApiSubmissionService private (
private def recordOnLedger(
submissionSeed: Option[crypto.Hash],
commands: ApiCommands,
ledgerConfig: Configuration,
)(implicit logCtx: LoggingContext): Future[SubmissionResult] =
for {
res <- commandExecutor.execute(commands, submissionSeed)
@ -189,7 +200,7 @@ final class ApiSubmissionService private (
Future.failed(grpcError(toStatus(error)))
}, Future.successful)
partyAllocationResults <- allocateMissingInformees(transactionInfo.transaction)
submissionResult <- submitTransaction(transactionInfo, partyAllocationResults)
submissionResult <- submitTransaction(transactionInfo, partyAllocationResults, ledgerConfig)
} yield submissionResult
private def allocateMissingInformees(
@ -222,6 +233,7 @@ final class ApiSubmissionService private (
private def submitTransaction(
transactionInfo: CommandExecutionResult,
partyAllocationResults: Seq[SubmissionResult],
ledgerConfig: Configuration,
): Future[SubmissionResult] =
partyAllocationResults.find(_ != SubmissionResult.Acknowledged) match {
case Some(result) =>
@ -233,7 +245,7 @@ final class ApiSubmissionService private (
// If the ledger time of the transaction is far in the future (farther than the expected latency),
// the submission to the WriteService is delayed.
val submitAt = transactionInfo.transactionMeta.ledgerEffectiveTime.toInstant
.minus(timeModel.avgTransactionLatency)
.minus(ledgerConfig.timeModel.avgTransactionLatency)
val submissionDelay = Duration.between(timeProvider.getCurrentTime, submitAt)
if (submissionDelay.isNegative)
submitTransaction(transactionInfo)

View File

@ -0,0 +1,177 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.services
import java.util.UUID
import java.util.concurrent.atomic.AtomicReference
import akka.{Done, NotUsed}
import akka.stream.{KillSwitches, Materializer, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, RestartSource, Sink}
import com.daml.api.util.TimeProvider
import com.daml.dec.{DirectExecutionContext => DE}
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.LedgerOffset
import com.daml.ledger.participant.state.index.v2.IndexConfigManagementService
import com.daml.ledger.participant.state.v1.{
Configuration,
SubmissionId,
SubmissionResult,
WriteService
}
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.configuration.LedgerConfiguration
import scala.compat.java8.FutureConverters
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.{DurationInt, DurationLong}
/**
* Subscribes to ledger configuration updates coming from the index,
* and makes the latest ledger configuration available to consumers.
*
* This class helps avoiding code duplication and limiting the number of
* database lookups, as multiple services and validators require the latest ledger config.
*/
final class LedgerConfigProvider private (
index: IndexConfigManagementService,
writeService: WriteService,
timeProvider: TimeProvider,
config: LedgerConfiguration,
materializer: Materializer,
)(implicit logCtx: LoggingContext)
extends AutoCloseable {
private[this] val logger = ContextualizedLogger.get(this.getClass)
// The latest offset that was read (if any), and the latest ledger configuration found (if any)
private[this] type StateType = (Option[LedgerOffset.Absolute], Option[Configuration])
private[this] val state: AtomicReference[StateType] = new AtomicReference(None -> None)
private[this] val killSwitch: AtomicReference[Option[UniqueKillSwitch]] = new AtomicReference(
None)
private[this] val readyPromise: Promise[Unit] = Promise()
// At startup, do the following:
// - Start loading the ledger configuration
// - Mark the provider as ready if no configuration was found after a timeout
// - Submit the initial config if none is found after a delay
startLoading()
materializer.scheduleOnce(config.configurationLoadTimeout.toNanos.nanos, () => {
readyPromise.trySuccess(())
()
})
materializer.scheduleOnce(config.initialConfigurationSubmitDelay.toNanos.nanos, () => {
if (latestConfiguration.isEmpty) submitInitialConfig()
()
})
// Looks up the latest ledger configuration, then subscribes to a
// stream of configuration changes.
// If the source of configuration changes proves to be a performance bottleneck,
// it could be replaced by regular polling.
private[this] def startLoading(): Future[Unit] =
index
.lookupConfiguration()
.map {
case Some(result) =>
logger.info(
s"Initial ledger configuration lookup found configuration ${result._2} at ${result._1}")
configFound(result._1, result._2)
case None =>
logger.info(s"Initial ledger configuration lookup did not find any configuration")
state.set(None -> None)
}(DE)
.map(_ => startStreamingUpdates())(DE)
private[this] def configFound(offset: LedgerOffset.Absolute, config: Configuration): Unit = {
state.set(Some(offset) -> Some(config))
readyPromise.trySuccess(())
()
}
private[this] def startStreamingUpdates(): Unit = {
killSwitch.set(
Some(
RestartSource
.withBackoff(
minBackoff = 1.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.1,
) { () =>
index
.configurationEntries(state.get._1)
.map {
case (offset, domain.ConfigurationEntry.Accepted(_, _, config)) =>
logger.info(s"New ledger configuration $config found at $offset")
configFound(offset, config)
case (offset, domain.ConfigurationEntry.Rejected(_, _, _, _)) =>
logger.trace(s"New ledger configuration rejection found at $offset")
state.updateAndGet(previous => Some(offset) -> previous._2)
()
}
}
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
.toMat(Sink.ignore)(Keep.left[UniqueKillSwitch, Future[Done]])
.run()(materializer)))
()
}
private[this] def submitInitialConfig(): Future[Unit] = {
implicit val executionContext: ExecutionContext = DE
// There are several reasons why the change could be rejected:
// - The participant is not authorized to set the configuration
// - There already is a configuration, it just didn't appear in the index yet
// This method therefore does not try to re-submit the initial configuration in case of failure.
val submissionId = SubmissionId.assertFromString(UUID.randomUUID.toString)
logger.info(s"No ledger configuration found, submitting an initial configuration $submissionId")
FutureConverters
.toScala(
writeService.submitConfiguration(
Timestamp.assertFromInstant(timeProvider.getCurrentTime.plusSeconds(60)),
submissionId,
config.initialConfiguration
))
.map {
case SubmissionResult.Acknowledged =>
logger.info(s"Initial configuration submission $submissionId was successful")
()
case SubmissionResult.NotSupported =>
logger.info("Setting an initial ledger configuration is not supported")
()
case result =>
logger.warn(
s"Initial configuration submission $submissionId failed. Reason: ${result.description}")
()
}(DE)
}
/** The latest configuration found so far.
* This may not be the currently active ledger configuration, e.g., if the index is lagging behind the ledger.
*/
def latestConfiguration: Option[Configuration] = state.get._2
/** Completes:
* - when some ledger configuration was found
* - after [[com.daml.platform.configuration.LedgerConfiguration.configurationLoadTimeout]]
* , whichever happens first.
*/
def ready: Future[Unit] = readyPromise.future
override def close(): Unit = {
killSwitch.get.foreach(k => k.shutdown())
}
}
object LedgerConfigProvider {
def create(
index: IndexConfigManagementService,
writeService: WriteService,
timeProvider: TimeProvider,
config: LedgerConfiguration)(
implicit materializer: Materializer,
logCtx: LoggingContext): LedgerConfigProvider =
new LedgerConfigProvider(index, writeService, timeProvider, config, materializer)
}

View File

@ -3,8 +3,6 @@
package com.daml.platform.apiserver.services.admin
import java.util.UUID
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.daml.ledger.participant.state.index.v2.IndexConfigManagementService
@ -22,13 +20,11 @@ import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.LedgerOffset
import com.daml.ledger.api.v1.admin.config_management_service.ConfigManagementServiceGrpc.ConfigManagementService
import com.daml.ledger.api.v1.admin.config_management_service._
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.configuration.LedgerConfiguration
import com.daml.platform.server.api.validation
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.timer.Delayed
import io.grpc.{ServerServiceDefinition, StatusRuntimeException}
import scala.compat.java8.FutureConverters
@ -50,10 +46,6 @@ final class ApiConfigManagementService private (
private val defaultConfigResponse = configToResponse(ledgerConfiguration.initialConfiguration)
// After a short delay, check if there exists a ledger configuration,
// and submit an initial configuration if no ledger configuration exists.
submitInitialConfig()
override def close(): Unit = ()
override def bindService(): ServerServiceDefinition =
@ -65,46 +57,6 @@ final class ApiConfigManagementService private (
.map(_.fold(defaultConfigResponse) { case (_, conf) => configToResponse(conf) })(DE)
.andThen(logger.logErrorsOnCall[GetTimeModelResponse])(DE)
private def submitInitialConfig() = {
implicit val executionContext: ExecutionContext = DE
// There are several reasons why the change could be rejected:
// - The participant is not authorized to set the configuration
// - There already is a configuration, it just didn't appear in the index yet
// This method therefore does not try to re-submit the initial configuration in case of failure.
Delayed.Future.by(
Duration.fromNanos(ledgerConfiguration.initialConfigurationSubmitDelay.toNanos))(
for {
optConfig <- index.lookupConfiguration()
_ <- if (optConfig.isDefined)
Future.successful(())
else {
val submissionId = SubmissionId.assertFromString(UUID.randomUUID.toString)
logger.info(
s"No ledger configuration found, submitting an initial configuration $submissionId")
FutureConverters
.toScala(
writeService.submitConfiguration(
Timestamp.assertFromInstant(timeProvider.getCurrentTime.plusSeconds(60)),
submissionId,
ledgerConfiguration.initialConfiguration
))
.map {
case SubmissionResult.Acknowledged =>
logger.info(s"Initial configuration submission $submissionId was successful")
()
case SubmissionResult.NotSupported =>
logger.info("Setting an initial ledger configuration is not supported")
()
case result =>
logger.warn(
s"Initial configuration submission $submissionId failed. Reason: ${result.description}")
()
}(DE)
}
} yield ()
)
}
private def configToResponse(config: Configuration): GetTimeModelResponse = {
val tm = config.timeModel
GetTimeModelResponse(
@ -224,8 +176,8 @@ final class ApiConfigManagementService private (
index
.configurationEntries(offset)
.collect {
case entry @ domain.ConfigurationEntry.Accepted(`submissionId`, _, _) => entry
case entry @ domain.ConfigurationEntry.Rejected(`submissionId`, _, _, _) => entry
case (_, entry @ domain.ConfigurationEntry.Accepted(`submissionId`, _, _)) => entry
case (_, entry @ domain.ConfigurationEntry.Rejected(`submissionId`, _, _, _)) => entry
}
.completionTimeout(timeToLive)
.runWith(Sink.head)(materializer)

View File

@ -19,15 +19,57 @@ case class LedgerConfiguration(
* The delay until the ledger API server tries to submit an initial configuration at startup if none exists.
*/
initialConfigurationSubmitDelay: Duration,
/**
* The amount of time the ledger API server will wait to load a ledger configuration.
*
* The ledger API server waits at startup until it reads at least one configuration changed update from the ledger.
* If none is found within this timeout, the ledger API server will start anyway, but services that depend
* on the ledger configuration (e.g., SubmissionService or CommandService) will reject all requests with the
* UNAVAILABLE error.
*/
configurationLoadTimeout: Duration,
)
object LedgerConfiguration {
val default: LedgerConfiguration = LedgerConfiguration(
/** Default configuration for a ledger-backed index,
* i.e., if there is zero delay between the ledger and the index.
* Example: Sandbox classic.
*/
val defaultLedgerBackedIndex: LedgerConfiguration = LedgerConfiguration(
initialConfiguration = Configuration(
generation = 0,
generation = 1,
timeModel = TimeModel.reasonableDefault,
maxDeduplicationTime = Duration.ofDays(1)
),
initialConfigurationSubmitDelay = Duration.ZERO
initialConfigurationSubmitDelay = Duration.ZERO,
configurationLoadTimeout = Duration.ofSeconds(1),
)
/** Default configuration for a local single-participant ledger,
* i.e., if there is minimal delay between the ledger and the index.
* Example: Sandbox next.
*/
val defaultLocalLedger: LedgerConfiguration = LedgerConfiguration(
initialConfiguration = Configuration(
generation = 1,
timeModel = TimeModel.reasonableDefault,
maxDeduplicationTime = Duration.ofDays(1)
),
initialConfigurationSubmitDelay = Duration.ofMillis(500),
configurationLoadTimeout = Duration.ofSeconds(5),
)
/** Default configuration for a participant connecting to a remote ledger,
* i.e., if there may be significant delay between the ledger and the index.
*/
val defaultRemote: LedgerConfiguration = LedgerConfiguration(
initialConfiguration = Configuration(
generation = 1,
timeModel = TimeModel.reasonableDefault,
maxDeduplicationTime = Duration.ofDays(1)
),
initialConfigurationSubmitDelay = Duration.ofSeconds(5),
configurationLoadTimeout = Duration.ofSeconds(10),
)
}

View File

@ -1,17 +0,0 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.configuration
import java.time.Duration
final case class SubmissionConfiguration(
maxDeduplicationTime: Duration,
)
object SubmissionConfiguration {
lazy val default: SubmissionConfiguration =
SubmissionConfiguration(
maxDeduplicationTime = Duration.ofDays(1),
)
}

View File

@ -235,14 +235,16 @@ abstract class LedgerBackedIndexService(
.map(_.map { case (offset, config) => (toAbsolute(offset), config) })(DEC)
/** Retrieve configuration entries. */
override def configurationEntries(
startExclusive: Option[LedgerOffset.Absolute]): Source[domain.ConfigurationEntry, NotUsed] =
override def configurationEntries(startExclusive: Option[LedgerOffset.Absolute])
: Source[(domain.LedgerOffset.Absolute, domain.ConfigurationEntry), NotUsed] =
Source
.future(
startExclusive
.map(off => Future.fromTry(ApiOffset.fromString(off.value).map(Some(_))))
.getOrElse(Future.successful(None)))
.flatMapConcat(ledger.configurationEntries(_).map(_._2.toDomain))
.flatMapConcat(ledger.configurationEntries(_).map {
case (offset, config) => toAbsolute(offset) -> config.toDomain
})
/** Deduplicate commands */
override def deduplicateCommand(

View File

@ -5,7 +5,7 @@ package com.daml.platform.sandbox
import java.io.File
import java.nio.file.Files
import java.time.{Duration, Instant}
import java.time.Instant
import akka.actor.ActorSystem
import akka.stream.Materializer
@ -25,7 +25,7 @@ import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.apiserver._
import com.daml.platform.configuration.{LedgerConfiguration, PartyConfiguration}
import com.daml.platform.configuration.PartyConfiguration
import com.daml.platform.packages.InMemoryPackageStore
import com.daml.platform.sandbox.SandboxServer._
import com.daml.platform.sandbox.banner.Banner
@ -265,11 +265,7 @@ final class SandboxServer(
() => resetAndRestartServer(),
authorizer,
)
ledgerConfiguration = LedgerConfiguration(
initialConfiguration = defaultConfiguration,
// In SandboxServer, there is no delay between indexer and ledger
initialConfigurationSubmitDelay = Duration.ZERO,
)
ledgerConfiguration = config.ledgerConfig
executionSequencerFactory <- new ExecutionSequencerFactoryOwner().acquire()
apiServicesOwner = new ApiServices.Owner(
participantId = participantId,
@ -285,7 +281,6 @@ final class SandboxServer(
// sandbox-classic always allocates party implicitly
implicitPartyAllocation = true,
),
submissionConfig = config.submissionConfig,
optTimeServiceBackend = timeServiceBackendO,
metrics = metrics,
healthChecks = healthChecks,

View File

@ -12,12 +12,7 @@ import com.daml.ledger.api.auth.AuthService
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.participant.state.v1.SeedService.Seeding
import com.daml.platform.common.LedgerIdMode
import com.daml.platform.configuration.{
CommandConfiguration,
LedgerConfiguration,
MetricsReporter,
SubmissionConfiguration
}
import com.daml.platform.configuration.{CommandConfiguration, LedgerConfiguration, MetricsReporter}
import com.daml.platform.services.time.TimeProviderType
import com.daml.ports.Port
@ -31,7 +26,6 @@ final case class SandboxConfig(
damlPackages: List[File],
timeProviderType: Option[TimeProviderType],
commandConfig: CommandConfiguration,
submissionConfig: SubmissionConfiguration,
ledgerConfig: LedgerConfiguration,
tlsConfig: Option[TlsConfiguration],
scenario: Option[String],
@ -65,8 +59,7 @@ object SandboxConfig {
damlPackages = Nil,
timeProviderType = None,
commandConfig = CommandConfiguration.default,
submissionConfig = SubmissionConfiguration.default,
ledgerConfig = LedgerConfiguration.default,
ledgerConfig = LedgerConfiguration.defaultLocalLedger,
tlsConfig = None,
scenario = None,
implicitPartyAllocation = true,
@ -85,5 +78,6 @@ object SandboxConfig {
lazy val default: SandboxConfig =
nextDefault.copy(
seeding = None,
ledgerConfig = LedgerConfiguration.defaultLedgerBackedIndex,
)
}

View File

@ -74,7 +74,6 @@ object SandboxIndexAndWriteService {
acs = acs,
packages = templateStore,
initialLedgerEntries = ledgerEntries,
initialConfig = initialConfig,
queueDepth = queueDepth,
startMode = startMode,
eventsPageSize = eventsPageSize,
@ -101,7 +100,6 @@ object SandboxIndexAndWriteService {
acs,
templateStore,
ledgerEntries,
intialConfig,
)
owner(MeteredLedger(ledger, metrics), participantId, intialConfig, timeProvider)
}

View File

@ -4,7 +4,6 @@
package com.daml.platform.sandbox.stores.ledger.inmemory
import java.time.Instant
import java.util.UUID
import java.util.concurrent.atomic.AtomicReference
import akka.NotUsed
@ -93,20 +92,12 @@ class InMemoryLedger(
acs0: InMemoryActiveLedgerState,
packageStoreInit: InMemoryPackageStore,
ledgerEntries: ImmArray[LedgerEntryOrBump],
initialConfig: Configuration,
) extends Ledger {
private val logger = LoggerFactory.getLogger(this.getClass)
private val entries = {
val l = new LedgerEntries[InMemoryEntry](_.toString)
l.publish(
InMemoryConfigEntry(
ConfigurationEntry.Accepted(
submissionId = UUID.randomUUID.toString,
participantId = participantId,
configuration = initialConfig,
)))
ledgerEntries.foreach {
case LedgerEntryOrBump.Bump(increment) =>
l.incrementOffset(increment)
@ -185,7 +176,7 @@ class InMemoryLedger(
// mutable state
private var acs = acs0
private var ledgerConfiguration: Option[Configuration] = Some(initialConfig)
private var ledgerConfiguration: Option[Configuration] = None
private val commands: scala.collection.mutable.Map[String, CommandDeduplicationEntry] =
scala.collection.mutable.Map.empty
@ -292,6 +283,16 @@ class InMemoryLedger(
}
)
// Validates the given ledger time according to the ledger time model
private def checkTimeModel(ledgerTime: Instant, recordTime: Instant): Either[String, Unit] = {
ledgerConfiguration
.fold[Either[String, Unit]](
Left("No ledger configuration available, cannot validate ledger time")
)(
config => config.timeModel.checkTime(ledgerTime, recordTime)
)
}
private def handleSuccessfulTx(
transactionId: LedgerString,
submitterInfo: SubmitterInfo,
@ -299,9 +300,7 @@ class InMemoryLedger(
transaction: SubmittedTransaction): Unit = {
val ledgerTime = transactionMeta.ledgerEffectiveTime.toInstant
val recordTime = timeProvider.getCurrentTime
val timeModel = ledgerConfiguration.get.timeModel
timeModel
.checkTime(ledgerTime, recordTime)
checkTimeModel(ledgerTime, recordTime)
.fold(
reason => handleError(submitterInfo, RejectionReason.InvalidLedgerTime(reason)),
_ => {

View File

@ -4,7 +4,6 @@
package com.daml.platform.sandbox.stores.ledger.sql
import java.time.Instant
import java.util.UUID
import java.util.concurrent.atomic.AtomicReference
import akka.Done
@ -56,7 +55,6 @@ object SqlLedger {
acs: InMemoryActiveLedgerState,
packages: InMemoryPackageStore,
initialLedgerEntries: ImmArray[LedgerEntryOrBump],
initialConfig: Configuration,
queueDepth: Int,
startMode: SqlStartMode = SqlStartMode.ContinueIfExists,
eventsPageSize: Int,
@ -75,7 +73,6 @@ object SqlLedger {
acs,
packages,
initialLedgerEntries,
initialConfig,
queueDepth,
))
} yield ledger
@ -157,8 +154,9 @@ private final class SqlLedger(
new AtomicReference[Option[Configuration]](configAtInitialization)
// Validates the given ledger time according to the ledger time model
private def checkTimeModel(ledgerTime: Instant): Either[RejectionReason, Unit] = {
val recordTime = timeProvider.getCurrentTime
private def checkTimeModel(
ledgerTime: Instant,
recordTime: Instant): Either[RejectionReason, Unit] = {
currentConfiguration
.get()
.fold[Either[RejectionReason, Unit]](
@ -180,7 +178,7 @@ private final class SqlLedger(
val ledgerTime = transactionMeta.ledgerEffectiveTime.toInstant
val recordTime = timeProvider.getCurrentTime
checkTimeModel(ledgerTime)
checkTimeModel(ledgerTime, recordTime)
.fold(
reason =>
ledgerDao.storeRejection(
@ -354,7 +352,6 @@ private final class SqlLedgerFactory(ledgerDao: LedgerDao)(implicit logCtx: Logg
acs: InMemoryActiveLedgerState,
packages: InMemoryPackageStore,
initialLedgerEntries: ImmArray[LedgerEntryOrBump],
initialConfig: Configuration,
queueDepth: Int,
)(implicit mat: Materializer): Future[SqlLedger] = {
implicit val ec: ExecutionContext = DEC
@ -370,7 +367,7 @@ private final class SqlLedgerFactory(ledgerDao: LedgerDao)(implicit logCtx: Logg
acs,
packages,
initialLedgerEntries,
initialConfig)
)
} yield ledgerId
case SqlStartMode.ContinueIfExists =>
initialize(
@ -380,7 +377,7 @@ private final class SqlLedgerFactory(ledgerDao: LedgerDao)(implicit logCtx: Logg
acs,
packages,
initialLedgerEntries,
initialConfig)
)
}
for {
@ -410,7 +407,6 @@ private final class SqlLedgerFactory(ledgerDao: LedgerDao)(implicit logCtx: Logg
acs: InMemoryActiveLedgerState,
packages: InMemoryPackageStore,
initialLedgerEntries: ImmArray[LedgerEntryOrBump],
initialConfig: Configuration,
): Future[LedgerId] = {
// Note that here we only store the ledger entry and we do not update anything else, such as the
// headRef. This is OK since this initialization
@ -444,7 +440,6 @@ private final class SqlLedgerFactory(ledgerDao: LedgerDao)(implicit logCtx: Logg
_ <- ledgerDao.initializeLedger(ledgerId, Offset.begin)
_ <- initializeLedgerEntries(
initialLedgerEntries,
initialConfig,
timeProvider,
packages,
acs,
@ -475,7 +470,6 @@ private final class SqlLedgerFactory(ledgerDao: LedgerDao)(implicit logCtx: Logg
private def initializeLedgerEntries(
initialLedgerEntries: ImmArray[LedgerEntryOrBump],
initialConfig: Configuration,
timeProvider: TimeProvider,
packages: InMemoryPackageStore,
acs: InMemoryActiveLedgerState,
@ -498,14 +492,6 @@ private final class SqlLedgerFactory(ledgerDao: LedgerDao)(implicit logCtx: Logg
for {
_ <- copyPackages(packages, timeProvider.getCurrentTime, SandboxOffset.toOffset(ledgerEnd))
_ <- ledgerDao.storeConfigurationEntry(
offset = SandboxOffset.toOffset(0),
recordedAt = timeProvider.getCurrentTime,
submissionId = UUID.randomUUID.toString,
participantId = participantId,
configuration = initialConfig,
rejectionReason = None,
)
_ <- ledgerDao.storeInitialState(ledgerEntries, SandboxOffset.toOffset(ledgerEnd))
} yield ()
}

View File

@ -201,7 +201,6 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
partyConfig = PartyConfiguration.default.copy(
implicitPartyAllocation = config.implicitPartyAllocation,
),
submissionConfig = config.submissionConfig,
ledgerConfig = config.ledgerConfig,
readService = readService,
writeService = writeService,

View File

@ -5,7 +5,7 @@ package com.daml.platform.sandbox
import akka.stream.Materializer
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId}
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.api.util.TimeProvider
import com.daml.lf.data.ImmArray
import com.daml.ledger.api.domain.LedgerId
@ -30,28 +30,27 @@ object LedgerResource {
ledgerId: LedgerId,
participantId: ParticipantId,
timeProvider: TimeProvider,
initialConfig: Configuration,
acs: InMemoryActiveLedgerState = InMemoryActiveLedgerState.empty,
packages: InMemoryPackageStore = InMemoryPackageStore.empty,
entries: ImmArray[LedgerEntryOrBump] = ImmArray.empty,
)(implicit executionContext: ExecutionContext): Resource[Ledger] =
new OwnedResource(
ResourceOwner.successful(
new InMemoryLedger(
ledgerId,
participantId,
timeProvider,
acs,
packages,
entries,
initialConfig)))
ResourceOwner.forValue(
() =>
new InMemoryLedger(
ledgerId,
participantId,
timeProvider,
acs,
packages,
entries,
)))
def postgres(
testClass: Class[_],
ledgerId: LedgerId,
participantId: ParticipantId,
timeProvider: TimeProvider,
initialConfig: Configuration,
metrics: MetricRegistry,
packages: InMemoryPackageStore = InMemoryPackageStore.empty,
)(
@ -71,7 +70,6 @@ object LedgerResource {
acs = InMemoryActiveLedgerState.empty,
packages = packages,
initialLedgerEntries = ImmArray.empty,
initialConfig = initialConfig,
queueDepth = 128,
startMode = SqlStartMode.AlwaysReset,
eventsPageSize = 100,

View File

@ -28,7 +28,8 @@ sealed trait LedgerConfigurationServiceITBase extends WordSpec with Matchers {
.next()
.getLedgerConfiguration
maxDeduplicationTime shouldEqual toProto(config.submissionConfig.maxDeduplicationTime)
maxDeduplicationTime shouldEqual toProto(
config.ledgerConfig.initialConfiguration.maxDeduplicationTime)
}
}
}

View File

@ -3,9 +3,18 @@
package com.daml.platform.sandbox.stores.ledger
import java.time.{Duration, Instant}
import java.time.{Instant, Duration => JDuration}
import java.util.UUID
import akka.stream.scaladsl.Sink
import com.daml.ledger.participant.state.v1.{
Configuration,
ParticipantId,
SubmissionResult,
SubmitterInfo,
TimeModel,
TransactionMeta
}
import com.daml.api.util.TimeProvider
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.testing.utils.{
@ -53,7 +62,7 @@ class TransactionTimeModelComplianceIT
implicit val executionContext: ExecutionContext = system.dispatcher
fixtureId match {
case BackendType.InMemory =>
LedgerResource.inMemory(ledgerId, participantId, timeProvider, ledgerConfig)
LedgerResource.inMemory(ledgerId, participantId, timeProvider)
case BackendType.Postgres =>
newLoggingContext { implicit logCtx =>
LedgerResource.postgres(
@ -61,7 +70,6 @@ class TransactionTimeModelComplianceIT
ledgerId,
participantId,
timeProvider,
ledgerConfig,
metrics,
)
}
@ -70,6 +78,23 @@ class TransactionTimeModelComplianceIT
private[this] val submissionSeed = Some(crypto.Hash.hashPrivateKey(this.getClass.getName))
private[this] def publishConfig(
ledger: Ledger,
recordTime: Instant,
generation: Long,
minSkew: JDuration,
maxSkew: JDuration) = {
val config = Configuration(
generation = generation,
timeModel = TimeModel(JDuration.ZERO, minSkew, maxSkew).get,
maxDeduplicationTime = JDuration.ofSeconds(10),
)
ledger.publishConfiguration(
Time.Timestamp.assertFromInstant(recordTime.plusSeconds(3600)),
UUID.randomUUID().toString,
config)
}
private[this] def publishTxAt(ledger: Ledger, ledgerTime: Instant, commandId: String) = {
val dummyTransaction: Transaction.AbsTransaction =
GenTransaction(HashMap.empty, ImmArray.empty)
@ -77,7 +102,7 @@ class TransactionTimeModelComplianceIT
val submitterInfo = SubmitterInfo(
submitter = Ref.Party.assertFromString("submitter"),
applicationId = Ref.LedgerString.assertFromString("appId"),
commandId = Ref.LedgerString.assertFromString(commandId),
commandId = Ref.LedgerString.assertFromString(commandId + UUID.randomUUID().toString),
deduplicateUntil = Instant.EPOCH
)
val transactionMeta = TransactionMeta(
@ -120,17 +145,63 @@ class TransactionTimeModelComplianceIT
completion.status.value.code shouldBe ok
"A Ledger" should {
"reject transactions if there is no ledger config" in allFixtures { ledger =>
val ledgerTime = recordTime
for {
r1 <- publishTxAt(ledger, ledgerTime, "lt-valid")
} yield {
expectInvalidLedgerTime(r1)
}
}
"accept transactions with ledger time that is right" in allFixtures { ledger =>
val ledgerTime = recordTime
publishTxAt(ledger, ledgerTime, "lt-valid").flatMap(expectValidTx)
for {
_ <- publishConfig(ledger, recordTime, 1, JDuration.ofSeconds(1), JDuration.ofSeconds(1))
r1 <- publishTxAt(ledger, ledgerTime, "lt-valid")
} yield {
expectValidTx(r1)
}
}
"reject transactions with ledger time that is too low" in allFixtures { ledger =>
val ledgerTime = recordTime.minus(ledgerConfig.timeModel.minSkew).minusSeconds(1)
publishTxAt(ledger, ledgerTime, "lt-low").flatMap(expectInvalidLedgerTime)
val minSkew = JDuration.ofSeconds(1)
val maxSkew = JDuration.ofDays(1)
val ledgerTime = recordTime.minus(minSkew).minusSeconds(1)
for {
_ <- publishConfig(ledger, recordTime, 1, minSkew, maxSkew)
r1 <- publishTxAt(ledger, ledgerTime, "lt-low")
} yield {
expectInvalidLedgerTime(r1)
}
}
"reject transactions with ledger time that is too high" in allFixtures { ledger =>
val ledgerTime = recordTime.plus(ledgerConfig.timeModel.maxSkew).plusSeconds(1)
publishTxAt(ledger, ledgerTime, "lt-high").flatMap(expectInvalidLedgerTime)
val minSkew = JDuration.ofDays(1)
val maxSkew = JDuration.ofSeconds(1)
val ledgerTime = recordTime.plus(maxSkew).plusSeconds(1)
for {
_ <- publishConfig(ledger, recordTime, 1, minSkew, maxSkew)
r1 <- publishTxAt(ledger, ledgerTime, "lt-high")
} yield {
expectInvalidLedgerTime(r1)
}
}
"reject transactions after ledger config changes" in allFixtures { ledger =>
val largeSkew = JDuration.ofDays(1)
val smallSkew = JDuration.ofSeconds(1)
val ledgerTime = recordTime.plus(largeSkew)
for {
_ <- publishConfig(ledger, recordTime, 1, largeSkew, largeSkew)
r1 <- publishTxAt(ledger, ledgerTime, "lt-before")
_ <- publishConfig(ledger, recordTime, 2, smallSkew, smallSkew)
r2 <- publishTxAt(ledger, ledgerTime, "lt-after")
} yield {
expectValidTx(r1)
expectInvalidLedgerTime(r2)
}
}
}
@ -143,7 +214,6 @@ object TransactionTimeModelComplianceIT {
private val ledgerId: LedgerId = LedgerId(Ref.LedgerString.assertFromString("ledgerId"))
private val participantId: ParticipantId = Ref.ParticipantId.assertFromString("participantId")
private val timeProvider = TimeProvider.Constant(recordTime)
private val ledgerConfig = Configuration(0, TimeModel.reasonableDefault, Duration.ofDays(1))
private implicit def toParty(s: String): Ref.Party = Ref.Party.assertFromString(s)

View File

@ -4,9 +4,9 @@
package com.daml.platform.sandbox.stores.ledger.sql
import java.nio.file.Paths
import java.time.{Duration, Instant}
import java.time.Instant
import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId, TimeModel}
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.api.util.TimeProvider
import com.daml.bazeltools.BazelRunfiles.rlocation
import com.daml.lf.archive.DarReader
@ -211,7 +211,6 @@ class SqlLedgerSpec
.withPackages(Instant.EPOCH, None, packages)
.fold(sys.error, identity),
initialLedgerEntries = ImmArray.empty,
initialConfig = Configuration(0, TimeModel.reasonableDefault, Duration.ofDays(1)),
queueDepth = queueDepth,
startMode = SqlStartMode.ContinueIfExists,
eventsPageSize = 100,

View File

@ -79,6 +79,9 @@ object ResourceOwner {
def failed(throwable: Throwable): ResourceOwner[Nothing] =
new FutureResourceOwner(() => Future.failed(throwable))
def forValue[T](acquire: () => T): ResourceOwner[T] =
new FutureResourceOwner(() => Future.successful(acquire()))
def forTry[T](acquire: () => Try[T]): ResourceOwner[T] =
new FutureResourceOwner(() => Future.fromTry(acquire()))