mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 09:17:43 +03:00
Delay submissions of transactions with future time (#5556)
* Add time provider type to ledger API server Static time sometimes needs special treatment * Delay submissions of transactions with future time Fixes #5480. CHANGELOG_BEGIN - [Sandbox] The sandbox now properly delays command submissions using minLedgerTimeAbs or minLedgerTimeRel. See `issue #5480 <https://github.com/digital-asset/daml/issues/5480>`_. CHANGELOG_END
This commit is contained in:
parent
9186f5ccb6
commit
fe29cfbbb9
@ -43,6 +43,7 @@ compile_deps = [
|
||||
"//libs-scala/ports",
|
||||
"//libs-scala/resources",
|
||||
"//libs-scala/resources-akka",
|
||||
"//libs-scala/timer-utils",
|
||||
"@maven//:ch_qos_logback_logback_classic",
|
||||
"@maven//:ch_qos_logback_logback_core",
|
||||
"@maven//:com_auth0_java_jwt",
|
||||
|
@ -44,6 +44,7 @@ import com.daml.platform.configuration.{
|
||||
SubmissionConfiguration
|
||||
}
|
||||
import com.daml.platform.server.api.services.grpc.GrpcHealthService
|
||||
import com.daml.platform.services.time.TimeProviderType
|
||||
import io.grpc.BindableService
|
||||
import io.grpc.protobuf.services.ProtoReflectionService
|
||||
import scalaz.syntax.tag._
|
||||
@ -81,6 +82,7 @@ object ApiServices {
|
||||
authorizer: Authorizer,
|
||||
engine: Engine,
|
||||
timeProvider: TimeProvider,
|
||||
timeProviderType: TimeProviderType,
|
||||
defaultLedgerConfiguration: Configuration,
|
||||
commandConfig: CommandConfiguration,
|
||||
partyConfig: PartyConfiguration,
|
||||
@ -132,6 +134,7 @@ object ApiServices {
|
||||
partyManagementService,
|
||||
defaultLedgerConfiguration.timeModel,
|
||||
timeProvider,
|
||||
timeProviderType,
|
||||
seedService,
|
||||
commandExecutor,
|
||||
ApiSubmissionService.Configuration(
|
||||
|
@ -31,6 +31,7 @@ import com.daml.platform.configuration.{
|
||||
}
|
||||
import com.daml.platform.index.JdbcIndex
|
||||
import com.daml.platform.packages.InMemoryPackageStore
|
||||
import com.daml.platform.services.time.TimeProviderType
|
||||
import com.daml.ports.Port
|
||||
import com.daml.resources.{Resource, ResourceOwner}
|
||||
import io.grpc.{BindableService, ServerInterceptor}
|
||||
@ -100,6 +101,8 @@ final class StandaloneApiServer(
|
||||
authorizer = authorizer,
|
||||
engine = engine,
|
||||
timeProvider = timeServiceBackend.getOrElse(TimeProvider.UTC),
|
||||
timeProviderType = timeServiceBackend.fold[TimeProviderType](
|
||||
TimeProviderType.WallClock)(_ => TimeProviderType.Static),
|
||||
defaultLedgerConfiguration = initialConditions.config,
|
||||
commandConfig = commandConfig,
|
||||
partyConfig = partyConfig,
|
||||
|
@ -42,7 +42,9 @@ import com.daml.platform.apiserver.execution.{CommandExecutionResult, CommandExe
|
||||
import com.daml.platform.server.api.services.domain.CommandSubmissionService
|
||||
import com.daml.platform.server.api.services.grpc.GrpcCommandSubmissionService
|
||||
import com.daml.platform.server.api.validation.ErrorFactories
|
||||
import com.daml.platform.services.time.TimeProviderType
|
||||
import com.daml.platform.store.ErrorCause
|
||||
import com.daml.timer.Delayed
|
||||
import io.grpc.Status
|
||||
|
||||
import scala.collection.breakOut
|
||||
@ -63,6 +65,7 @@ object ApiSubmissionService {
|
||||
partyManagementService: IndexPartyManagementService,
|
||||
timeModel: TimeModel,
|
||||
timeProvider: TimeProvider,
|
||||
timeProviderType: TimeProviderType,
|
||||
seedService: Option[SeedService],
|
||||
commandExecutor: CommandExecutor,
|
||||
configuration: ApiSubmissionService.Configuration,
|
||||
@ -80,6 +83,7 @@ object ApiSubmissionService {
|
||||
partyManagementService,
|
||||
timeModel,
|
||||
timeProvider,
|
||||
timeProviderType,
|
||||
seedService,
|
||||
commandExecutor,
|
||||
configuration,
|
||||
@ -110,6 +114,7 @@ final class ApiSubmissionService private (
|
||||
partyManagementService: IndexPartyManagementService,
|
||||
timeModel: TimeModel,
|
||||
timeProvider: TimeProvider,
|
||||
timeProviderType: TimeProviderType,
|
||||
seedService: Option[SeedService],
|
||||
commandExecutor: CommandExecutor,
|
||||
configuration: ApiSubmissionService.Configuration,
|
||||
@ -129,6 +134,8 @@ final class ApiSubmissionService private (
|
||||
metrics.meter(servicePrefix :+ "failed_command_interpretations")
|
||||
val deduplicatedCommandsMeter: Meter =
|
||||
metrics.meter(servicePrefix :+ "deduplicated_commands")
|
||||
val delayedSubmissionsMeter: Meter =
|
||||
metrics.meter(servicePrefix :+ "delayed_submissions")
|
||||
val submittedTransactionsTimer: Timer =
|
||||
metrics.timer(servicePrefix :+ "submitted_transactions")
|
||||
}
|
||||
@ -243,15 +250,38 @@ final class ApiSubmissionService private (
|
||||
case Some(result) =>
|
||||
Future.successful(result)
|
||||
case None =>
|
||||
transactionInfo match {
|
||||
case CommandExecutionResult(submitterInfo, transactionMeta, transaction, _) =>
|
||||
Timed.future(
|
||||
Metrics.submittedTransactionsTimer,
|
||||
FutureConverters.toScala(
|
||||
writeService.submitTransaction(submitterInfo, transactionMeta, transaction)))
|
||||
timeProviderType match {
|
||||
case TimeProviderType.WallClock =>
|
||||
// Submit transactions such that they arrive at the ledger sequencer exactly when record time equals ledger time.
|
||||
// If the ledger time of the transaction is far in the future (farther than the expected latency),
|
||||
// the submission to the WriteService is delayed.
|
||||
val submitAt = transactionInfo.transactionMeta.ledgerEffectiveTime.toInstant
|
||||
.minus(timeModel.avgTransactionLatency)
|
||||
val submissionDelay = Duration.between(timeProvider.getCurrentTime, submitAt)
|
||||
if (submissionDelay.isNegative)
|
||||
submitTransaction(transactionInfo)
|
||||
else {
|
||||
Metrics.delayedSubmissionsMeter.mark()
|
||||
val scalaDelay = scala.concurrent.duration.Duration.fromNanos(submissionDelay.toNanos)
|
||||
Delayed.Future.by(scalaDelay)(submitTransaction(transactionInfo))
|
||||
}
|
||||
case TimeProviderType.Static =>
|
||||
// In static time mode, record time is always equal to ledger time
|
||||
submitTransaction(transactionInfo)
|
||||
}
|
||||
}
|
||||
|
||||
private def submitTransaction(
|
||||
result: CommandExecutionResult,
|
||||
): Future[SubmissionResult] = {
|
||||
Timed.future(
|
||||
Metrics.submittedTransactionsTimer,
|
||||
FutureConverters.toScala(
|
||||
writeService
|
||||
.submitTransaction(result.submitterInfo, result.transactionMeta, result.transaction))
|
||||
)
|
||||
}
|
||||
|
||||
private def toStatus(errorCause: ErrorCause) =
|
||||
errorCause match {
|
||||
case e: ErrorCause.DamlLf =>
|
||||
|
@ -287,6 +287,7 @@ final class SandboxServer(
|
||||
authorizer = authorizer,
|
||||
engine = SandboxServer.engine,
|
||||
timeProvider = timeProvider,
|
||||
timeProviderType = timeProviderType,
|
||||
defaultLedgerConfiguration = defaultConfiguration,
|
||||
commandConfig = config.commandConfig,
|
||||
partyConfig = PartyConfiguration.default.copy(
|
||||
|
@ -0,0 +1,124 @@
|
||||
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.platform.sandbox.services.command
|
||||
|
||||
import java.time.{Duration, Instant}
|
||||
import java.util.UUID
|
||||
|
||||
import com.daml.api.util.DurationConversion
|
||||
import com.daml.ledger.api.testing.utils.{MockMessages, SuiteResourceManagementAroundAll}
|
||||
import com.daml.ledger.api.v1.admin.config_management_service.{
|
||||
ConfigManagementServiceGrpc,
|
||||
GetTimeModelRequest,
|
||||
GetTimeModelResponse
|
||||
}
|
||||
import com.daml.ledger.api.v1.command_service.CommandServiceGrpc
|
||||
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc
|
||||
import com.daml.ledger.api.v1.commands.CreateCommand
|
||||
import com.daml.ledger.api.v1.value.{Record, RecordField, Value}
|
||||
import com.daml.platform.participant.util.ValueConversions._
|
||||
import com.daml.platform.sandbox.SandboxBackend
|
||||
import com.daml.platform.sandbox.config.SandboxConfig
|
||||
import com.daml.platform.sandbox.services.{SandboxFixture, TestCommands}
|
||||
import com.daml.platform.services.time.TimeProviderType
|
||||
import com.google.protobuf.duration.{Duration => ProtoDuration}
|
||||
import org.scalatest.{AsyncWordSpec, Inspectors, Matchers}
|
||||
import scalaz.syntax.tag._
|
||||
|
||||
@SuppressWarnings(Array("org.wartremover.warts.Any"))
|
||||
class CommandServiceIT
|
||||
extends AsyncWordSpec
|
||||
with Matchers
|
||||
with Inspectors
|
||||
with SandboxFixture
|
||||
with SandboxBackend.Postgresql
|
||||
with TestCommands
|
||||
with SuiteResourceManagementAroundAll {
|
||||
|
||||
private def command(party: String) =
|
||||
CreateCommand(
|
||||
Some(templateIds.dummy),
|
||||
Some(
|
||||
Record(
|
||||
Some(templateIds.dummy),
|
||||
Seq(RecordField("operator", Option(Value(Value.Sum.Party(party)))))))).wrap
|
||||
|
||||
private def submitAndWaitRequest(ledgerId: String) =
|
||||
MockMessages.submitAndWaitRequest
|
||||
.update(
|
||||
_.commands.commands := List(command(MockMessages.submitAndWaitRequest.getCommands.party)),
|
||||
_.commands.ledgerId := ledgerId,
|
||||
_.commands.commandId := UUID.randomUUID().toString,
|
||||
)
|
||||
|
||||
private def submitRequest(ledgerId: String) =
|
||||
MockMessages.submitRequest
|
||||
.update(
|
||||
_.commands.commands := List(command(MockMessages.submitRequest.getCommands.party)),
|
||||
_.commands.ledgerId := ledgerId,
|
||||
_.commands.commandId := UUID.randomUUID().toString,
|
||||
)
|
||||
|
||||
private[this] def assertExpectedDelay(
|
||||
start: Instant,
|
||||
end: Instant,
|
||||
minLedgerTimeRel: ProtoDuration,
|
||||
timeModel: GetTimeModelResponse) = {
|
||||
val avgLatency = DurationConversion.fromProto(timeModel.timeModel.get.avgTransactionLatency.get)
|
||||
val expectedDuration = DurationConversion.fromProto(minLedgerTimeRel).minus(avgLatency)
|
||||
val actualDuration = Duration.between(start, end)
|
||||
assert(
|
||||
actualDuration.compareTo(expectedDuration) != -1,
|
||||
s"Expected submission duration was $expectedDuration, actual duration way $actualDuration")
|
||||
}
|
||||
|
||||
"CommandSubmissionService" when {
|
||||
"receiving a command with minLedgerTimeRel" should {
|
||||
"delay the submission" in {
|
||||
val lid = ledgerId().unwrap
|
||||
val submissionService = CommandSubmissionServiceGrpc.stub(channel)
|
||||
val configService = ConfigManagementServiceGrpc.stub(channel)
|
||||
val minLedgerTimeRel = ProtoDuration.of(5, 0)
|
||||
val request = submitRequest(lid).update(_.commands.minLedgerTimeRel := minLedgerTimeRel)
|
||||
|
||||
for {
|
||||
timeModel <- configService.getTimeModel(GetTimeModelRequest())
|
||||
start = Instant.now
|
||||
_ <- submissionService.submit(request)
|
||||
end = Instant.now
|
||||
} yield {
|
||||
assertExpectedDelay(start, end, minLedgerTimeRel, timeModel)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"CommandService" when {
|
||||
"receiving a command with minLedgerTimeRel" should {
|
||||
"delay the submission" in {
|
||||
val lid = ledgerId().unwrap
|
||||
val commandService = CommandServiceGrpc.stub(channel)
|
||||
val configService = ConfigManagementServiceGrpc.stub(channel)
|
||||
val minLedgerTimeRel = ProtoDuration.of(5, 0)
|
||||
val request =
|
||||
submitAndWaitRequest(lid).update(_.commands.minLedgerTimeRel := minLedgerTimeRel)
|
||||
|
||||
for {
|
||||
timeModel <- configService.getTimeModel(GetTimeModelRequest())
|
||||
start = Instant.now
|
||||
_ <- commandService.submitAndWait(request)
|
||||
end = Instant.now
|
||||
} yield {
|
||||
assertExpectedDelay(start, end, minLedgerTimeRel, timeModel)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override protected def config: SandboxConfig =
|
||||
super.config.copy(
|
||||
timeProviderType = Some(TimeProviderType.WallClock),
|
||||
)
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user