From 909a1bf896fe5f70176df132ef271ed9d1b386c2 Mon Sep 17 00:00:00 2001 From: pbatko-da Date: Wed, 13 Oct 2021 14:19:10 +0200 Subject: [PATCH] [DPP-417][DDP-612] Adapt ApiSubmissionService to support V2 error codes (#11052) Adapt ApiSubmissionService to support self-service (v2) error codes CHANGELOG_BEGIN CHANGELOG_END --- .../error/definitions/LedgerApiErrors.scala | 17 ++++ .../definitions/RejectionGenerators.scala | 5 +- .../platform/apiserver/ApiServices.scala | 1 + .../apiserver/ErrorCodesVersionSwitcher.scala | 14 ++- .../services/ApiSubmissionService.scala | 60 +++++++++++-- .../services/ApiSubmissionServiceSpec.scala | 89 ++++++++++++------- 6 files changed, 143 insertions(+), 43 deletions(-) diff --git a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala index e38ce3a14fe..4d389a0b2f3 100644 --- a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala +++ b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala @@ -381,6 +381,23 @@ object LedgerApiErrors extends LedgerApiErrorGroup { } + @Explanation( + "The ledger configuration could not be retrieved. This could happen due to incomplete initialization of the participant or due to an internal system error." + ) + @Resolution("Contact the participant operator.") + object LedgerConfigurationNotFound + extends ErrorCode( + id = "LEDGER_CONFIGURATION_NOT_FOUND", + ErrorCategory.InvalidGivenCurrentSystemStateResourceMissing, + ) { + + case class Reject()(implicit + loggingContext: ErrorCodeLoggingContext + ) extends LoggingTransactionErrorImpl( + cause = "The ledger configuration is not available." + ) + } + } @Explanation("""This error occurs if the Daml transaction fails due to an authorization error. diff --git a/ledger/error/src/main/scala/com/daml/error/definitions/RejectionGenerators.scala b/ledger/error/src/main/scala/com/daml/error/definitions/RejectionGenerators.scala index 0827a978584..d300745da06 100644 --- a/ledger/error/src/main/scala/com/daml/error/definitions/RejectionGenerators.scala +++ b/ledger/error/src/main/scala/com/daml/error/definitions/RejectionGenerators.scala @@ -56,7 +56,7 @@ class RejectionGenerators(conformanceMode: Boolean) { def commandExecutorError(cause: ErrorCauseExport)(implicit errorLoggingContext: ErrorCodeLoggingContext - ): Option[StatusRuntimeException] = { + ): StatusRuntimeException = { def processPackageError(err: LfError.Package.Error): BaseError = err match { case e: Package.Internal => LedgerApiErrors.InternalError.PackageInternal(e) @@ -172,12 +172,11 @@ class RejectionGenerators(conformanceMode: Boolean) { toGrpc(transformed) } - val rej = cause match { + cause match { case ErrorCauseExport.DamlLf(error) => processLfError(error) case x: ErrorCauseExport.LedgerTime => toGrpc(LedgerApiErrors.CommandPreparation.FailedToDetermineLedgerTime.Reject(x.explain)) } - Some(rej) } def submissionResult(result: Try[state.v2.SubmissionResult]): Option[Try[Unit]] = { diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala index b1ad13208b2..240efff9e11 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala @@ -235,6 +235,7 @@ private[daml] object ApiServices { submissionConfig.enableDeduplication, ), metrics, + errorsVersionsSwitcher, ) // Note: the command service uses the command submission, command completion, and transaction diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ErrorCodesVersionSwitcher.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ErrorCodesVersionSwitcher.scala index d8ae67eaa9f..07f2689f061 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ErrorCodesVersionSwitcher.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ErrorCodesVersionSwitcher.scala @@ -6,5 +6,17 @@ package com.daml.platform.apiserver import com.daml.error.ValueSwitch import io.grpc.StatusRuntimeException +import scala.concurrent.Future + +/** A mechanism to switch between the legacy error codes (v1) and the new self-service error codes (v2). + * This class is intended to facilitate transition to self-service error codes. + * Once the previous error codes are removed, this class should be dropped as well. + */ final class ErrorCodesVersionSwitcher(enableSelfServiceErrorCodes: Boolean) - extends ValueSwitch[StatusRuntimeException](enableSelfServiceErrorCodes) + extends ValueSwitch[StatusRuntimeException](enableSelfServiceErrorCodes) { + + def chooseAsFailedFuture[T]( + v1: => StatusRuntimeException, + v2: => StatusRuntimeException, + ): Future[T] = Future.failed(choose(v1 = v1, v2 = v2)) +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala index 5b1260f1e79..290cd55917f 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala @@ -4,7 +4,8 @@ package com.daml.platform.apiserver.services import com.daml.api.util.TimeProvider -import com.daml.error.ErrorCause +import com.daml.error.definitions.{ErrorCauseExport, LedgerApiErrors, RejectionGenerators} +import com.daml.error.{DamlErrorCodeLoggingContext, ErrorCause} import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands} import com.daml.ledger.api.messages.command.submission.SubmitRequest import com.daml.ledger.api.{DeduplicationPeriod, SubmissionIdGenerator} @@ -20,9 +21,9 @@ import com.daml.logging.LoggingContext.withEnrichedLoggingContext import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.Metrics import com.daml.platform.api.grpc.GrpcApiService -import com.daml.platform.apiserver.SeedService import com.daml.platform.apiserver.configuration.LedgerConfigurationSubscription import com.daml.platform.apiserver.execution.{CommandExecutionResult, CommandExecutor} +import com.daml.platform.apiserver.{ErrorCodesVersionSwitcher, SeedService} import com.daml.platform.server.api.services.domain.CommandSubmissionService import com.daml.platform.server.api.services.grpc.GrpcCommandSubmissionService import com.daml.platform.server.api.validation.ErrorFactories @@ -52,6 +53,7 @@ private[apiserver] object ApiSubmissionService { commandExecutor: CommandExecutor, configuration: ApiSubmissionService.Configuration, metrics: Metrics, + errorCodesVersionSwitcher: ErrorCodesVersionSwitcher, )(implicit executionContext: ExecutionContext, loggingContext: LoggingContext, @@ -68,6 +70,7 @@ private[apiserver] object ApiSubmissionService { commandExecutor, configuration, metrics, + errorCodesVersionSwitcher, ), ledgerId = ledgerId, currentLedgerTime = () => timeProvider.getCurrentTime, @@ -96,6 +99,7 @@ private[apiserver] final class ApiSubmissionService private[services] ( commandExecutor: CommandExecutor, configuration: ApiSubmissionService.Configuration, metrics: Metrics, + errorCodesVersionSwitcher: ErrorCodesVersionSwitcher, )(implicit executionContext: ExecutionContext, loggingContext: LoggingContext) extends CommandSubmissionService with ErrorFactories @@ -103,6 +107,9 @@ private[apiserver] final class ApiSubmissionService private[services] ( private val logger = ContextualizedLogger.get(this.getClass) + // TODO error codes: review conformance mode usages wherever RejectionGenerators is instantiated + private val rejectionGenerators = new RejectionGenerators(conformanceMode = true) + override def submit( request: SubmitRequest )(implicit telemetryContext: TelemetryContext): Future[Unit] = @@ -123,7 +130,7 @@ private[apiserver] final class ApiSubmissionService private[services] ( .transform(handleSubmissionResult) } case None => - Future.failed[Unit](ErrorFactories.missingLedgerConfig(definiteAnswer = Some(false))) + failedOnMissingLedgerConfiguration() } evaluatedCommand .andThen(logger.logErrorsOnCall[Unit]) @@ -155,9 +162,7 @@ private[apiserver] final class ApiSubmissionService private[services] ( } case _: CommandDeduplicationDuplicate => metrics.daml.commands.deduplicatedCommands.mark() - val exception = duplicateCommandException - logger.debug(exception.getMessage) - Future.failed(exception) + failedOnDuplicateCommand() } private def handleSubmissionResult(result: Try[state.SubmissionResult])(implicit @@ -185,7 +190,7 @@ private[apiserver] final class ApiSubmissionService private[services] ( result.fold( error => { metrics.daml.commands.failedCommandInterpretations.mark() - Future.failed(toStatusException(error)) + failedOnCommandExecution(error) }, Future.successful, ) @@ -283,7 +288,10 @@ private[apiserver] final class ApiSubmissionService private[services] ( .toScala } - private def toStatusException(errorCause: ErrorCause): StatusRuntimeException = + /** This method encodes logic related to legacy error codes (V1). + * Cf. self-service error codes (V2) in //ledger/error + */ + private def toStatusExceptionV1(errorCause: ErrorCause): StatusRuntimeException = errorCause match { case cause @ ErrorCause.DamlLf(error) => error match { @@ -302,6 +310,42 @@ private[apiserver] final class ApiSubmissionService private[services] ( ErrorFactories.aborted(cause.explain, definiteAnswer = Some(false)) } + private def failedOnMissingLedgerConfiguration()(implicit + loggingContext: LoggingContext + ): Future[Unit] = { + errorCodesVersionSwitcher.chooseAsFailedFuture( + v1 = ErrorFactories.missingLedgerConfig(definiteAnswer = Some(false)), + v2 = LedgerApiErrors.InterpreterErrors.LookupErrors.LedgerConfigurationNotFound + .Reject()(new DamlErrorCodeLoggingContext(logger, loggingContext, None)) + .asGrpcError, + ) + } + + private def failedOnDuplicateCommand()(implicit loggingContext: LoggingContext): Future[Unit] = { + errorCodesVersionSwitcher.chooseAsFailedFuture( + v1 = { + val exception = duplicateCommandException + logger.debug(exception.getMessage) + exception + }, + v2 = rejectionGenerators.duplicateCommand( + new DamlErrorCodeLoggingContext(logger, loggingContext, None) + ), + ) + } + + private def failedOnCommandExecution( + error: ErrorCause + )(implicit loggingContext: LoggingContext): Future[CommandExecutionResult] = { + errorCodesVersionSwitcher.chooseAsFailedFuture( + v1 = toStatusExceptionV1(error), + v2 = rejectionGenerators + .commandExecutorError(cause = ErrorCauseExport.fromErrorCause(error))( + new DamlErrorCodeLoggingContext(logger, loggingContext, None) + ), + ) + } + override def close(): Unit = () } diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala index c41c60832bd..1ff1d329ab3 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala @@ -3,22 +3,19 @@ package com.daml.platform.apiserver.services -import java.time.{Duration, Instant} -import java.util.UUID -import java.util.concurrent.CompletableFuture.completedFuture -import java.util.concurrent.atomic.AtomicInteger import com.codahale.metrics.MetricRegistry import com.daml.error.ErrorCause -import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks} import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails, SubmissionId} import com.daml.ledger.api.messages.command.submission.SubmitRequest import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll +import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks} import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} import com.daml.ledger.participant.state.index.v2.{ CommandDeduplicationNew, IndexPartyManagementService, IndexSubmissionService, } +import com.daml.ledger.participant.state.v2.WriteService import com.daml.ledger.participant.state.{v2 => state} import com.daml.ledger.resources.TestResourceContext import com.daml.lf @@ -34,20 +31,24 @@ import com.daml.lf.transaction.{GlobalKey, NodeId, ReplayMismatch} import com.daml.lf.value.Value import com.daml.logging.LoggingContext import com.daml.metrics.Metrics -import com.daml.platform.apiserver.SeedService import com.daml.platform.apiserver.configuration.LedgerConfigurationSubscription import com.daml.platform.apiserver.execution.CommandExecutor import com.daml.platform.apiserver.services.ApiSubmissionServiceSpec._ +import com.daml.platform.apiserver.{ErrorCodesVersionSwitcher, SeedService} import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext} import com.google.rpc.status.{Status => RpcStatus} import io.grpc.Status import org.mockito.{ArgumentMatchersSugar, MockitoSugar} -import org.scalatest.Inside import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.matchers.should.Matchers +import org.scalatest.{Assertion, Inside} +import java.time.{Duration, Instant} +import java.util.UUID +import java.util.concurrent.CompletableFuture.completedFuture +import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} class ApiSubmissionServiceSpec extends AsyncFlatSpec @@ -229,18 +230,29 @@ class ApiSubmissionServiceSpec behavior of "submit" it should "return proper gRPC status codes for DamlLf errors" in { - val partyManagementService = mock[IndexPartyManagementService] - val writeService = mock[state.WriteService] + testProperGrpcStatusCodesForDamlLfErrors(useSelfServiceErrorCodes = false) + } + it should "return proper gRPC status codes for DamlLf errors (self service error codes a.k.a v2 error codes)" in { + testProperGrpcStatusCodesForDamlLfErrors(useSelfServiceErrorCodes = true) + } + + private def testProperGrpcStatusCodesForDamlLfErrors( + useSelfServiceErrorCodes: Boolean + ): Future[Assertion] = { + // given + val partyManagementService = mock[IndexPartyManagementService] + val writeService = mock[WriteService] + val mockCommandExecutor = mock[CommandExecutor] val tmplId = toIdentifier("M:T") - val errorsToStatuses = List( + val errorsToExpectedStatuses: Seq[(ErrorCause, Status)] = List( ErrorCause.DamlLf( LfError.Interpretation( LfError.Interpretation.DamlException(LfInterpretationError.ContractNotFound("#cid")), None, ) - ) -> Status.ABORTED, + ) -> ((Status.ABORTED, Status.ABORTED)), ErrorCause.DamlLf( LfError.Interpretation( LfError.Interpretation.DamlException( @@ -250,12 +262,12 @@ class ApiSubmissionServiceSpec ), None, ) - ) -> Status.ABORTED, + ) -> ((Status.ABORTED, Status.ALREADY_EXISTS)), ErrorCause.DamlLf( LfError.Validation( LfError.Validation.ReplayMismatch(ReplayMismatch(null, null)) ) - ) -> Status.ABORTED, + ) -> ((Status.ABORTED, Status.INTERNAL)), ErrorCause.DamlLf( LfError.Preprocessing( LfError.Preprocessing.Lookup( @@ -265,7 +277,7 @@ class ApiSubmissionServiceSpec ) ) ) - ) -> Status.INVALID_ARGUMENT, + ) -> ((Status.INVALID_ARGUMENT, Status.INVALID_ARGUMENT)), ErrorCause.DamlLf( LfError.Interpretation( LfError.Interpretation.DamlException( @@ -276,20 +288,27 @@ class ApiSubmissionServiceSpec ), None, ) - ) -> Status.INVALID_ARGUMENT, - ErrorCause.LedgerTime(0) -> Status.ABORTED, - ) - val mockCommandExecutor = mock[CommandExecutor] + ) -> ((Status.INVALID_ARGUMENT, Status.INVALID_ARGUMENT)), + ErrorCause.LedgerTime(0) -> ((Status.ABORTED, Status.ABORTED)), + ).map { case (key, (statusV1, statusV2)) => + if (useSelfServiceErrorCodes) { + (key, statusV2) + } else { + (key, statusV1) + } + } val service = newSubmissionService( writeService, partyManagementService, implicitPartyAllocation = true, commandExecutor = mockCommandExecutor, + useSelfServiceErrorCodes = useSelfServiceErrorCodes, ) - Future - .sequence(errorsToStatuses.map { case (error, code) => + // when + val results: Seq[Future[(Status, Try[Unit])]] = errorsToExpectedStatuses + .map { case (error, expectedStatus) => val submitRequest = newSubmitRequest() when( mockCommandExecutor.execute( @@ -298,17 +317,21 @@ class ApiSubmissionServiceSpec any[Configuration], )(any[ExecutionContext], any[LoggingContext]) ).thenReturn(Future.successful(Left(error))) - - service.submit(submitRequest).transform(result => Success(code -> result)) - }) - .map { results => - results.foreach { case (code, result) => - inside(result) { case Failure(exception) => - exception.getMessage should startWith(code.getCode.toString) - } - } - succeed + service + .submit(submitRequest) + .transform(result => Success(expectedStatus -> result)) } + val sequencedResults: Future[Seq[(Status, Try[Unit])]] = Future.sequence(results) + + // then + sequencedResults.map { results: Seq[(Status, Try[Unit])] => + results.foreach { case (expectedStatus: Status, result: Try[Unit]) => + inside(result) { case Failure(exception) => + exception.getMessage should startWith(expectedStatus.getCode.toString) + } + } + succeed + } } behavior of "command deduplication" @@ -431,6 +454,7 @@ object ApiSubmissionServiceSpec { commandExecutor: CommandExecutor = null, deduplicationEnabled: Boolean = true, mockIndexSubmissionService: IndexSubmissionService = mock[IndexSubmissionService], + useSelfServiceErrorCodes: Boolean = false, )(implicit executionContext: ExecutionContext, loggingContext: LoggingContext, @@ -468,6 +492,9 @@ object ApiSubmissionServiceSpec { configuration = ApiSubmissionService .Configuration(implicitPartyAllocation, enableDeduplication = true), metrics = new Metrics(new MetricRegistry), + errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher( + enableSelfServiceErrorCodes = useSelfServiceErrorCodes + ), ) } }