Make submission ID optional [KVL-1107] (#11011)

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Hubert Slojewski 2021-10-26 16:39:56 +02:00 committed by GitHub
parent 3587eb84a2
commit 8212c0b2a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 400 additions and 243 deletions

View File

@ -259,7 +259,11 @@ class Extractor[T](config: ExtractorConfig, target: T)(
LedgerClientConfiguration(
applicationId = config.appId,
ledgerIdRequirement = LedgerIdRequirement.none,
commandClient = CommandClientConfiguration(1, 1, java.time.Duration.ofSeconds(20L)),
commandClient = CommandClientConfiguration(
maxCommandsInFlight = 1,
maxParallelSubmissions = 1,
defaultDeduplicationTime = java.time.Duration.ofSeconds(20L),
),
sslContext = config.tlsConfig.client(),
token = tokenHolder.flatMap(_.token),
maxInboundMessageSize = config.ledgerInboundMessageSizeMax,

View File

@ -45,7 +45,7 @@ message Completion {
// The submission ID this completion refers to, as described in ``commands.proto``.
// Must be a valid LedgerString (as described in ``value.proto``).
// Optional for historic completions where this data is not available.
// Optional
string submission_id = 6;
reserved "submission_rank"; // For future use.

View File

@ -85,6 +85,8 @@ da_scala_test_suite(
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-grpc",
"//ledger/metrics",
"//libs-scala/concurrent",

View File

@ -20,5 +20,9 @@ final case class CommandClientConfiguration(
)
object CommandClientConfiguration {
def default = CommandClientConfiguration(1, 1, Duration.ofSeconds(30L))
def default: CommandClientConfiguration = CommandClientConfiguration(
maxCommandsInFlight = 1,
maxParallelSubmissions = 1,
defaultDeduplicationTime = Duration.ofSeconds(30L),
)
}

View File

@ -0,0 +1,59 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.client.services.commands
import akka.NotUsed
import akka.stream.scaladsl.Flow
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.client.configuration.CommandClientConfiguration
import com.daml.util.Ctx
import com.google.protobuf.duration.Duration
import scala.annotation.nowarn
object CommandUpdaterFlow {
@nowarn("msg=deprecated")
def apply[Context](
config: CommandClientConfiguration,
submissionIdGenerator: SubmissionIdGenerator,
applicationId: String,
ledgerIdToUse: LedgerId,
): Flow[Ctx[Context, CommandSubmission], Ctx[Context, CommandSubmission], NotUsed] =
Flow[Ctx[Context, CommandSubmission]]
.map(_.map { case submission @ CommandSubmission(commands, _) =>
if (LedgerId(commands.ledgerId) != ledgerIdToUse)
throw new IllegalArgumentException(
s"Failing fast on submission request of command ${commands.commandId} with invalid ledger ID ${commands.ledgerId} (client expected $ledgerIdToUse)"
)
if (commands.applicationId != applicationId)
throw new IllegalArgumentException(
s"Failing fast on submission request of command ${commands.commandId} with invalid application ID ${commands.applicationId} (client expected $applicationId)"
)
val nonEmptySubmissionId = if (commands.submissionId.isEmpty) {
submissionIdGenerator.generate()
} else {
commands.submissionId
}
val updatedDeduplicationPeriod = commands.deduplicationPeriod match {
case DeduplicationPeriod.Empty =>
DeduplicationPeriod.DeduplicationTime(
Duration
.of(
config.defaultDeduplicationTime.getSeconds,
config.defaultDeduplicationTime.getNano,
)
)
case existing => existing
}
submission.copy(commands =
commands.copy(
submissionId = nonEmptySubmissionId,
deduplicationPeriod = updatedDeduplicationPeriod,
)
)
})
}

View File

@ -13,7 +13,6 @@ import com.daml.ledger.client.services.commands.tracker.CommandTracker._
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
NotOkResponse,
}
import com.daml.ledger.client.services.commands.{
CommandSubmission,
@ -62,8 +61,6 @@ private[commands] class CommandTracker[Context](
private val logger = LoggerFactory.getLogger(this.getClass.getName)
type ContextualizedCompletionResponse = Ctx[Context, Either[CompletionFailure, CompletionSuccess]]
val submitRequestIn: Inlet[Ctx[Context, CommandSubmission]] =
Inlet[Ctx[Context, CommandSubmission]]("submitRequestIn")
val submitRequestOut: Outlet[Ctx[(Context, TrackedCommandKey), CommandSubmission]] =
@ -73,8 +70,8 @@ private[commands] class CommandTracker[Context](
Inlet[Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]](
"commandResultIn"
)
val resultOut: Outlet[ContextualizedCompletionResponse] =
Outlet[ContextualizedCompletionResponse]("resultOut")
val resultOut: Outlet[ContextualizedCompletionResponse[Context]] =
Outlet[ContextualizedCompletionResponse[Context]]("resultOut")
val offsetOut: Outlet[LedgerOffset] =
Outlet[LedgerOffset]("offsetOut")
@ -166,7 +163,7 @@ private[commands] class CommandTracker[Context](
pushResultOrPullCommandResultIn(handleSubmitResponse(submitResponse))
case Right(CompletionStreamElement.CompletionElement(completion)) =>
pushResultOrPullCommandResultIn(getResponsesForCompletion(completion))
pushResultOrPullCommandResultIn(getResponseForCompletion(completion))
case Right(CompletionStreamElement.CheckpointElement(checkpoint)) =>
if (!hasBeenPulled(commandResultIn)) pull(commandResultIn)
@ -187,7 +184,7 @@ private[commands] class CommandTracker[Context](
)
private def pushResultOrPullCommandResultIn(
completionResponses: Seq[ContextualizedCompletionResponse]
completionResponse: Option[ContextualizedCompletionResponse[Context]]
): Unit = {
// The command tracker detects timeouts outside the regular pull/push
// mechanism of the input/output ports. Basically the timeout
@ -196,10 +193,13 @@ private[commands] class CommandTracker[Context](
// even though it hasn't been pulled again in the meantime. Using `emit`
// instead of `push` when a completion arrives makes akka take care of
// handling the signaling properly.
if (completionResponses.isEmpty && !hasBeenPulled(commandResultIn)) {
pull(commandResultIn)
completionResponse match {
case Some(response) => emit(resultOut, response)
case None =>
if (!hasBeenPulled(commandResultIn)) {
pull(commandResultIn)
}
}
emitMultiple(resultOut, completionResponses.to(immutable.Iterable))
}
private def completeStageIfTerminal(): Unit = {
@ -212,7 +212,7 @@ private[commands] class CommandTracker[Context](
private def handleSubmitResponse(
submitResponse: Ctx[(Context, TrackedCommandKey), Try[Empty]]
): Seq[ContextualizedCompletionResponse] = {
): Option[ContextualizedCompletionResponse[Context]] = {
val Ctx((_, commandKey), value, _) = submitResponse
value match {
case Failure(GrpcException(status @ GrpcStatus(code, _), metadata))
@ -220,18 +220,18 @@ private[commands] class CommandTracker[Context](
getResponseForTerminalStatusCode(
commandKey,
GrpcStatus.toProto(status, metadata),
).toList
)
case Failure(throwable) =>
logger.warn(
s"Service responded with error for submitting command with context ${submitResponse.context}. Status of command is unknown. watching for completion...",
throwable,
)
Seq.empty
None
case Success(_) =>
logger.trace(
s"Received confirmation that command ${commandKey.commandId} from submission ${commandKey.submissionId} was accepted."
)
Seq.empty
None
}
}
@ -241,7 +241,7 @@ private[commands] class CommandTracker[Context](
val submissionId = commands.submissionId
val commandId = commands.commandId
logger.trace(s"Begin tracking of command $commandId for submission $submissionId.")
if (commands.submissionId.isEmpty) {
if (submissionId.isEmpty) {
throw new IllegalArgumentException(
s"The submission ID for the command ID $commandId is empty. This should not happen."
)
@ -281,7 +281,7 @@ private[commands] class CommandTracker[Context](
private def getResponsesForTimeouts(
instant: Instant
): Seq[ContextualizedCompletionResponse] = {
): Seq[ContextualizedCompletionResponse[Context]] = {
logger.trace("Checking timeouts at {}", instant)
pendingCommands.view.flatMap { case (commandKey, trackingData) =>
if (trackingData.commandTimeout.isBefore(instant)) {
@ -305,14 +305,11 @@ private[commands] class CommandTracker[Context](
}.toSeq
}
private def getResponsesForCompletion(
private def getResponseForCompletion(
completion: Completion
): Seq[ContextualizedCompletionResponse] = {
): Option[ContextualizedCompletionResponse[Context]] = {
val commandId = completion.commandId
val maybeSubmissionId = Option(completion.submissionId).collect {
case id if id.nonEmpty => id
}
val maybeSubmissionId = Option(completion.submissionId).filter(_.nonEmpty)
logger.trace {
val completionDescription = completion.status match {
case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value =>
@ -322,49 +319,26 @@ private[commands] class CommandTracker[Context](
s"Handling $completionDescription $commandId from submission $maybeSubmissionId."
}
val trackedCommandKeys = pendingCommandKeys(maybeSubmissionId, commandId)
val trackingDataForSubmissionId =
trackedCommandKeys.flatMap(pendingCommands.remove(_).toList)
if (trackingDataForSubmissionId.size > 1) {
trackingDataForSubmissionId.map { trackingData =>
Ctx(
trackingData.context,
Left(
NotOkResponse(
Completion(
commandId,
Some(
StatusProto.of(
Status.Code.INTERNAL.value(),
s"There are multiple pending commands with ID: $commandId for submission ID: $maybeSubmissionId. This can only happen for the mutating schema that shouldn't be used anymore, as it doesn't fully support command deduplication.",
Seq.empty,
)
),
)
)
),
maybeSubmissionId
.map { submissionId =>
val key = TrackedCommandKey(submissionId, completion.commandId)
val trackedCommandForCompletion = pendingCommands.remove(key)
trackedCommandForCompletion.map(trackingData =>
Ctx(trackingData.context, tracker.CompletionResponse(completion))
)
}
} else {
trackingDataForSubmissionId.map(trackingData =>
Ctx(trackingData.context, tracker.CompletionResponse(completion))
)
}
.getOrElse {
logger.trace(
"Ignoring a completion with an empty submission ID for a submission from the CommandSubmissionService."
)
None
}
}
private def pendingCommandKeys(
submissionId: Option[String],
commandId: String,
): Seq[TrackedCommandKey] =
submissionId.map(id => Seq(TrackedCommandKey(id, commandId))).getOrElse {
pendingCommands.keys.filter(_.commandId == commandId).toList
}
private def getResponseForTerminalStatusCode(
commandKey: TrackedCommandKey,
status: StatusProto,
): Option[ContextualizedCompletionResponse] = {
): Option[ContextualizedCompletionResponse[Context]] = {
logger.trace(
s"Handling failure of command ${commandKey.commandId} from submission ${commandKey.submissionId}."
)
@ -407,6 +381,9 @@ private[commands] class CommandTracker[Context](
}
object CommandTracker {
type ContextualizedCompletionResponse[Context] =
Ctx[Context, Either[CompletionFailure, CompletionSuccess]]
private val durationOrdering = implicitly[Ordering[Duration]]
private val nonTerminalCodes =

View File

@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.client.services.commands.withoutledgerid
import com.daml.ledger.client.services.commands._
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
@ -18,25 +18,23 @@ import com.daml.ledger.api.v1.command_completion_service.{
}
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.CommandSubmissionServiceStub
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.validation.CommandsValidator
import com.daml.ledger.client.LedgerClient
import com.daml.ledger.client.configuration.CommandClientConfiguration
import com.daml.ledger.client.services.commands.CommandTrackerFlow.Materialized
import com.daml.ledger.client.services.commands.tracker.TrackedCommandKey
import com.daml.ledger.client.services.commands._
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
}
import com.daml.ledger.client.services.commands.tracker.TrackedCommandKey
import com.daml.util.Ctx
import com.daml.util.akkastreams.MaxInFlight
import com.google.protobuf.duration.Duration
import com.google.protobuf.empty.Empty
import org.slf4j.{Logger, LoggerFactory}
import scalaz.syntax.tag._
import scala.annotation.nowarn
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.util.Try
@ -150,7 +148,9 @@ private[daml] final class CommandClient(
ledgerEnd <- getCompletionEnd(ledgerIdToUse, token)
} yield {
partyFilter(parties.toSet)
.via(commandUpdaterFlow[Context](ledgerIdToUse))
.via(
CommandUpdaterFlow[Context](config, submissionIdGenerator, applicationId, ledgerIdToUse)
)
.viaMat(
CommandTrackerFlow[Context, NotUsed](
commandSubmissionFlow = CommandSubmissionFlow[(Context, TrackedCommandKey)](
@ -193,48 +193,12 @@ private[daml] final class CommandClient(
)
}
@nowarn("msg=deprecated")
private def commandUpdaterFlow[Context](ledgerIdToUse: LedgerId) =
Flow[Ctx[Context, CommandSubmission]]
.map(_.map { case submission @ CommandSubmission(commands, _) =>
if (LedgerId(commands.ledgerId) != ledgerIdToUse)
throw new IllegalArgumentException(
s"Failing fast on submission request of command ${commands.commandId} with invalid ledger ID ${commands.ledgerId} (client expected $ledgerIdToUse)"
)
if (commands.applicationId != applicationId)
throw new IllegalArgumentException(
s"Failing fast on submission request of command ${commands.commandId} with invalid application ID ${commands.applicationId} (client expected $applicationId)"
)
val nonEmptySubmissionId = if (commands.submissionId.isEmpty) {
submissionIdGenerator.generate()
} else {
commands.submissionId
}
val updatedDeduplicationPeriod = commands.deduplicationPeriod match {
case DeduplicationPeriod.Empty =>
DeduplicationPeriod.DeduplicationTime(
Duration
.of(
config.defaultDeduplicationTime.getSeconds,
config.defaultDeduplicationTime.getNano,
)
)
case existing => existing
}
submission.copy(commands =
commands.copy(
submissionId = nonEmptySubmissionId,
deduplicationPeriod = updatedDeduplicationPeriod,
)
)
})
def submissionFlow[Context](
ledgerIdToUse: LedgerId,
token: Option[String] = None,
): Flow[Ctx[Context, CommandSubmission], Ctx[Context, Try[Empty]], NotUsed] = {
Flow[Ctx[Context, CommandSubmission]]
.via(commandUpdaterFlow(ledgerIdToUse))
.via(CommandUpdaterFlow[Context](config, submissionIdGenerator, applicationId, ledgerIdToUse))
.via(CommandSubmissionFlow[Context](submit(token), config.maxParallelSubmissions))
}

View File

@ -493,64 +493,6 @@ class CommandTrackerFlowTest
}
}
"a completion without the submission id arrives" should {
"output a failure if there are multiple pending commands with the same command id" in {
val Handle(submissions, results, _, completionStreamMock) =
runCommandTrackingFlow(allSubmissionsSuccessful)
submissions.sendNext(newSubmission("submissionId", commandId))
submissions.sendNext(newSubmission("anotherSubmissionId", commandId))
val completionWithoutSubmissionId =
Completion(
commandId,
Some(successStatus),
submissionId = "",
)
completionStreamMock.send(
CompletionStreamElement.CompletionElement(completionWithoutSubmissionId)
)
results.expectNext(
Ctx(
context,
Left(
failureCompletion(
code = Code.INTERNAL,
message =
s"There are multiple pending commands with ID: $commandId for submission ID: None. This can only happen for the mutating schema that shouldn't be used anymore, as it doesn't fully support command deduplication.",
submissionId = "",
)
),
)
)
succeed
}
"output a successful completion" in {
val Handle(submissions, results, _, completionStreamMock) =
runCommandTrackingFlow(allSubmissionsSuccessful)
submissions.sendNext(submission)
val completionWithoutSubmissionId =
Completion(
commandId,
Some(successStatus),
submissionId = "",
)
completionStreamMock.send(
CompletionStreamElement.CompletionElement(completionWithoutSubmissionId)
)
results.expectNext(
Ctx(context, Right(successCompletion(submissionId = "")))
)
succeed
}
}
"a multitude of successful completions arrive for submitted commands" should {
"output all expected values" in {
@ -612,6 +554,26 @@ class CommandTrackerFlowTest
}
}
"completions with empty and nonempty submission IDs arrive" should {
"ignore a completion with an empty submission ID and output a successful response" in {
val Handle(submissions, results, _, completionStreamMock) =
runCommandTrackingFlow(allSubmissionsSuccessful)
results.request(2)
submissions.sendNext(submission)
completionStreamMock.send(successfulStreamCompletion("", commandId))
completionStreamMock.send(successfulStreamCompletion(submissionId, commandId))
results.expectNext(
Ctx(context, Right(successCompletion(submissionId = submissionId)))
)
succeed
}
}
"completion stream disconnects" should {
"keep run and recover the completion subscription from a recent offset" in {

View File

@ -0,0 +1,94 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.client.services.commands
import akka.stream.scaladsl.{Sink, Source}
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.api.v1.commands.Commands
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.api.{SubmissionIdGenerator, domain}
import com.daml.ledger.client.configuration.CommandClientConfiguration
import com.daml.lf.data.Ref
import com.daml.telemetry.NoOpTelemetryContext
import com.daml.util.Ctx
import com.google.protobuf.duration.Duration
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import scala.annotation.nowarn
import scala.util.Failure
@nowarn("msg=deprecated")
class CommandUpdaterFlowTest extends AsyncWordSpec with Matchers with AkkaBeforeAndAfterAll {
import CommandUpdaterFlowTest._
"apply" should {
"fail fast on an invalid ledger ID" in {
val aCommandSubmission =
CommandSubmission(defaultCommands.copy(ledgerId = "anotherLedgerId"))
runCommandUpdaterFlow(aCommandSubmission)
.transformWith {
case Failure(exception) => exception shouldBe an[IllegalArgumentException]
case _ => fail
}
}
"fail fast on an invalid application ID" in {
val aCommandSubmission =
CommandSubmission(defaultCommands.copy(applicationId = "anotherApplicationId"))
runCommandUpdaterFlow(aCommandSubmission)
.transformWith {
case Failure(exception) => exception shouldBe an[IllegalArgumentException]
case _ => fail
}
}
"generate a submission ID if it's empty" in {
val aCommandSubmission = CommandSubmission(defaultCommands.copy(submissionId = ""))
runCommandUpdaterFlow(aCommandSubmission)
.map(_.value.commands.submissionId shouldBe aSubmissionId)
}
"set the default deduplication period if it's empty" in {
val aCommandSubmission =
CommandSubmission(
defaultCommands.copy(deduplicationPeriod = DeduplicationPeriod.Empty)
)
val defaultDeduplicationTime = CommandClientConfiguration.default.defaultDeduplicationTime
runCommandUpdaterFlow(aCommandSubmission)
.map(
_.value.commands.getDeduplicationTime shouldBe Duration
.of(defaultDeduplicationTime.getSeconds, defaultDeduplicationTime.getNano)
)
}
}
private def runCommandUpdaterFlow(aCommandSubmission: CommandSubmission) = {
Source
.single(Ctx((), aCommandSubmission, NoOpTelemetryContext))
.via(
CommandUpdaterFlow(
CommandClientConfiguration.default,
aSubmissionIdGenerator,
anApplicationId,
aLedgerId,
)
)
.runWith(Sink.head)
}
}
object CommandUpdaterFlowTest {
private val anApplicationId = "anApplicationId"
private val aLedgerId = domain.LedgerId("aLedgerId")
private val aSubmissionId = Ref.SubmissionId.assertFromString("aSubmissionId")
private val aSubmissionIdGenerator: SubmissionIdGenerator = () => aSubmissionId
private val defaultCommands =
Commands.defaultInstance.copy(applicationId = anApplicationId, ledgerId = aLedgerId.toString)
}

View File

@ -49,7 +49,7 @@ final class CommandsValidator(ledgerId: LedgerId) {
appId <- requireLedgerString(commands.applicationId, "application_id")
.map(domain.ApplicationId(_))
commandId <- requireLedgerString(commands.commandId, "command_id").map(domain.CommandId(_))
submissionId <- requireSubmissionId(commands.submissionId)
submissionId <- validateSubmissionId(commands.submissionId)
submitters <- validateSubmitters(commands)
commandz <- requireNonEmpty(commands.commands, "commands")
validatedCommands <- validateInnerCommands(commandz)

View File

@ -4,7 +4,6 @@
package com.daml.platform.server.api.services.grpc
import com.daml.error.{DamlContextualizedErrorLogger, ContextualizedErrorLogger}
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.{
CommandSubmissionService => ApiCommandSubmissionService
@ -32,7 +31,6 @@ class GrpcCommandSubmissionService(
currentLedgerTime: () => Instant,
currentUtcTime: () => Instant,
maxDeduplicationTime: () => Option[Duration],
submissionIdGenerator: SubmissionIdGenerator,
metrics: Metrics,
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext)
extends ApiCommandSubmissionService
@ -57,7 +55,6 @@ class GrpcCommandSubmissionService(
telemetryContext.setAttribute(SpanAttribute.Submitter, commands.party)
telemetryContext.setAttribute(SpanAttribute.WorkflowId, commands.workflowId)
}
val requestWithSubmissionId = generateSubmissionIdIfEmpty(request)
Timed.timedAndTrackedFuture(
metrics.daml.commands.submissions,
metrics.daml.commands.submissionsRunning,
@ -65,14 +62,14 @@ class GrpcCommandSubmissionService(
.value(
metrics.daml.commands.validation,
validator.validate(
requestWithSubmissionId,
request,
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
),
)
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
t => Future.failed(ValidationLogger.logFailure(request, t)),
service.submit(_).map(_ => Empty.defaultInstance),
),
)
@ -80,14 +77,4 @@ class GrpcCommandSubmissionService(
override def bindService(): ServerServiceDefinition =
CommandSubmissionServiceGrpc.bindService(this, executionContext)
private def generateSubmissionIdIfEmpty(request: ApiSubmitRequest): ApiSubmitRequest = {
if (request.commands.exists(_.submissionId.isEmpty)) {
val commandsWithSubmissionId =
request.commands.map(_.copy(submissionId = submissionIdGenerator.generate()))
request.copy(commands = commandsWithSubmissionId)
} else {
request
}
}
}

View File

@ -93,20 +93,18 @@ class FieldValidations private (errorFactories: ErrorFactories) {
): Either[StatusRuntimeException, Ref.LedgerString] =
Ref.LedgerString.fromString(s).left.map(invalidArgument(definiteAnswer = Some(false)))
def requireSubmissionId(s: String)(implicit
def validateSubmissionId(s: String)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): Either[StatusRuntimeException, domain.SubmissionId] = {
val fieldName = "submission_id"
): Either[StatusRuntimeException, Option[domain.SubmissionId]] =
if (s.isEmpty) {
Left(missingField(fieldName, definiteAnswer = Some(false)))
Right(None)
} else {
Ref.SubmissionId
.fromString(s)
.map(domain.SubmissionId(_))
.map(submissionId => Some(domain.SubmissionId(submissionId)))
.left
.map(invalidField(fieldName, _, definiteAnswer = Some(false)))
.map(invalidField("submission_id", _, definiteAnswer = Some(false)))
}
}
def requireContractId(
s: String,

View File

@ -84,7 +84,7 @@ class SubmitRequestValidatorTest
workflowId = Some(workflowId),
applicationId = applicationId,
commandId = commandId,
submissionId = submissionId,
submissionId = Some(submissionId),
actAs = Set(DomainMocks.party),
readAs = Set.empty,
submittedAt = Time.Timestamp.assertFromInstant(submittedAt),
@ -132,18 +132,14 @@ class SubmitRequestValidatorTest
"CommandSubmissionRequestValidator" when {
"validating command submission requests" should {
"reject requests with empty submissionId" in {
"tolerate a missing submissionId" in {
val commandsValidator = new CommandsValidator(ledgerId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.withSubmissionId(""),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationDuration),
),
INVALID_ARGUMENT,
"Missing field: submission_id",
)
commandsValidator.validateCommands(
api.commands.withSubmissionId(""),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationDuration),
) shouldEqual Right(internal.emptyCommands.copy(submissionId = None))
}
"reject requests with empty commands" in {

View File

@ -0,0 +1,111 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.server.api.services.grpc
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.testing.utils.MockMessages._
import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService
import com.daml.ledger.api.v1.command_service.{
SubmitAndWaitForTransactionIdResponse,
SubmitAndWaitForTransactionResponse,
SubmitAndWaitForTransactionTreeResponse,
SubmitAndWaitRequest,
}
import com.daml.ledger.api.v1.commands.{Command, CreateCommand}
import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value}
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import com.google.protobuf.empty.Empty
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
class GrpcCommandServiceSpec
extends AsyncWordSpec
with MockitoSugar
with Matchers
with ArgumentMatchersSugar {
import GrpcCommandServiceSpec._
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
"GrpcCommandService" should {
"generate a submission ID if it's empty" in {
val submissionCounter = new AtomicInteger
val mockCommandService = mock[CommandService with AutoCloseable]
when(mockCommandService.submitAndWait(any[SubmitAndWaitRequest]))
.thenReturn(Future.successful(Empty.defaultInstance))
when(mockCommandService.submitAndWaitForTransaction(any[SubmitAndWaitRequest]))
.thenReturn(Future.successful(SubmitAndWaitForTransactionResponse.defaultInstance))
when(mockCommandService.submitAndWaitForTransactionId(any[SubmitAndWaitRequest]))
.thenReturn(Future.successful(SubmitAndWaitForTransactionIdResponse.defaultInstance))
when(mockCommandService.submitAndWaitForTransactionTree(any[SubmitAndWaitRequest]))
.thenReturn(Future.successful(SubmitAndWaitForTransactionTreeResponse.defaultInstance))
val grpcCommandService = new GrpcCommandService(
mockCommandService,
ledgerId = LedgerId(ledgerId),
currentLedgerTime = () => Instant.EPOCH,
currentUtcTime = () => Instant.EPOCH,
maxDeduplicationTime = () => Some(Duration.ZERO),
generateSubmissionId = () =>
Ref.SubmissionId.assertFromString(
s"$submissionIdPrefix${submissionCounter.incrementAndGet()}"
),
)
for {
_ <- grpcCommandService.submitAndWait(aSubmitAndWaitRequestWithNoSubmissionId)
_ <- grpcCommandService.submitAndWaitForTransaction(aSubmitAndWaitRequestWithNoSubmissionId)
_ <- grpcCommandService.submitAndWaitForTransactionId(
aSubmitAndWaitRequestWithNoSubmissionId
)
_ <- grpcCommandService.submitAndWaitForTransactionTree(
aSubmitAndWaitRequestWithNoSubmissionId
)
} yield {
def expectedSubmitAndWaitRequest(submissionIdSuffix: String) =
aSubmitAndWaitRequestWithNoSubmissionId.copy(commands =
aSubmitAndWaitRequestWithNoSubmissionId.commands
.map(_.copy(submissionId = s"$submissionIdPrefix$submissionIdSuffix"))
)
verify(mockCommandService).submitAndWait(expectedSubmitAndWaitRequest("1"))
verify(mockCommandService).submitAndWaitForTransaction(expectedSubmitAndWaitRequest("2"))
verify(mockCommandService).submitAndWaitForTransactionId(expectedSubmitAndWaitRequest("3"))
verify(mockCommandService).submitAndWaitForTransactionTree(
expectedSubmitAndWaitRequest("4")
)
succeed
}
}
}
}
object GrpcCommandServiceSpec {
private val aCommand = Command.of(
Command.Command.Create(
CreateCommand(
Some(Identifier("package", moduleName = "module", entityName = "entity")),
Some(
Record(
Some(Identifier("package", moduleName = "module", entityName = "entity")),
Seq(RecordField("something", Some(Value(Value.Sum.Bool(true))))),
)
),
)
)
)
private val aSubmitAndWaitRequestWithNoSubmissionId = submitAndWaitRequest.copy(
commands = Some(commands.copy(commands = Seq(aCommand), submissionId = ""))
)
private val submissionIdPrefix = "submissionId-"
}

View File

@ -10,7 +10,6 @@ import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.testing.utils.MockMessages._
import com.daml.ledger.api.v1.commands.{Command, CreateCommand}
import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value}
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.server.api.services.domain.CommandSubmissionService
@ -42,7 +41,6 @@ class GrpcCommandSubmissionServiceSpec
currentLedgerTime = () => Instant.EPOCH,
currentUtcTime = () => Instant.EPOCH,
maxDeduplicationTime = () => Some(Duration.ZERO),
submissionIdGenerator = () => Ref.SubmissionId.assertFromString("submissionId"),
metrics = new Metrics(new MetricRegistry),
)

View File

@ -278,7 +278,7 @@ object domain {
workflowId: Option[WorkflowId],
applicationId: ApplicationId,
commandId: CommandId,
submissionId: SubmissionId,
submissionId: Option[SubmissionId],
actAs: Set[Ref.Party],
readAs: Set[Ref.Party],
submittedAt: Timestamp,

View File

@ -49,7 +49,7 @@ final class AppendOnlyCompletionDeduplicationInfoIT[ServiceRequest](
.submitRequest(ledger, party, requestWithSubmissionId)
} yield {
assertApplicationIdIsPreserved(ledger.applicationId, optNoDeduplicationSubmittedCompletion)
assertSubmissionIdIsGenerated(optNoDeduplicationSubmittedCompletion)
service.assertCompletion(optNoDeduplicationSubmittedCompletion)
assertDeduplicationPeriodIsReported(optNoDeduplicationSubmittedCompletion)
assertSubmissionIdIsPreserved(optSubmissionIdSubmittedCompletion, RandomSubmissionId)
}
@ -70,6 +70,8 @@ private[testtool] object AppendOnlyCompletionDeduplicationInfoIT {
party: Primitive.Party,
request: ProtoRequestType,
)(implicit ec: ExecutionContext): Future[Option[Completion]]
def assertCompletion(optCompletion: Option[Completion]): Unit
}
case object CommandService extends Service[SubmitAndWaitRequest] {
@ -96,6 +98,15 @@ private[testtool] object AppendOnlyCompletionDeduplicationInfoIT {
_ <- ledger.submitAndWait(request)
completion <- singleCompletionAfterOffset(ledger, party, offset)
} yield completion
override def assertCompletion(optCompletion: Option[Completion]): Unit = {
val completion = assertDefined(optCompletion)
assert(completion.status.forall(_.code == Status.Code.OK.value()))
assert(
Ref.SubmissionId.fromString(completion.submissionId).isRight,
"Missing or invalid submission ID in completion",
)
}
}
case object CommandSubmissionService extends Service[SubmitRequest] {
@ -122,6 +133,11 @@ private[testtool] object AppendOnlyCompletionDeduplicationInfoIT {
_ <- ledger.submit(request)
completion <- singleCompletionAfterOffset(ledger, party, offset)
} yield completion
override def assertCompletion(optCompletion: Option[Completion]): Unit = {
val completion = assertDefined(optCompletion)
assert(completion.status.forall(_.code == Status.Code.OK.value()))
}
}
private def singleCompletionAfterOffset(
@ -155,15 +171,6 @@ private[testtool] object AppendOnlyCompletionDeduplicationInfoIT {
assert(completion.deduplicationPeriod.isDefined, "The deduplication period was not reported")
}
private def assertSubmissionIdIsGenerated(optCompletion: Option[Completion]): Unit = {
val completion = assertDefined(optCompletion)
assert(completion.status.forall(_.code == Status.Code.OK.value()))
assert(
Ref.SubmissionId.fromString(completion.submissionId).isRight,
"Missing or invalid submission ID in completion",
)
}
private def assertApplicationIdIsPreserved(
requestedApplicationId: String,
optCompletion: Option[Completion],

View File

@ -7,7 +7,7 @@ package com.daml.platform.db.migration.postgres
import java.io.InputStream
import java.sql.Connection
import java.util.{Date, UUID}
import java.util.Date
import akka.NotUsed
import akka.stream.scaladsl.Source
@ -484,14 +484,12 @@ private[migration] class V2_1__Rebuild_Acs extends BaseJavaMigration {
Some(rejectionDescription),
offset,
) =>
// We don't have a submission ID, so we need to generate one.
val submissionId = Ref.SubmissionId.assertFromString(UUID.randomUUID().toString)
val rejectionReason = readRejectionReason(rejectionType, rejectionDescription)
offset -> LedgerEntry.Rejection(
recordTime = Timestamp.assertFromInstant(recordedAt.toInstant),
commandId = commandId,
applicationId = applicationId,
submissionId = submissionId,
submissionId = None,
actAs = List(submitter),
rejectionReason = rejectionReason,
)

View File

@ -80,7 +80,7 @@ private[apiserver] final class StoreBackedCommandExecutor(
commands.applicationId.unwrap,
commands.commandId.unwrap,
commands.deduplicationPeriod,
commands.submissionId.unwrap,
commands.submissionId.map(_.unwrap),
ledgerConfiguration,
),
transactionMeta = state.TransactionMeta(

View File

@ -11,9 +11,9 @@ import com.daml.error.{
ErrorCodesVersionSwitcher,
}
import com.daml.error.definitions.{ErrorCauseExport, RejectionGenerators}
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.{DeduplicationPeriod, SubmissionIdGenerator}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.index.v2._
import com.daml.ledger.participant.state.{v2 => state}
@ -82,7 +82,6 @@ private[apiserver] object ApiSubmissionService {
currentUtcTime = () => Instant.now,
maxDeduplicationTime = () =>
ledgerConfigurationSubscription.latestConfiguration().map(_.maxDeduplicationTime),
submissionIdGenerator = SubmissionIdGenerator.Random,
metrics = metrics,
)

View File

@ -366,7 +366,7 @@ private class JdbcLedgerDao(
state.Update.CommandRejected(
recordTime = recordTime,
completionInfo = state
.CompletionInfo(actAs, applicationId, commandId, None, Some(submissionId)),
.CompletionInfo(actAs, applicationId, commandId, None, submissionId),
reasonTemplate = reason.toParticipantStateRejectionReason,
)
),

View File

@ -17,7 +17,7 @@ private[platform] object LedgerEntry {
recordTime: Timestamp,
commandId: Ref.CommandId,
applicationId: Ref.ApplicationId,
submissionId: Ref.SubmissionId,
submissionId: Option[Ref.SubmissionId],
actAs: List[Ref.Party],
rejectionReason: RejectionReason,
) extends LedgerEntry

View File

@ -6,7 +6,7 @@ package com.daml.platform.apiserver.execution
import java.time.Duration
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.domain.{ApplicationId, CommandId, Commands, LedgerId, SubmissionId}
import com.daml.ledger.api.domain.{ApplicationId, CommandId, Commands, LedgerId}
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.index.v2.{ContractStore, IndexPackagesService}
import com.daml.lf.crypto.Hash
@ -59,7 +59,7 @@ class StoreBackedCommandExecutorSpec
workflowId = None,
applicationId = ApplicationId(Ref.ApplicationId.assertFromString("applicationId")),
commandId = CommandId(Ref.CommandId.assertFromString("commandId")),
submissionId = SubmissionId(Ref.SubmissionId.assertFromString("submissionId")),
submissionId = None,
actAs = Set.empty,
readAs = Set.empty,
submittedAt = Time.Timestamp.Epoch,

View File

@ -5,7 +5,7 @@ package com.daml.platform.apiserver.services
import com.codahale.metrics.MetricRegistry
import com.daml.error.{ErrorCause, ErrorCodesVersionSwitcher}
import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails, SubmissionId}
import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks}
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
@ -42,7 +42,6 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.{Assertion, Inside}
import java.time.Duration
import java.util.UUID
import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
@ -355,7 +354,6 @@ class ApiSubmissionServiceSpec
writeService,
partyManagementService,
implicitPartyAllocation = true,
deduplicationEnabled = true,
mockIndexSubmissionService = indexSubmissionService,
commandExecutor = mockCommandExecutor,
)
@ -433,7 +431,7 @@ object ApiSubmissionServiceSpec {
commandId = CommandId(
Ref.CommandId.assertFromString(s"commandId-${commandId.incrementAndGet()}")
),
submissionId = SubmissionId(Ref.SubmissionId.assertFromString(UUID.randomUUID().toString)),
submissionId = None,
actAs = Set.empty,
readAs = Set.empty,
submittedAt = Timestamp.Epoch,

View File

@ -32,7 +32,7 @@ message DamlSubmitterInfo {
google.protobuf.Duration deduplication_duration = 6;
string deduplication_offset = 7;
}
string submission_id = 9;
optional string submission_id = 9;
}
// Daml transaction entry, used in both `DamlSubmission` and `DamlLogEntry`.

View File

@ -166,7 +166,7 @@ private[state] object Conversions {
.addAllSubmitters((subInfo.actAs: List[String]).asJava)
.setApplicationId(subInfo.applicationId)
.setCommandId(subInfo.commandId)
.setSubmissionId(subInfo.submissionId)
.setSubmissionId(subInfo.submissionId.getOrElse(""))
subInfo.deduplicationPeriod match {
case DeduplicationPeriod.DeduplicationDuration(duration) =>
submitterInfoBuilder.setDeduplicationDuration(buildDuration(duration))

View File

@ -275,7 +275,6 @@ object KVTest {
submitter,
commandId,
deduplicationDuration,
randomLedgerString,
)
testState.keyValueSubmission.transactionToSubmission(
submitterInfo = submitterInfo,
@ -444,14 +443,13 @@ object KVTest {
submitter: Ref.Party,
commandId: Ref.CommandId,
deduplicationDuration: Duration,
submissionId: Ref.SubmissionId,
): SubmitterInfo = {
SubmitterInfo(
actAs = List(submitter),
applicationId = Ref.LedgerString.assertFromString("test"),
commandId = commandId,
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(deduplicationDuration),
submissionId = submissionId,
submissionId = None,
ledgerConfiguration =
Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofSeconds(1)),
)

View File

@ -716,7 +716,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i
applicationId = Ref.LedgerString.assertFromString("tests"),
commandId = Ref.LedgerString.assertFromString(commandId),
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofSeconds(10)),
submissionId = Ref.LedgerString.assertFromString("submissionId"),
submissionId = None,
ledgerConfiguration =
Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofSeconds(1)),
)

View File

@ -47,7 +47,7 @@ class KeyValueCommittingSpec extends AnyWordSpec with Matchers {
applicationId = applicationId,
commandId = commandId,
deduplicationPeriod = ApiDeduplicationPeriod.DeduplicationDuration(Duration.ZERO),
submissionId = Ref.LedgerString.assertFromString("submission"),
submissionId = None,
ledgerConfiguration =
Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofSeconds(1)),
)

View File

@ -171,7 +171,7 @@ object KeyValueParticipantStateWriterSpec {
applicationId = Ref.LedgerString.assertFromString("tests"),
commandId = Ref.LedgerString.assertFromString(commandId),
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofDays(1)),
submissionId = Ref.LedgerString.assertFromString("submission"),
submissionId = None,
ledgerConfiguration =
Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofSeconds(1)),
)

View File

@ -32,7 +32,7 @@ final case class SubmitterInfo(
applicationId: Ref.ApplicationId,
commandId: Ref.CommandId,
deduplicationPeriod: DeduplicationPeriod,
submissionId: Ref.SubmissionId,
submissionId: Option[Ref.SubmissionId],
ledgerConfiguration: Configuration,
) {
@ -44,7 +44,7 @@ final case class SubmitterInfo(
applicationId,
commandId,
Some(deduplicationPeriod),
Some(submissionId),
submissionId,
)
}

View File

@ -206,7 +206,7 @@ private[sandbox] final class InMemoryLedger(
Some(commandId),
transactionId,
Some(`appId`),
_,
submissionId,
actAs,
_,
_,
@ -222,12 +222,13 @@ private[sandbox] final class InMemoryLedger(
commandId,
transactionId,
appId,
submissionId,
)
case (
offset,
InMemoryLedgerEntry(
LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason)
LedgerEntry.Rejection(recordTime, commandId, `appId`, submissionId, actAs, reason)
),
) if actAs.exists(parties) =>
val status = reason.toParticipantStateRejectionReason.status
@ -237,6 +238,7 @@ private[sandbox] final class InMemoryLedger(
commandId,
status,
appId,
submissionId,
)
}
}
@ -423,7 +425,7 @@ private[sandbox] final class InMemoryLedger(
Some(submitterInfo.commandId),
transactionId,
Some(submitterInfo.applicationId),
Some(submitterInfo.submissionId),
submitterInfo.submissionId,
submitterInfo.actAs,
transactionMeta.workflowId,
transactionMeta.ledgerEffectiveTime,

View File

@ -88,7 +88,6 @@ class TransactionTimeModelComplianceIT
ledger: Ledger,
ledgerTime: Instant,
commandId: String,
submissionId: String = UUID.randomUUID().toString,
configuration: Configuration,
) = {
val dummyTransaction = TransactionBuilder.EmptySubmitted
@ -98,7 +97,7 @@ class TransactionTimeModelComplianceIT
applicationId = Ref.ApplicationId.assertFromString("appId"),
commandId = Ref.CommandId.assertFromString(commandId + UUID.randomUUID().toString),
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(JDuration.ZERO),
submissionId = Ref.SubmissionId.assertFromString(submissionId),
submissionId = None,
ledgerConfiguration = configuration,
)
val transactionMeta = state.TransactionMeta(

View File

@ -264,7 +264,7 @@ final class SqlLedgerSpec
applicationId = applicationId,
commandId = commandId1,
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofHours(1)),
submissionId = submissionId1,
submissionId = None,
ledgerConfiguration = Configuration.reasonableInitialConfiguration,
),
transactionMeta = emptyTransactionMeta(seedService, ledgerEffectiveTime = now),
@ -299,7 +299,7 @@ final class SqlLedgerSpec
applicationId = applicationId,
commandId = commandId1,
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofHours(1)),
submissionId = submissionId1,
submissionId = None,
ledgerConfiguration = Configuration.reasonableInitialConfiguration,
),
transactionMeta = emptyTransactionMeta(seedService, ledgerEffectiveTime = now),
@ -368,7 +368,7 @@ final class SqlLedgerSpec
applicationId = applicationId,
commandId = commandId1,
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofHours(1)),
submissionId = submissionId1,
submissionId = None,
ledgerConfiguration = configuration,
),
transactionMeta =