Do not drop generated submissionIds in GrpcCommandService [KVL-1104] (#10882)

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Hubert Slojewski 2021-09-14 19:19:26 +02:00 committed by GitHub
parent b4750a495c
commit e4230dc51a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 123 additions and 71 deletions

View File

@ -3,9 +3,8 @@
package com.daml.ledger.api.validation
import java.time.{Duration, Instant}
import com.daml.api.util.{DurationConversion, TimestampConversion}
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.commands.Command.Command.{
Create => ProtoCreate,
@ -15,7 +14,6 @@ import com.daml.ledger.api.v1.commands.Command.Command.{
ExerciseByKey => ProtoExerciseByKey,
}
import com.daml.ledger.api.v1.commands.{Command => ProtoCommand, Commands => ProtoCommands}
import com.daml.ledger.api.{SubmissionIdGenerator, domain}
import com.daml.lf.command._
import com.daml.lf.data._
import com.daml.lf.value.{Value => Lf}
@ -24,10 +22,11 @@ import com.daml.platform.server.api.validation.FieldValidations.{requirePresence
import io.grpc.StatusRuntimeException
import scalaz.syntax.tag._
import java.time.{Duration, Instant}
import scala.Ordering.Implicits.infixOrderingOps
import scala.collection.immutable
final class CommandsValidator(ledgerId: LedgerId, submissionIdGenerator: SubmissionIdGenerator) {
final class CommandsValidator(ledgerId: LedgerId) {
import ValueValidator._
@ -46,7 +45,7 @@ final class CommandsValidator(ledgerId: LedgerId, submissionIdGenerator: Submiss
appId <- requireLedgerString(commands.applicationId, "application_id")
.map(domain.ApplicationId(_))
commandId <- requireLedgerString(commands.commandId, "command_id").map(domain.CommandId(_))
submissionId = extractOrGenerateSubmissionId(commands)
submissionId <- requireSubmissionId(commands.submissionId)
submitters <- CommandsValidator.validateSubmitters(commands)
commandz <- requireNonEmpty(commands.commands, "commands")
validatedCommands <- validateInnerCommands(commandz)
@ -181,13 +180,6 @@ final class CommandsValidator(ledgerId: LedgerId, submissionIdGenerator: Submiss
Left(missingField("command", definiteAnswer = Some(false)))
}
private def extractOrGenerateSubmissionId(commands: ProtoCommands) =
if (commands.submissionId.isEmpty) {
domain.SubmissionId(submissionIdGenerator.generate())
} else {
domain.SubmissionId(Ref.SubmissionId.assertFromString(commands.submissionId))
}
}
object CommandsValidator {

View File

@ -32,48 +32,84 @@ class GrpcCommandService(
protected implicit val logger: Logger = LoggerFactory.getLogger(service.getClass)
private[this] val validator =
new SubmitAndWaitRequestValidator(new CommandsValidator(ledgerId, generateSubmissionId))
private[this] val validator = new SubmitAndWaitRequestValidator(new CommandsValidator(ledgerId))
override def submitAndWait(request: SubmitAndWaitRequest): Future[Empty] =
override def submitAndWait(request: SubmitAndWaitRequest): Future[Empty] = {
val requestWithSubmissionId = generateSubmissionIdIfEmpty(request)
validator
.validate(request, currentLedgerTime(), currentUtcTime(), maxDeduplicationTime())
.fold(
t => Future.failed(ValidationLogger.logFailure(request, t)),
_ => service.submitAndWait(request),
.validate(
requestWithSubmissionId,
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWait(requestWithSubmissionId),
)
}
override def submitAndWaitForTransactionId(
request: SubmitAndWaitRequest
): Future[SubmitAndWaitForTransactionIdResponse] =
): Future[SubmitAndWaitForTransactionIdResponse] = {
val requestWithSubmissionId = generateSubmissionIdIfEmpty(request)
validator
.validate(request, currentLedgerTime(), currentUtcTime(), maxDeduplicationTime())
.fold(
t => Future.failed(ValidationLogger.logFailure(request, t)),
_ => service.submitAndWaitForTransactionId(request),
.validate(
requestWithSubmissionId,
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionId(requestWithSubmissionId),
)
}
override def submitAndWaitForTransaction(
request: SubmitAndWaitRequest
): Future[SubmitAndWaitForTransactionResponse] =
): Future[SubmitAndWaitForTransactionResponse] = {
val requestWithSubmissionId = generateSubmissionIdIfEmpty(request)
validator
.validate(request, currentLedgerTime(), currentUtcTime(), maxDeduplicationTime())
.fold(
t => Future.failed(ValidationLogger.logFailure(request, t)),
_ => service.submitAndWaitForTransaction(request),
.validate(
requestWithSubmissionId,
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransaction(requestWithSubmissionId),
)
}
override def submitAndWaitForTransactionTree(
request: SubmitAndWaitRequest
): Future[SubmitAndWaitForTransactionTreeResponse] =
): Future[SubmitAndWaitForTransactionTreeResponse] = {
val requestWithSubmissionId = generateSubmissionIdIfEmpty(request)
validator
.validate(request, currentLedgerTime(), currentUtcTime(), maxDeduplicationTime())
.fold(
t => Future.failed(ValidationLogger.logFailure(request, t)),
_ => service.submitAndWaitForTransactionTree(request),
.validate(
requestWithSubmissionId,
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionTree(requestWithSubmissionId),
)
}
override def bindService(): ServerServiceDefinition =
CommandServiceGrpc.bindService(this, executionContext)
private def generateSubmissionIdIfEmpty(request: SubmitAndWaitRequest): SubmitAndWaitRequest = {
if (request.commands.exists(_.submissionId.isEmpty)) {
val commandsWithSubmissionId =
request.commands.map(_.copy(submissionId = generateSubmissionId.generate()))
request.copy(commands = commandsWithSubmissionId)
} else {
request
}
}
}

View File

@ -3,8 +3,6 @@
package com.daml.platform.server.api.services.grpc
import java.time.{Duration, Instant}
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.{
@ -24,6 +22,7 @@ import com.google.protobuf.empty.Empty
import io.grpc.ServerServiceDefinition
import org.slf4j.{Logger, LoggerFactory}
import java.time.{Duration, Instant}
import scala.concurrent.{ExecutionContext, Future}
class GrpcCommandSubmissionService(
@ -41,9 +40,7 @@ class GrpcCommandSubmissionService(
protected implicit val logger: Logger = LoggerFactory.getLogger(service.getClass)
private val validator = new SubmitRequestValidator(
new CommandsValidator(ledgerId, submissionIdGenerator)
)
private val validator = new SubmitRequestValidator(new CommandsValidator(ledgerId))
override def submit(request: ApiSubmitRequest): Future[Empty] = {
implicit val telemetryContext: TelemetryContext =
@ -54,6 +51,7 @@ class GrpcCommandSubmissionService(
telemetryContext.setAttribute(SpanAttribute.Submitter, commands.party)
telemetryContext.setAttribute(SpanAttribute.WorkflowId, commands.workflowId)
}
val requestWithSubmissionId = generateSubmissionIdIfEmpty(request)
Timed.timedAndTrackedFuture(
metrics.daml.commands.submissions,
metrics.daml.commands.submissionsRunning,
@ -61,14 +59,14 @@ class GrpcCommandSubmissionService(
.value(
metrics.daml.commands.validation,
validator.validate(
request,
requestWithSubmissionId,
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
),
)
.fold(
t => Future.failed(ValidationLogger.logFailure(request, t)),
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
service.submit(_).map(_ => Empty.defaultInstance),
),
)
@ -77,4 +75,13 @@ class GrpcCommandSubmissionService(
override def bindService(): ServerServiceDefinition =
CommandSubmissionServiceGrpc.bindService(this, executionContext)
private def generateSubmissionIdIfEmpty(request: ApiSubmitRequest): ApiSubmitRequest = {
if (request.commands.exists(_.submissionId.isEmpty)) {
val commandsWithSubmissionId =
request.commands.map(_.copy(submissionId = submissionIdGenerator.generate()))
request.copy(commands = commandsWithSubmissionId)
} else {
request
}
}
}

View File

@ -3,7 +3,7 @@
package com.daml.platform.server.api.validation
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.{DeduplicationPeriod, domain}
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.commands.Commands.{DeduplicationPeriod => DeduplicationPeriodProto}
import com.daml.ledger.api.v1.value.Identifier
@ -85,6 +85,19 @@ trait FieldValidations {
def requireLedgerString(s: String): Either[StatusRuntimeException, Ref.LedgerString] =
Ref.LedgerString.fromString(s).left.map(invalidArgument(definiteAnswer = Some(false)))
def requireSubmissionId(s: String): Either[StatusRuntimeException, domain.SubmissionId] = {
val fieldName = "submission_id"
if (s.isEmpty) {
Left(missingField(fieldName, definiteAnswer = Some(false)))
} else {
Ref.SubmissionId
.fromString(s)
.map(domain.SubmissionId(_))
.left
.map(invalidField(fieldName, _, definiteAnswer = Some(false)))
}
}
def requireContractId(
s: String,
fieldName: String,

View File

@ -3,9 +3,6 @@
package com.daml.ledger.api.validation
import java.time.{Instant, Duration => JDuration}
import java.util.UUID
import com.daml.api.util.{DurationConversion, TimestampConversion}
import com.daml.ledger.api.DomainMocks.{applicationId, commandId, submissionId, workflowId}
import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands}
@ -13,7 +10,7 @@ import com.daml.ledger.api.v1.commands.Commands.{DeduplicationPeriod => Deduplic
import com.daml.ledger.api.v1.commands.{Command, Commands, CreateCommand}
import com.daml.ledger.api.v1.value.Value.Sum
import com.daml.ledger.api.v1.value.{List => ApiList, Map => ApiMap, Optional => ApiOptional, _}
import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks, SubmissionIdGenerator}
import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks}
import com.daml.lf.command.{Commands => LfCommands, CreateCommand => LfCreateCommand}
import com.daml.lf.data._
import com.daml.lf.value.Value.ValueRecord
@ -25,6 +22,7 @@ import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.wordspec.AnyWordSpec
import scalaz.syntax.tag._
import java.time.{Instant, Duration => JDuration}
import scala.annotation.nowarn
@nowarn("msg=deprecated")
@ -60,6 +58,7 @@ class SubmitRequestValidatorTest
ledgerId = ledgerId.unwrap,
workflowId = workflowId.unwrap,
applicationId = applicationId.unwrap,
submissionId = submissionId.unwrap,
commandId = commandId.unwrap,
party = submitter,
commands = Seq(command),
@ -130,13 +129,24 @@ class SubmitRequestValidatorTest
private def unexpectedError = sys.error("unexpected error")
private val generateRandomSubmissionId: SubmissionIdGenerator =
() => Ref.SubmissionId.assertFromString(UUID.randomUUID().toString)
"CommandSubmissionRequestValidator" when {
"validating command submission requests" should {
"reject requests with empty submissionId" in {
val commandsValidator = new CommandsValidator(ledgerId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.withSubmissionId(""),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationDuration),
),
INVALID_ARGUMENT,
"Missing field: submission_id",
)
}
"reject requests with empty commands" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.withCommands(Seq.empty),
@ -150,7 +160,7 @@ class SubmitRequestValidatorTest
}
"not allow missing ledgerId" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
requestMustFailWith(
commandsValidator
.validateCommands(
@ -165,8 +175,7 @@ class SubmitRequestValidatorTest
}
"tolerate a missing workflowId" in {
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
commandsValidator.validateCommands(
api.commands.withWorkflowId(""),
internal.ledgerTime,
@ -181,7 +190,7 @@ class SubmitRequestValidatorTest
}
"not allow missing applicationId" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.withApplicationId(""),
@ -195,7 +204,7 @@ class SubmitRequestValidatorTest
}
"not allow missing commandId" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.withCommandId(""),
@ -209,7 +218,7 @@ class SubmitRequestValidatorTest
}
"not allow missing submitter" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
requestMustFailWith(
commandsValidator
.validateCommands(
@ -224,7 +233,7 @@ class SubmitRequestValidatorTest
}
"correctly read and deduplicate multiple submitters" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
val result = commandsValidator
.validateCommands(
api.commands
@ -246,8 +255,7 @@ class SubmitRequestValidatorTest
}
"tolerate a single submitter specified in the actAs fields" in {
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
commandsValidator
.validateCommands(
api.commands.withParty("").addActAs(api.submitter),
@ -258,8 +266,7 @@ class SubmitRequestValidatorTest
}
"tolerate a single submitter specified in party, actAs, and readAs fields" in {
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
commandsValidator
.validateCommands(
api.commands.withParty(api.submitter).addActAs(api.submitter).addReadAs(api.submitter),
@ -271,8 +278,7 @@ class SubmitRequestValidatorTest
"advance ledger time if minLedgerTimeAbs is set" in {
val minLedgerTimeAbs = internal.ledgerTime.plus(internal.timeDelta)
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
commandsValidator.validateCommands(
api.commands.copy(
minLedgerTimeAbs = Some(TimestampConversion.fromInstant(minLedgerTimeAbs))
@ -285,8 +291,7 @@ class SubmitRequestValidatorTest
"advance ledger time if minLedgerTimeRel is set" in {
val minLedgerTimeAbs = internal.ledgerTime.plus(internal.timeDelta)
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
commandsValidator.validateCommands(
api.commands.copy(
minLedgerTimeRel = Some(DurationConversion.toProto(internal.timeDelta))
@ -317,7 +322,7 @@ class SubmitRequestValidatorTest
sentDeduplication: DeduplicationPeriodProto,
expectedDeduplication: DeduplicationPeriod,
) =>
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
val result = commandsValidator.validateCommands(
api.commands.copy(deduplicationPeriod = sentDeduplication),
internal.ledgerTime,
@ -338,7 +343,7 @@ class SubmitRequestValidatorTest
DeduplicationPeriodProto.DeduplicationDuration(Duration.of(-1, 0)),
)
) { deduplication =>
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.copy(deduplicationPeriod = deduplication),
@ -364,7 +369,7 @@ class SubmitRequestValidatorTest
),
)
) { deduplication =>
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands
@ -381,8 +386,7 @@ class SubmitRequestValidatorTest
}
"default to maximum deduplication time if deduplication is missing" in {
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
commandsValidator.validateCommands(
api.commands.copy(deduplicationPeriod = DeduplicationPeriodProto.Empty),
internal.ledgerTime,
@ -397,7 +401,7 @@ class SubmitRequestValidatorTest
}
"not allow missing ledger configuration" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
val commandsValidator = new CommandsValidator(ledgerId)
requestMustFailWith(
commandsValidator
.validateCommands(api.commands, internal.ledgerTime, internal.submittedAt, None),