[DPP-611][Self-service error codes] Adapt ApiCommandService (#11325)

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
pbatko-da 2021-10-22 15:42:38 +02:00 committed by GitHub
parent a89079b4a5
commit e8d0ccbdb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 17 deletions

View File

@ -268,6 +268,7 @@ private[daml] object ApiServices {
timeProvider = timeProvider,
ledgerConfigurationSubscription = ledgerConfigurationSubscription,
metrics = metrics,
errorsVersionsSwitcher,
)
val apiPartyManagementService = ApiPartyManagementService.createApiService(

View File

@ -7,7 +7,7 @@ import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Source}
import com.daml.api.util.TimeProvider
import com.daml.error.DamlContextualizedErrorLogger
import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_completion_service.{
@ -59,6 +59,7 @@ import scala.util.Try
private[apiserver] final class ApiCommandService private[services] (
transactionServices: TransactionServices,
submissionTracker: Tracker,
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
@ -67,6 +68,8 @@ private[apiserver] final class ApiCommandService private[services] (
private val logger = ContextualizedLogger.get(this.getClass)
private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)
@volatile private var running = true
override def close(): Unit = {
@ -93,7 +96,7 @@ private[apiserver] final class ApiCommandService private[services] (
submissionTracker.track(CommandSubmission(commands, timeout))
} else {
Future.failed(
ErrorFactories.serviceNotRunning(definiteAnswer = Some(false))(contextualizedErrorLogger)
errorFactories.serviceNotRunning(definiteAnswer = Some(false))(contextualizedErrorLogger)
)
}.andThen(logger.logErrorsOnCall[Completion])
}
@ -179,6 +182,7 @@ private[apiserver] object ApiCommandService {
timeProvider: TimeProvider,
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
metrics: Metrics,
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
)(implicit
materializer: Materializer,
executionContext: ExecutionContext,
@ -191,7 +195,8 @@ private[apiserver] object ApiCommandService {
trackerCleanupInterval,
)
new GrpcCommandService(
service = new ApiCommandService(transactionServices, submissionTracker),
service =
new ApiCommandService(transactionServices, submissionTracker, errorCodesVersionSwitcher),
ledgerId = configuration.ledgerId,
currentLedgerTime = () => timeProvider.getCurrentTime,
currentUtcTime = () => Instant.now,

View File

@ -3,10 +3,11 @@
package com.daml.platform.apiserver.services
import com.daml.error.ErrorCodesVersionSwitcher
import java.time.{Duration, Instant}
import java.util.UUID
import java.util.concurrent.TimeUnit
import com.daml.grpc.{GrpcException, GrpcStatus}
import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService
import com.daml.ledger.api.v1.command_service.{CommandServiceGrpc, SubmitAndWaitRequest}
@ -36,7 +37,9 @@ class ApiCommandServiceSpec
private implicit val resourceContext: ResourceContext = ResourceContext(executionContext)
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
"the command service" should {
val errorCodesVersionSwitcher = mock[ErrorCodesVersionSwitcher]
s"the command service" should {
val completionSuccess = CompletionResponse.CompletionSuccess(
Completion(
commandId = "command ID",
@ -48,7 +51,10 @@ class ApiCommandServiceSpec
val commands = someCommands()
val submissionTracker = mock[Tracker]
when(
submissionTracker.track(any[CommandSubmission])(any[ExecutionContext], any[LoggingContext])
submissionTracker.track(any[CommandSubmission])(
any[ExecutionContext],
any[LoggingContext],
)
).thenReturn(
Future.successful(
Right(completionSuccess)
@ -56,7 +62,11 @@ class ApiCommandServiceSpec
)
openChannel(
new ApiCommandService(UnimplementedTransactionServices, submissionTracker)
new ApiCommandService(
UnimplementedTransactionServices,
submissionTracker,
errorCodesVersionSwitcher,
)
).use { stub =>
val request = SubmitAndWaitRequest.of(Some(commands))
stub.submitAndWaitForTransactionId(request).map { response =>
@ -64,6 +74,7 @@ class ApiCommandServiceSpec
verify(submissionTracker).track(
eqTo(CommandSubmission(commands))
)(any[ExecutionContext], any[LoggingContext])
verifyZeroInteractions(errorCodesVersionSwitcher)
succeed
}
}
@ -79,7 +90,10 @@ class ApiCommandServiceSpec
val commands = someCommands()
val submissionTracker = mock[Tracker]
when(
submissionTracker.track(any[CommandSubmission])(any[ExecutionContext], any[LoggingContext])
submissionTracker.track(any[CommandSubmission])(
any[ExecutionContext],
any[LoggingContext],
)
).thenReturn(
Future.successful(
Right(completionSuccess)
@ -87,7 +101,11 @@ class ApiCommandServiceSpec
)
openChannel(
new ApiCommandService(UnimplementedTransactionServices, submissionTracker),
new ApiCommandService(
UnimplementedTransactionServices,
submissionTracker,
errorCodesVersionSwitcher,
),
deadlineTicker,
).use { stub =>
val request = SubmitAndWaitRequest.of(Some(commands))
@ -99,6 +117,7 @@ class ApiCommandServiceSpec
verify(submissionTracker).track(
eqTo(CommandSubmission(commands, timeout = Some(Duration.ofSeconds(30))))
)(any[ExecutionContext], any[LoggingContext])
verifyZeroInteractions(errorCodesVersionSwitcher)
succeed
}
}
@ -108,7 +127,10 @@ class ApiCommandServiceSpec
val commands = someCommands()
val submissionTracker = mock[Tracker]
when(
submissionTracker.track(any[CommandSubmission])(any[ExecutionContext], any[LoggingContext])
submissionTracker.track(any[CommandSubmission])(
any[ExecutionContext],
any[LoggingContext],
)
).thenReturn(
Future.successful(
Left(
@ -119,20 +141,32 @@ class ApiCommandServiceSpec
)
)
openChannel(new ApiCommandService(UnimplementedTransactionServices, submissionTracker)).use {
stub =>
val request = SubmitAndWaitRequest.of(Some(commands))
stub.submitAndWaitForTransactionId(request).failed.map { exception =>
exception should matchPattern { case GrpcException(GrpcStatus.ABORTED(), _) => }
}
openChannel(
new ApiCommandService(
UnimplementedTransactionServices,
submissionTracker,
errorCodesVersionSwitcher,
)
).use { stub =>
val request = SubmitAndWaitRequest.of(Some(commands))
stub.submitAndWaitForTransactionId(request).failed.map { exception =>
exception should matchPattern { case GrpcException(GrpcStatus.ABORTED(), _) => }
verifyZeroInteractions(errorCodesVersionSwitcher)
succeed
}
}
}
"close the supplied tracker when closed" in {
val submissionTracker = mock[Tracker]
val service = new ApiCommandService(UnimplementedTransactionServices, submissionTracker)
val service = new ApiCommandService(
UnimplementedTransactionServices,
submissionTracker,
errorCodesVersionSwitcher,
)
verifyZeroInteractions(submissionTracker)
verifyZeroInteractions(errorCodesVersionSwitcher)
service.close()
verify(submissionTracker).close()