[DPP-417][DDP-612] Adapt ApiSubmissionService to support V2 error codes (#11052)

Adapt ApiSubmissionService to support self-service (v2) error codes

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
pbatko-da 2021-10-13 14:19:10 +02:00 committed by GitHub
parent 514e8b50a3
commit 909a1bf896
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 143 additions and 43 deletions

View File

@ -381,6 +381,23 @@ object LedgerApiErrors extends LedgerApiErrorGroup {
}
@Explanation(
"The ledger configuration could not be retrieved. This could happen due to incomplete initialization of the participant or due to an internal system error."
)
@Resolution("Contact the participant operator.")
object LedgerConfigurationNotFound
extends ErrorCode(
id = "LEDGER_CONFIGURATION_NOT_FOUND",
ErrorCategory.InvalidGivenCurrentSystemStateResourceMissing,
) {
case class Reject()(implicit
loggingContext: ErrorCodeLoggingContext
) extends LoggingTransactionErrorImpl(
cause = "The ledger configuration is not available."
)
}
}
@Explanation("""This error occurs if the Daml transaction fails due to an authorization error.

View File

@ -56,7 +56,7 @@ class RejectionGenerators(conformanceMode: Boolean) {
def commandExecutorError(cause: ErrorCauseExport)(implicit
errorLoggingContext: ErrorCodeLoggingContext
): Option[StatusRuntimeException] = {
): StatusRuntimeException = {
def processPackageError(err: LfError.Package.Error): BaseError = err match {
case e: Package.Internal => LedgerApiErrors.InternalError.PackageInternal(e)
@ -172,12 +172,11 @@ class RejectionGenerators(conformanceMode: Boolean) {
toGrpc(transformed)
}
val rej = cause match {
cause match {
case ErrorCauseExport.DamlLf(error) => processLfError(error)
case x: ErrorCauseExport.LedgerTime =>
toGrpc(LedgerApiErrors.CommandPreparation.FailedToDetermineLedgerTime.Reject(x.explain))
}
Some(rej)
}
def submissionResult(result: Try[state.v2.SubmissionResult]): Option[Try[Unit]] = {

View File

@ -235,6 +235,7 @@ private[daml] object ApiServices {
submissionConfig.enableDeduplication,
),
metrics,
errorsVersionsSwitcher,
)
// Note: the command service uses the command submission, command completion, and transaction

View File

@ -6,5 +6,17 @@ package com.daml.platform.apiserver
import com.daml.error.ValueSwitch
import io.grpc.StatusRuntimeException
import scala.concurrent.Future
/** A mechanism to switch between the legacy error codes (v1) and the new self-service error codes (v2).
* This class is intended to facilitate transition to self-service error codes.
* Once the previous error codes are removed, this class should be dropped as well.
*/
final class ErrorCodesVersionSwitcher(enableSelfServiceErrorCodes: Boolean)
extends ValueSwitch[StatusRuntimeException](enableSelfServiceErrorCodes)
extends ValueSwitch[StatusRuntimeException](enableSelfServiceErrorCodes) {
def chooseAsFailedFuture[T](
v1: => StatusRuntimeException,
v2: => StatusRuntimeException,
): Future[T] = Future.failed(choose(v1 = v1, v2 = v2))
}

View File

@ -4,7 +4,8 @@
package com.daml.platform.apiserver.services
import com.daml.api.util.TimeProvider
import com.daml.error.ErrorCause
import com.daml.error.definitions.{ErrorCauseExport, LedgerApiErrors, RejectionGenerators}
import com.daml.error.{DamlErrorCodeLoggingContext, ErrorCause}
import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.{DeduplicationPeriod, SubmissionIdGenerator}
@ -20,9 +21,9 @@ import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.apiserver.SeedService
import com.daml.platform.apiserver.configuration.LedgerConfigurationSubscription
import com.daml.platform.apiserver.execution.{CommandExecutionResult, CommandExecutor}
import com.daml.platform.apiserver.{ErrorCodesVersionSwitcher, SeedService}
import com.daml.platform.server.api.services.domain.CommandSubmissionService
import com.daml.platform.server.api.services.grpc.GrpcCommandSubmissionService
import com.daml.platform.server.api.validation.ErrorFactories
@ -52,6 +53,7 @@ private[apiserver] object ApiSubmissionService {
commandExecutor: CommandExecutor,
configuration: ApiSubmissionService.Configuration,
metrics: Metrics,
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
@ -68,6 +70,7 @@ private[apiserver] object ApiSubmissionService {
commandExecutor,
configuration,
metrics,
errorCodesVersionSwitcher,
),
ledgerId = ledgerId,
currentLedgerTime = () => timeProvider.getCurrentTime,
@ -96,6 +99,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
commandExecutor: CommandExecutor,
configuration: ApiSubmissionService.Configuration,
metrics: Metrics,
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext)
extends CommandSubmissionService
with ErrorFactories
@ -103,6 +107,9 @@ private[apiserver] final class ApiSubmissionService private[services] (
private val logger = ContextualizedLogger.get(this.getClass)
// TODO error codes: review conformance mode usages wherever RejectionGenerators is instantiated
private val rejectionGenerators = new RejectionGenerators(conformanceMode = true)
override def submit(
request: SubmitRequest
)(implicit telemetryContext: TelemetryContext): Future[Unit] =
@ -123,7 +130,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
.transform(handleSubmissionResult)
}
case None =>
Future.failed[Unit](ErrorFactories.missingLedgerConfig(definiteAnswer = Some(false)))
failedOnMissingLedgerConfiguration()
}
evaluatedCommand
.andThen(logger.logErrorsOnCall[Unit])
@ -155,9 +162,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
}
case _: CommandDeduplicationDuplicate =>
metrics.daml.commands.deduplicatedCommands.mark()
val exception = duplicateCommandException
logger.debug(exception.getMessage)
Future.failed(exception)
failedOnDuplicateCommand()
}
private def handleSubmissionResult(result: Try[state.SubmissionResult])(implicit
@ -185,7 +190,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
result.fold(
error => {
metrics.daml.commands.failedCommandInterpretations.mark()
Future.failed(toStatusException(error))
failedOnCommandExecution(error)
},
Future.successful,
)
@ -283,7 +288,10 @@ private[apiserver] final class ApiSubmissionService private[services] (
.toScala
}
private def toStatusException(errorCause: ErrorCause): StatusRuntimeException =
/** This method encodes logic related to legacy error codes (V1).
* Cf. self-service error codes (V2) in //ledger/error
*/
private def toStatusExceptionV1(errorCause: ErrorCause): StatusRuntimeException =
errorCause match {
case cause @ ErrorCause.DamlLf(error) =>
error match {
@ -302,6 +310,42 @@ private[apiserver] final class ApiSubmissionService private[services] (
ErrorFactories.aborted(cause.explain, definiteAnswer = Some(false))
}
private def failedOnMissingLedgerConfiguration()(implicit
loggingContext: LoggingContext
): Future[Unit] = {
errorCodesVersionSwitcher.chooseAsFailedFuture(
v1 = ErrorFactories.missingLedgerConfig(definiteAnswer = Some(false)),
v2 = LedgerApiErrors.InterpreterErrors.LookupErrors.LedgerConfigurationNotFound
.Reject()(new DamlErrorCodeLoggingContext(logger, loggingContext, None))
.asGrpcError,
)
}
private def failedOnDuplicateCommand()(implicit loggingContext: LoggingContext): Future[Unit] = {
errorCodesVersionSwitcher.chooseAsFailedFuture(
v1 = {
val exception = duplicateCommandException
logger.debug(exception.getMessage)
exception
},
v2 = rejectionGenerators.duplicateCommand(
new DamlErrorCodeLoggingContext(logger, loggingContext, None)
),
)
}
private def failedOnCommandExecution(
error: ErrorCause
)(implicit loggingContext: LoggingContext): Future[CommandExecutionResult] = {
errorCodesVersionSwitcher.chooseAsFailedFuture(
v1 = toStatusExceptionV1(error),
v2 = rejectionGenerators
.commandExecutorError(cause = ErrorCauseExport.fromErrorCause(error))(
new DamlErrorCodeLoggingContext(logger, loggingContext, None)
),
)
}
override def close(): Unit = ()
}

View File

@ -3,22 +3,19 @@
package com.daml.platform.apiserver.services
import java.time.{Duration, Instant}
import java.util.UUID
import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.atomic.AtomicInteger
import com.codahale.metrics.MetricRegistry
import com.daml.error.ErrorCause
import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks}
import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails, SubmissionId}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks}
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.index.v2.{
CommandDeduplicationNew,
IndexPartyManagementService,
IndexSubmissionService,
}
import com.daml.ledger.participant.state.v2.WriteService
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.TestResourceContext
import com.daml.lf
@ -34,20 +31,24 @@ import com.daml.lf.transaction.{GlobalKey, NodeId, ReplayMismatch}
import com.daml.lf.value.Value
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.apiserver.SeedService
import com.daml.platform.apiserver.configuration.LedgerConfigurationSubscription
import com.daml.platform.apiserver.execution.CommandExecutor
import com.daml.platform.apiserver.services.ApiSubmissionServiceSpec._
import com.daml.platform.apiserver.{ErrorCodesVersionSwitcher, SeedService}
import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext}
import com.google.rpc.status.{Status => RpcStatus}
import io.grpc.Status
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.Inside
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{Assertion, Inside}
import java.time.{Duration, Instant}
import java.util.UUID
import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.{Failure, Success, Try}
class ApiSubmissionServiceSpec
extends AsyncFlatSpec
@ -229,18 +230,29 @@ class ApiSubmissionServiceSpec
behavior of "submit"
it should "return proper gRPC status codes for DamlLf errors" in {
val partyManagementService = mock[IndexPartyManagementService]
val writeService = mock[state.WriteService]
testProperGrpcStatusCodesForDamlLfErrors(useSelfServiceErrorCodes = false)
}
it should "return proper gRPC status codes for DamlLf errors (self service error codes a.k.a v2 error codes)" in {
testProperGrpcStatusCodesForDamlLfErrors(useSelfServiceErrorCodes = true)
}
private def testProperGrpcStatusCodesForDamlLfErrors(
useSelfServiceErrorCodes: Boolean
): Future[Assertion] = {
// given
val partyManagementService = mock[IndexPartyManagementService]
val writeService = mock[WriteService]
val mockCommandExecutor = mock[CommandExecutor]
val tmplId = toIdentifier("M:T")
val errorsToStatuses = List(
val errorsToExpectedStatuses: Seq[(ErrorCause, Status)] = List(
ErrorCause.DamlLf(
LfError.Interpretation(
LfError.Interpretation.DamlException(LfInterpretationError.ContractNotFound("#cid")),
None,
)
) -> Status.ABORTED,
) -> ((Status.ABORTED, Status.ABORTED)),
ErrorCause.DamlLf(
LfError.Interpretation(
LfError.Interpretation.DamlException(
@ -250,12 +262,12 @@ class ApiSubmissionServiceSpec
),
None,
)
) -> Status.ABORTED,
) -> ((Status.ABORTED, Status.ALREADY_EXISTS)),
ErrorCause.DamlLf(
LfError.Validation(
LfError.Validation.ReplayMismatch(ReplayMismatch(null, null))
)
) -> Status.ABORTED,
) -> ((Status.ABORTED, Status.INTERNAL)),
ErrorCause.DamlLf(
LfError.Preprocessing(
LfError.Preprocessing.Lookup(
@ -265,7 +277,7 @@ class ApiSubmissionServiceSpec
)
)
)
) -> Status.INVALID_ARGUMENT,
) -> ((Status.INVALID_ARGUMENT, Status.INVALID_ARGUMENT)),
ErrorCause.DamlLf(
LfError.Interpretation(
LfError.Interpretation.DamlException(
@ -276,20 +288,27 @@ class ApiSubmissionServiceSpec
),
None,
)
) -> Status.INVALID_ARGUMENT,
ErrorCause.LedgerTime(0) -> Status.ABORTED,
)
val mockCommandExecutor = mock[CommandExecutor]
) -> ((Status.INVALID_ARGUMENT, Status.INVALID_ARGUMENT)),
ErrorCause.LedgerTime(0) -> ((Status.ABORTED, Status.ABORTED)),
).map { case (key, (statusV1, statusV2)) =>
if (useSelfServiceErrorCodes) {
(key, statusV2)
} else {
(key, statusV1)
}
}
val service = newSubmissionService(
writeService,
partyManagementService,
implicitPartyAllocation = true,
commandExecutor = mockCommandExecutor,
useSelfServiceErrorCodes = useSelfServiceErrorCodes,
)
Future
.sequence(errorsToStatuses.map { case (error, code) =>
// when
val results: Seq[Future[(Status, Try[Unit])]] = errorsToExpectedStatuses
.map { case (error, expectedStatus) =>
val submitRequest = newSubmitRequest()
when(
mockCommandExecutor.execute(
@ -298,13 +317,17 @@ class ApiSubmissionServiceSpec
any[Configuration],
)(any[ExecutionContext], any[LoggingContext])
).thenReturn(Future.successful(Left(error)))
service
.submit(submitRequest)
.transform(result => Success(expectedStatus -> result))
}
val sequencedResults: Future[Seq[(Status, Try[Unit])]] = Future.sequence(results)
service.submit(submitRequest).transform(result => Success(code -> result))
})
.map { results =>
results.foreach { case (code, result) =>
// then
sequencedResults.map { results: Seq[(Status, Try[Unit])] =>
results.foreach { case (expectedStatus: Status, result: Try[Unit]) =>
inside(result) { case Failure(exception) =>
exception.getMessage should startWith(code.getCode.toString)
exception.getMessage should startWith(expectedStatus.getCode.toString)
}
}
succeed
@ -431,6 +454,7 @@ object ApiSubmissionServiceSpec {
commandExecutor: CommandExecutor = null,
deduplicationEnabled: Boolean = true,
mockIndexSubmissionService: IndexSubmissionService = mock[IndexSubmissionService],
useSelfServiceErrorCodes: Boolean = false,
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
@ -468,6 +492,9 @@ object ApiSubmissionServiceSpec {
configuration = ApiSubmissionService
.Configuration(implicitPartyAllocation, enableDeduplication = true),
metrics = new Metrics(new MetricRegistry),
errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher(
enableSelfServiceErrorCodes = useSelfServiceErrorCodes
),
)
}
}