CommandService returns useful data for successful submissions (#875)

Submitting a command via the CommandService now returns either the
transaction id (SubmitAndWaitForTransactionId), the flat transaction
(SubmitAndWaitForTransactionResponse), or the transaction tree
(SubmitAndWaitForTransactionTreeResponse).

This means that users don't have to wade through the transaction stream
to retrieve the resulting transaction. This is particularly useful in
combination with #479.

Fixes #406
This commit is contained in:
Gerolf Seitz 2019-05-03 16:01:41 +02:00 committed by GitHub
parent 7cf02b8579
commit ecf6ece8c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 578 additions and 194 deletions

View File

@ -9,6 +9,15 @@ This page contains release notes for the SDK.
HEAD — ongoing
--------------
- Ledger API: Added three new methods to :ref:`CommandService <com.digitalasset.ledger.api.v1.commandservice>`:
- ``SubmitAndWaitForTransactionId`` returns the transaction id.
- ``SubmitAndWaitForTransaction`` returns the transaction.
- ``SubmitAndWaitForTransactionTree`` returns the transaction tree.
- Ledger API: Added field ``transaction_id`` to command completions. It is only set by the ledger
with the id of the transaction for a successful command.
0.12.14 - 2019-05-03
--------------------

View File

@ -4,6 +4,8 @@
package com.daml.ledger.rxjava;
import com.daml.ledger.javaapi.data.Command;
import com.daml.ledger.javaapi.data.Transaction;
import com.daml.ledger.javaapi.data.TransactionTree;
import com.google.protobuf.Empty;
import io.reactivex.Single;
import org.checkerframework.checker.nullness.qual.NonNull;
@ -19,4 +21,17 @@ public interface CommandClient {
Single<Empty> submitAndWait(@NonNull String workflowId, @NonNull String applicationId,
@NonNull String commandId, @NonNull String party, @NonNull Instant ledgerEffectiveTime,
@NonNull Instant maximumRecordTime, @NonNull List<@NonNull Command> commands);
Single<String> submitAndWaitForTransactionId(@NonNull String workflowId, @NonNull String applicationId,
@NonNull String commandId, @NonNull String party, @NonNull Instant ledgerEffectiveTime,
@NonNull Instant maximumRecordTime, @NonNull List<@NonNull Command> commands);
Single<Transaction> submitAndWaitForTransaction(@NonNull String workflowId, @NonNull String applicationId,
@NonNull String commandId, @NonNull String party, @NonNull Instant ledgerEffectiveTime,
@NonNull Instant maximumRecordTime, @NonNull List<@NonNull Command> commands);
Single<TransactionTree> submitAndWaitForTransactionTree(@NonNull String workflowId, @NonNull String applicationId,
@NonNull String commandId, @NonNull String party, @NonNull Instant ledgerEffectiveTime,
@NonNull Instant maximumRecordTime, @NonNull List<@NonNull Command> commands);
}

View File

@ -3,14 +3,15 @@
package com.daml.ledger.rxjava.grpc;
import com.daml.ledger.rxjava.CommandClient;
import com.daml.ledger.javaapi.data.Command;
import com.daml.ledger.javaapi.data.SubmitAndWaitRequest;
import com.daml.ledger.javaapi.data.Transaction;
import com.daml.ledger.javaapi.data.TransactionTree;
import com.daml.ledger.rxjava.CommandClient;
import com.digitalasset.ledger.api.v1.CommandServiceGrpc;
import com.digitalasset.ledger.api.v1.CommandServiceOuterClass;
import com.google.protobuf.Empty;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.reactivex.Single;
import org.checkerframework.checker.nullness.qual.NonNull;
@ -35,4 +36,36 @@ public class CommandClientImpl implements CommandClient {
workflowId, applicationId, commandId, party, ledgerEffectiveTime, maximumRecordTime, commands);
return Single.fromFuture(serviceStub.submitAndWait(request));
}
@Override
public Single<String> submitAndWaitForTransactionId(@NonNull String workflowId, @NonNull String applicationId,
@NonNull String commandId, @NonNull String party, @NonNull Instant ledgerEffectiveTime,
@NonNull Instant maximumRecordTime, @NonNull List<@NonNull Command> commands) {
CommandServiceOuterClass.SubmitAndWaitRequest request = SubmitAndWaitRequest.toProto(this.ledgerId,
workflowId, applicationId, commandId, party, ledgerEffectiveTime, maximumRecordTime, commands);
return Single.fromFuture(serviceStub.submitAndWaitForTransactionId(request))
.map(CommandServiceOuterClass.SubmitAndWaitForTransactionIdResponse::getTransactionId);
}
@Override
public Single<Transaction> submitAndWaitForTransaction(@NonNull String workflowId, @NonNull String applicationId,
@NonNull String commandId, @NonNull String party, @NonNull Instant ledgerEffectiveTime,
@NonNull Instant maximumRecordTime, @NonNull List<@NonNull Command> commands) {
CommandServiceOuterClass.SubmitAndWaitRequest request = SubmitAndWaitRequest.toProto(this.ledgerId,
workflowId, applicationId, commandId, party, ledgerEffectiveTime, maximumRecordTime, commands);
return Single.fromFuture(serviceStub.submitAndWaitForTransaction(request))
.map(CommandServiceOuterClass.SubmitAndWaitForTransactionResponse::getTransaction)
.map(Transaction::fromProto);
}
@Override
public Single<TransactionTree> submitAndWaitForTransactionTree(@NonNull String workflowId, @NonNull String applicationId,
@NonNull String commandId, @NonNull String party, @NonNull Instant ledgerEffectiveTime,
@NonNull Instant maximumRecordTime, @NonNull List<@NonNull Command> commands) {
CommandServiceOuterClass.SubmitAndWaitRequest request = SubmitAndWaitRequest.toProto(this.ledgerId,
workflowId, applicationId, commandId, party, ledgerEffectiveTime, maximumRecordTime, commands);
return Single.fromFuture(serviceStub.submitAndWaitForTransactionTree(request))
.map(CommandServiceOuterClass.SubmitAndWaitForTransactionTreeResponse::getTransaction)
.map(TransactionTree::fromProto);
}
}

View File

@ -12,6 +12,11 @@ import com.daml.ledger.javaapi.data.{Command, CreateCommand, Identifier, Record}
import com.daml.ledger.rxjava.grpc.helpers._
import com.daml.ledger.testkit.services._
import com.digitalasset.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.digitalasset.ledger.api.v1.command_service.{
SubmitAndWaitForTransactionIdResponse,
SubmitAndWaitForTransactionResponse,
SubmitAndWaitForTransactionTreeResponse
}
import com.digitalasset.ledger.api.v1.ledger_configuration_service.GetLedgerConfigurationResponse
import com.digitalasset.ledger.api.v1.package_service._
import com.google.protobuf.ByteString
@ -214,6 +219,9 @@ class DamlLedgerClientTest extends FlatSpec with Matchers with OptionValues with
List(CompletionStreamResponse(None, Seq())),
genCompletionEndResponse("completionEndResponse"),
Future.successful(Empty.defaultInstance),
Future.successful(SubmitAndWaitForTransactionIdResponse.defaultInstance),
Future.successful(SubmitAndWaitForTransactionResponse.defaultInstance),
Future.successful(SubmitAndWaitForTransactionTreeResponse.defaultInstance),
List(genGetTimeResponse),
Seq(GetLedgerConfigurationResponse.defaultInstance),
Future.successful(ListPackagesResponse(Seq("id1"))),

View File

@ -9,15 +9,21 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.{Collections, Optional}
import com.daml.ledger.javaapi.data.{Unit => DAMLUnit, _}
import com.daml.ledger.rxjava.components.LedgerViewFlowable.LedgerView
import com.daml.ledger.rxjava.components.helpers.{CommandsAndPendingSet, CreatedContract}
import com.daml.ledger.rxjava.components.tests.helpers.DummyLedgerClient
import com.daml.ledger.javaapi.data.{Unit => DAMLUnit, _}
import com.daml.ledger.rxjava.grpc.helpers.{DataLayerHelpers, LedgerServices}
import com.daml.ledger.rxjava.{CommandSubmissionClient, DamlLedgerClient}
import com.daml.ledger.testkit.services.TransactionServiceImpl
import com.digitalasset.ledger.api.v1.command_service.{
SubmitAndWaitForTransactionIdResponse,
SubmitAndWaitForTransactionResponse,
SubmitAndWaitForTransactionTreeResponse
}
import com.digitalasset.ledger.api.{v1 => scalaAPI}
import com.google.protobuf.Empty
import com.google.protobuf.{Empty => JEmpty}
import com.google.protobuf.empty.Empty
import com.google.rpc.Status
import io.grpc.Metadata
import io.grpc.Status.Code
@ -132,7 +138,7 @@ class BotTest extends FlatSpec with Matchers with DataLayerHelpers {
party: String,
ledgerEffectiveTime: Instant,
maximumRecordTime: Instant,
commands: util.List[Command]): Single[Empty] = {
commands: util.List[Command]): Single[JEmpty] = {
submitted.append(
new SubmitCommandsRequest(
workflowId,
@ -371,7 +377,10 @@ class BotTest extends FlatSpec with Matchers with DataLayerHelpers {
Future.successful(com.google.protobuf.empty.Empty.defaultInstance),
List.empty,
scalaAPI.command_completion_service.CompletionEndResponse.defaultInstance,
Future.successful(com.google.protobuf.empty.Empty.defaultInstance),
Future.successful(Empty.defaultInstance),
Future.successful(SubmitAndWaitForTransactionIdResponse.defaultInstance),
Future.successful(SubmitAndWaitForTransactionResponse.defaultInstance),
Future.successful(SubmitAndWaitForTransactionTreeResponse.defaultInstance),
List.empty,
Seq.empty,
Future.successful(scalaAPI.package_service.ListPackagesResponse.defaultInstance),

View File

@ -7,6 +7,11 @@ import java.util.concurrent.TimeUnit
import com.daml.ledger.javaapi.data.{CreateCommand, Identifier, Record}
import com.daml.ledger.rxjava.grpc.helpers.{DataLayerHelpers, LedgerServices, TestConfiguration}
import com.digitalasset.ledger.api.v1.command_service.{
SubmitAndWaitForTransactionIdResponse,
SubmitAndWaitForTransactionResponse,
SubmitAndWaitForTransactionTreeResponse
}
import com.google.protobuf.empty.Empty
import org.scalatest.{FlatSpec, Matchers, OptionValues}
@ -18,69 +23,76 @@ class CommandClientImplTest extends FlatSpec with Matchers with OptionValues wit
val ledgerServices = new LedgerServices("command-service-ledger")
private val withCommandclient = {
ledgerServices.withCommandClient(
Future.successful(Empty.defaultInstance),
Future.successful(SubmitAndWaitForTransactionIdResponse.defaultInstance),
Future.successful(SubmitAndWaitForTransactionResponse.defaultInstance),
Future.successful(SubmitAndWaitForTransactionTreeResponse.defaultInstance)
) _
}
behavior of "[2.1] CommandClientImpl.submitAndWait"
it should "send the given command to the Ledger" in {
ledgerServices.withCommandClient(Future.successful(Empty.defaultInstance)) {
(client, service) =>
val commands = genCommands(List.empty)
client
.submitAndWait(
commands.getWorkflowId,
commands.getApplicationId,
commands.getCommandId,
commands.getParty,
commands.getLedgerEffectiveTime,
commands.getMaximumRecordTime,
commands.getCommands
)
.timeout(TestConfiguration.timeoutInSeconds, TimeUnit.SECONDS)
.blockingGet()
service.getLastRequest.value.getCommands.commands shouldBe empty
withCommandclient { (client, service) =>
val commands = genCommands(List.empty)
client
.submitAndWait(
commands.getWorkflowId,
commands.getApplicationId,
commands.getCommandId,
commands.getParty,
commands.getLedgerEffectiveTime,
commands.getMaximumRecordTime,
commands.getCommands
)
.timeout(TestConfiguration.timeoutInSeconds, TimeUnit.SECONDS)
.blockingGet()
service.getLastRequest.value.getCommands.commands shouldBe empty
}
}
behavior of "[2.2] CommandClientImpl.submitAndWait"
it should "send the given command with the correct parameters" in {
ledgerServices.withCommandClient(Future.successful(Empty.defaultInstance)) {
(client, service) =>
val recordId = new Identifier("recordPackageId", "recordModuleName", "recordEntityName")
val record = new Record(recordId, List.empty[Record.Field].asJava)
val command = new CreateCommand(new Identifier("a", "a", "b"), record)
val commands = genCommands(List(command))
client
.submitAndWait(
commands.getWorkflowId,
commands.getApplicationId,
commands.getCommandId,
commands.getParty,
commands.getLedgerEffectiveTime,
commands.getMaximumRecordTime,
commands.getCommands
)
.timeout(TestConfiguration.timeoutInSeconds, TimeUnit.SECONDS)
.blockingGet()
service.getLastRequest.value.getCommands.applicationId shouldBe commands.getApplicationId
service.getLastRequest.value.getCommands.commandId shouldBe commands.getCommandId
service.getLastRequest.value.getCommands.party shouldBe commands.getParty
service.getLastRequest.value.getCommands.workflowId shouldBe commands.getWorkflowId
service.getLastRequest.value.getCommands.ledgerId shouldBe ledgerServices.ledgerId
service.getLastRequest.value.getCommands.getMaximumRecordTime.seconds shouldBe commands.getMaximumRecordTime.getEpochSecond
service.getLastRequest.value.getCommands.getMaximumRecordTime.nanos shouldBe commands.getMaximumRecordTime.getNano
service.getLastRequest.value.getCommands.getLedgerEffectiveTime.seconds shouldBe commands.getLedgerEffectiveTime.getEpochSecond
service.getLastRequest.value.getCommands.getLedgerEffectiveTime.nanos shouldBe commands.getLedgerEffectiveTime.getNano
service.getLastRequest.value.getCommands.commands should have size 1
val receivedCommand = service.getLastRequest.value.getCommands.commands.head.command
receivedCommand.isCreate shouldBe true
receivedCommand.isExercise shouldBe false
receivedCommand.create.value.getTemplateId.packageId shouldBe command.getTemplateId.getPackageId
receivedCommand.create.value.getTemplateId.moduleName shouldBe command.getTemplateId.getModuleName
receivedCommand.create.value.getTemplateId.entityName shouldBe command.getTemplateId.getEntityName
receivedCommand.create.value.getCreateArguments.getRecordId.packageId shouldBe recordId.getPackageId
receivedCommand.create.value.getCreateArguments.getRecordId.moduleName shouldBe recordId.getModuleName
receivedCommand.create.value.getCreateArguments.getRecordId.entityName shouldBe recordId.getEntityName
receivedCommand.create.value.getCreateArguments.fields shouldBe empty
withCommandclient { (client, service) =>
val recordId = new Identifier("recordPackageId", "recordModuleName", "recordEntityName")
val record = new Record(recordId, List.empty[Record.Field].asJava)
val command = new CreateCommand(new Identifier("a", "a", "b"), record)
val commands = genCommands(List(command))
client
.submitAndWait(
commands.getWorkflowId,
commands.getApplicationId,
commands.getCommandId,
commands.getParty,
commands.getLedgerEffectiveTime,
commands.getMaximumRecordTime,
commands.getCommands
)
.timeout(TestConfiguration.timeoutInSeconds, TimeUnit.SECONDS)
.blockingGet()
service.getLastRequest.value.getCommands.applicationId shouldBe commands.getApplicationId
service.getLastRequest.value.getCommands.commandId shouldBe commands.getCommandId
service.getLastRequest.value.getCommands.party shouldBe commands.getParty
service.getLastRequest.value.getCommands.workflowId shouldBe commands.getWorkflowId
service.getLastRequest.value.getCommands.ledgerId shouldBe ledgerServices.ledgerId
service.getLastRequest.value.getCommands.getMaximumRecordTime.seconds shouldBe commands.getMaximumRecordTime.getEpochSecond
service.getLastRequest.value.getCommands.getMaximumRecordTime.nanos shouldBe commands.getMaximumRecordTime.getNano
service.getLastRequest.value.getCommands.getLedgerEffectiveTime.seconds shouldBe commands.getLedgerEffectiveTime.getEpochSecond
service.getLastRequest.value.getCommands.getLedgerEffectiveTime.nanos shouldBe commands.getLedgerEffectiveTime.getNano
service.getLastRequest.value.getCommands.commands should have size 1
val receivedCommand = service.getLastRequest.value.getCommands.commands.head.command
receivedCommand.isCreate shouldBe true
receivedCommand.isExercise shouldBe false
receivedCommand.create.value.getTemplateId.packageId shouldBe command.getTemplateId.getPackageId
receivedCommand.create.value.getTemplateId.moduleName shouldBe command.getTemplateId.getModuleName
receivedCommand.create.value.getTemplateId.entityName shouldBe command.getTemplateId.getEntityName
receivedCommand.create.value.getCreateArguments.getRecordId.packageId shouldBe recordId.getPackageId
receivedCommand.create.value.getCreateArguments.getRecordId.moduleName shouldBe recordId.getModuleName
receivedCommand.create.value.getCreateArguments.getRecordId.entityName shouldBe recordId.getEntityName
receivedCommand.create.value.getCreateArguments.fields shouldBe empty
}
}
}

View File

@ -56,8 +56,8 @@ class CommandCompletionImplTest
it should "return a stream with all the completions" in {
val applicationId = "applicationId"
val completion1 = Completion("cid1", Option(new Status(0)), None)
val completion2 = Completion("cid2", Option(new Status(1)), None)
val completion1 = Completion("cid1", Option(new Status(0)), "1", None)
val completion2 = Completion("cid2", Option(new Status(1)), traceContext = None)
val completionResponse = CompletionStreamResponse(None, List(completion1, completion2))
ledgerServices.withCommandCompletionClient(
List(completionResponse),
@ -75,6 +75,7 @@ class CommandCompletionImplTest
val receivedCompletion2 = completions.getCompletions.get(1)
receivedCompletion1.getCommandId shouldBe completion1.commandId
receivedCompletion1.getStatus.getCode shouldBe completion1.getStatus.code
receivedCompletion1.getTransactionId shouldBe completion1.transactionId
receivedCompletion2.getCommandId shouldBe completion2.commandId
receivedCompletion2.getStatus.getCode shouldBe completion2.getStatus.code
}
@ -84,7 +85,7 @@ class CommandCompletionImplTest
it should "send the request with the correct ledgerId" in {
val applicationId = "applicationId"
val completion1 = Completion("cid1", Option(new Status(0)), None)
val completion1 = Completion("cid1", Option(new Status(0)), traceContext = None)
val completionResponse = CompletionStreamResponse(None, List(completion1))
val parties = Set("Alice")
ledgerServices.withCommandCompletionClient(

View File

@ -8,14 +8,19 @@ import java.util.concurrent.TimeUnit
import com.daml.ledger.rxjava.grpc._
import com.daml.ledger.rxjava.{CommandCompletionClient, LedgerConfigurationClient, PackageClient}
import com.daml.ledger.testkit.services._
import com.daml.ledger.testkit.services.TransactionServiceImpl.LedgerItem
import com.daml.ledger.testkit.services._
import com.digitalasset.grpc.adapter.{ExecutionSequencerFactory, SingleThreadExecutionSequencerPool}
import com.digitalasset.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.digitalasset.ledger.api.v1.command_completion_service.{
CompletionEndResponse,
CompletionStreamResponse
}
import com.digitalasset.ledger.api.v1.command_service.{
SubmitAndWaitForTransactionIdResponse,
SubmitAndWaitForTransactionResponse,
SubmitAndWaitForTransactionTreeResponse
}
import com.digitalasset.ledger.api.v1.ledger_configuration_service.GetLedgerConfigurationResponse
import com.digitalasset.ledger.api.v1.package_service.{
GetPackageResponse,
@ -128,9 +133,17 @@ class LedgerServices(val ledgerId: String) {
}
}
def withCommandClient(response: Future[Empty])(
def withCommandClient(
submitAndWaitResponse: Future[Empty],
submitAndWaitForTransactionIdResponse: Future[SubmitAndWaitForTransactionIdResponse],
submitAndWaitForTransactionResponse: Future[SubmitAndWaitForTransactionResponse],
submitAndWaitForTransactionTreeResponse: Future[SubmitAndWaitForTransactionTreeResponse])(
f: (CommandClientImpl, CommandServiceImpl) => Any): Any = {
val (service, serviceImpl) = CommandServiceImpl.createWithRef(response)(executionContext)
val (service, serviceImpl) = CommandServiceImpl.createWithRef(
submitAndWaitResponse,
submitAndWaitForTransactionIdResponse,
submitAndWaitForTransactionResponse,
submitAndWaitForTransactionTreeResponse)(executionContext)
withServerAndChannel(service) { channel =>
f(new CommandClientImpl(ledgerId, channel), serviceImpl)
}
@ -167,7 +180,10 @@ class LedgerServices(val ledgerId: String) {
commandSubmissionResponse: Future[Empty],
completions: List[CompletionStreamResponse],
completionsEnd: CompletionEndResponse,
commandResponse: Future[Empty],
submitAndWaitResponse: Future[Empty],
submitAndWaitForTransactionIdResponse: Future[SubmitAndWaitForTransactionIdResponse],
submitAndWaitForTransactionResponse: Future[SubmitAndWaitForTransactionResponse],
submitAndWaitForTransactionTreeResponse: Future[SubmitAndWaitForTransactionTreeResponse],
getTimeResponses: List[GetTimeResponse],
getLedgerConfigurationResponses: Seq[GetLedgerConfigurationResponse],
listPackagesResponse: Future[ListPackagesResponse],
@ -181,7 +197,10 @@ class LedgerServices(val ledgerId: String) {
commandSubmissionResponse,
completions,
completionsEnd,
commandResponse,
submitAndWaitResponse,
submitAndWaitForTransactionIdResponse,
submitAndWaitForTransactionResponse,
submitAndWaitForTransactionTreeResponse,
getTimeResponses,
getLedgerConfigurationResponses,
listPackagesResponse,

View File

@ -9,6 +9,11 @@ import com.digitalasset.ledger.api.v1.command_completion_service.{
CompletionEndResponse,
CompletionStreamResponse
}
import com.digitalasset.ledger.api.v1.command_service.{
SubmitAndWaitForTransactionIdResponse,
SubmitAndWaitForTransactionResponse,
SubmitAndWaitForTransactionTreeResponse
}
import com.digitalasset.ledger.api.v1.ledger_configuration_service.GetLedgerConfigurationResponse
import com.digitalasset.ledger.api.v1.package_service.{
GetPackageResponse,
@ -42,7 +47,10 @@ object LedgerServicesImpls {
commandSubmissionResponse: Future[Empty],
completions: List[CompletionStreamResponse],
completionsEnd: CompletionEndResponse,
commandResponse: Future[Empty],
submitAndWaitResponse: Future[Empty],
submitAndWaitForTransactionIdResponse: Future[SubmitAndWaitForTransactionIdResponse],
submitAndWaitForTransactionResponse: Future[SubmitAndWaitForTransactionResponse],
submitAndWaitForTransactionTreeResponse: Future[SubmitAndWaitForTransactionTreeResponse],
getTimeResponses: List[GetTimeResponse],
getLedgerConfigurationResponses: Seq[GetLedgerConfigurationResponse],
listPackagesResponse: Future[ListPackagesResponse],
@ -58,7 +66,11 @@ object LedgerServicesImpls {
CommandSubmissionServiceImpl.createWithRef(commandSubmissionResponse)(ec)
val (ccServiceDef, ccService) =
CommandCompletionServiceImpl.createWithRef(completions, completionsEnd)(ec)
val (cServiceDef, cService) = CommandServiceImpl.createWithRef(commandResponse)(ec)
val (cServiceDef, cService) = CommandServiceImpl.createWithRef(
submitAndWaitResponse,
submitAndWaitForTransactionIdResponse,
submitAndWaitForTransactionResponse,
submitAndWaitForTransactionTreeResponse)(ec)
val (lcServiceDef, lcService) =
LedgerConfigurationServiceImpl.createWithRef(getLedgerConfigurationResponses)(ec)
val (timeServiceDef, timeService) = TimeServiceImpl.createWithRef(getTimeResponses: _*)(ec)

View File

@ -4,19 +4,42 @@
package com.daml.ledger.testkit.services
import com.digitalasset.ledger.api.v1.command_service.CommandServiceGrpc.CommandService
import com.digitalasset.ledger.api.v1.command_service.{CommandServiceGrpc, SubmitAndWaitRequest}
import com.digitalasset.ledger.api.v1.command_service._
import com.google.protobuf.empty.Empty
import io.grpc.ServerServiceDefinition
import scala.concurrent.{ExecutionContext, Future}
class CommandServiceImpl(response: Future[Empty]) extends CommandService {
class CommandServiceImpl(
submitAndWaitResponse: Future[Empty],
submitAndWaitForTransactionIdResponse: Future[SubmitAndWaitForTransactionIdResponse],
submitAndWaitForTransactionResponse: Future[SubmitAndWaitForTransactionResponse],
submitAndWaitForTransactionTreeResponse: Future[SubmitAndWaitForTransactionTreeResponse])
extends CommandService {
private var lastRequest: Option[SubmitAndWaitRequest] = None
override def submitAndWait(request: SubmitAndWaitRequest): Future[Empty] = {
this.lastRequest = Some(request)
response
submitAndWaitResponse
}
override def submitAndWaitForTransactionId(
request: SubmitAndWaitRequest): Future[SubmitAndWaitForTransactionIdResponse] = {
this.lastRequest = Some(request)
submitAndWaitForTransactionIdResponse
}
override def submitAndWaitForTransaction(
request: SubmitAndWaitRequest): Future[SubmitAndWaitForTransactionResponse] = {
this.lastRequest = Some(request)
submitAndWaitForTransactionResponse
}
override def submitAndWaitForTransactionTree(
request: SubmitAndWaitRequest): Future[SubmitAndWaitForTransactionTreeResponse] = {
this.lastRequest = Some(request)
submitAndWaitForTransactionTreeResponse
}
def getLastRequest: Option[SubmitAndWaitRequest] = this.lastRequest
@ -24,9 +47,17 @@ class CommandServiceImpl(response: Future[Empty]) extends CommandService {
object CommandServiceImpl {
def createWithRef(response: Future[Empty])(
def createWithRef(
submitAndWaitResponse: Future[Empty],
submitAndWaitForTransactionIdResponse: Future[SubmitAndWaitForTransactionIdResponse],
submitAndWaitForTransactionResponse: Future[SubmitAndWaitForTransactionResponse],
submitAndWaitForTransactionTreeResponse: Future[SubmitAndWaitForTransactionTreeResponse])(
implicit ec: ExecutionContext): (ServerServiceDefinition, CommandServiceImpl) = {
val serviceImpl = new CommandServiceImpl(response)
val serviceImpl = new CommandServiceImpl(
submitAndWaitResponse,
submitAndWaitForTransactionIdResponse,
submitAndWaitForTransactionResponse,
submitAndWaitForTransactionTreeResponse)
(CommandServiceGrpc.bindService(serviceImpl, ec), serviceImpl)
}
}

View File

@ -66,7 +66,7 @@ object CommandRetryFlow {
outputPorts = 2, {
case Ctx(
RetryInfo(request, nrOfRetries, firstSubmissionTime, _),
Completion(_, Some(status: Status), _)) =>
Completion(_, Some(status: Status), _, _)) =>
if (status.code == Code.OK_VALUE) {
PROPAGATE_PORT
} else if ((firstSubmissionTime plus maxRetryTime) isBefore timeProvider.getCurrentTime) {
@ -79,7 +79,7 @@ object CommandRetryFlow {
RetryLogger.logNonFatal(request, status, nrOfRetries)
RETRY_PORT
}
case Ctx(_, Completion(commandId, _, _)) =>
case Ctx(_, Completion(commandId, _, _, _)) =>
statusNotFoundError(commandId)
}
))

View File

@ -37,9 +37,11 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest {
context @ RetryInfo(_, _, _, status),
SubmitRequest(Some(Commands(_, _, _, commandId, _, let, _, _)), tc)) =>
if (let.get.seconds == 0) {
Ctx(context, Completion(commandId, Some(status), tc))
Ctx(context, Completion(commandId, Some(status), traceContext = tc))
} else {
Ctx(context, Completion(commandId, Some(status.copy(code = Code.OK_VALUE)), tc))
Ctx(
context,
Completion(commandId, Some(status.copy(code = Code.OK_VALUE)), traceContext = tc))
}
case x =>
throw new RuntimeException(s"Unexpected input: '$x'")

View File

@ -7,10 +7,10 @@ package com.digitalasset.ledger.api.v1;
import "com/digitalasset/ledger/api/v1/commands.proto";
import "com/digitalasset/ledger/api/v1/trace_context.proto";
import "com/digitalasset/ledger/api/v1/transaction.proto";
import "google/protobuf/empty.proto";
option java_outer_classname = "CommandServiceOuterClass";
option java_package = "com.digitalasset.ledger.api.v1";
@ -22,6 +22,21 @@ service CommandService {
// Returns ``RESOURCE_EXHAUSTED`` if the number of in-flight commands reached the maximum (if a limit is configured).
// Propagates the gRPC error of failed submissions including DAML interpretation errors.
rpc SubmitAndWait (SubmitAndWaitRequest) returns (google.protobuf.Empty);
// Submits a single composite command, waits for its result, and returns the transaction id.
// Returns ``RESOURCE_EXHAUSTED`` if the number of in-flight commands reached the maximum (if a limit is configured).
// Propagates the gRPC error of failed submissions including DAML interpretation errors.
rpc SubmitAndWaitForTransactionId (SubmitAndWaitRequest) returns (SubmitAndWaitForTransactionIdResponse);
// Submits a single composite command, waits for its result, and returns the transaction.
// Returns ``RESOURCE_EXHAUSTED`` if the number of in-flight commands reached the maximum (if a limit is configured).
// Propagates the gRPC error of failed submissions including DAML interpretation errors.
rpc SubmitAndWaitForTransaction (SubmitAndWaitRequest) returns (SubmitAndWaitForTransactionResponse);
// Submits a single composite command, waits for its result, and returns the transaction tree.
// Returns ``RESOURCE_EXHAUSTED`` if the number of in-flight commands reached the maximum (if a limit is configured).
// Propagates the gRPC error of failed submissions including DAML interpretation errors.
rpc SubmitAndWaitForTransactionTree (SubmitAndWaitRequest) returns (SubmitAndWaitForTransactionTreeResponse);
}
// These commands are atomic, and will become transactions.
@ -37,3 +52,21 @@ message SubmitAndWaitRequest {
TraceContext trace_context = 1000;
}
message SubmitAndWaitForTransactionIdResponse {
// The id of the transaction that resulted from the submitted command.
// Required
string transaction_id = 1;
}
message SubmitAndWaitForTransactionResponse {
// The flat transaction that resulted from the submitted command.
// Required
Transaction transaction = 1;
}
message SubmitAndWaitForTransactionTreeResponse {
// The transaction tree that resulted from the submitted command.
// Required
TransactionTree transaction = 1;
}

View File

@ -24,6 +24,11 @@ message Completion {
// Optional
google.rpc.Status status = 2;
// The transaction_id of the transaction that resulted from the command with command_id.
// Only set for successfully executed commands.
// Optional
string transaction_id = 3;
// The trace context submitted with the command.
// This field is a future extension point and is currently not supported.
// Optional

View File

@ -126,7 +126,9 @@ object Server {
.asInstanceOf[DamlOnXCommandCompletionService]
.completionStreamSource(r),
() =>
commandCompletionService.completionEnd(CompletionEndRequest(ledgerId.underlyingString))
commandCompletionService.completionEnd(CompletionEndRequest(ledgerId.underlyingString)),
transactionService.getTransactionById,
transactionService.getFlatTransactionById
)
)

View File

@ -77,12 +77,12 @@ class DamlOnXCommandCompletionService private (indexService: IndexService)(
.fromFuture(compsFuture)
.flatMapConcat(src => {
src.map {
case CompletionEvent.CommandAccepted(offset, commandId) =>
case CompletionEvent.CommandAccepted(offset, commandId, transactionId) =>
logger.debug(s"sending completion accepted $offset: $commandId")
CompletionStreamResponse(
None, // FIXME(JM): is the checkpoint present in each response?
List(Completion(commandId, Some(Status())))
List(Completion(commandId, Some(Status()), transactionId))
)
case CompletionEvent.CommandRejected(offset, commandId, reason) =>
logger.debug(s"sending completion rejected $offset: $commandId: $reason")
@ -124,7 +124,7 @@ class DamlOnXCommandCompletionService private (indexService: IndexService)(
Code.INVALID_ARGUMENT
case RejectionReason.PartyNotKnownOnLedger => Code.INVALID_ARGUMENT
}
Completion(commandId, Some(Status(code.value(), error.description)), None)
Completion(commandId, Some(Status(code.value(), error.description)), traceContext = None)
}
override def close(): Unit = {

View File

@ -235,7 +235,7 @@ private[commands] class CommandTracker[Context]
trackingData.commandId,
Some(
com.google.rpc.status.Status(RpcStatus.ABORTED.getCode.value(), "Timeout")),
trackingData.traceContext)
traceContext = trackingData.traceContext)
))
} else {
Nil
@ -276,7 +276,7 @@ private[commands] class CommandTracker[Context]
Some(
Ctx(
trackingData.context,
Completion(commandId, Some(status), trackingData.traceContext)))
Completion(commandId, Some(status), traceContext = trackingData.traceContext)))
}
}

View File

@ -196,7 +196,7 @@ class CommandTrackerFlowTest
Completion(
commandId,
Some(Status(Code.RESOURCE_EXHAUSTED.value)),
submitRequest.value.traceContext)
traceContext = submitRequest.value.traceContext)
results.expectNext(Ctx(context, failureCompletion))
succeed
@ -214,7 +214,10 @@ class CommandTrackerFlowTest
results.expectNoMessage(3.seconds)
val completion =
Completion(commandId, Some(Status(Code.ABORTED.value)), submitRequest.value.traceContext)
Completion(
commandId,
Some(Status(Code.ABORTED.value)),
traceContext = submitRequest.value.traceContext)
completionStreamMock.send(CompletionStreamElement.CompletionElement(completion))
results.requestNext().value shouldEqual completion
succeed
@ -230,7 +233,10 @@ class CommandTrackerFlowTest
submissions.sendNext(submitRequest)
val completion =
Completion(commandId, Some(Status(Code.ABORTED.value)), submitRequest.value.traceContext)
Completion(
commandId,
Some(Status(Code.ABORTED.value)),
traceContext = submitRequest.value.traceContext)
completionStreamMock.send(CompletionStreamElement.CompletionElement(completion))
results.requestNext().value shouldEqual completion
}
@ -317,7 +323,7 @@ class CommandTrackerFlowTest
Completion(
commandId,
Some(Status(Code.INVALID_ARGUMENT.value)),
submitRequest.value.traceContext)
traceContext = submitRequest.value.traceContext)
completionStreamMock.send(CompletionStreamElement.CompletionElement(failureCompletion))
results.expectNext(Ctx(context, failureCompletion))
@ -347,7 +353,7 @@ class CommandTrackerFlowTest
results.expectNextUnorderedN(commandIds.map { commandId =>
val successCompletion =
Completion(commandId, Some(Status()), submitRequest.value.traceContext)
Completion(commandId, Some(Status()), traceContext = submitRequest.value.traceContext)
Ctx(context, successCompletion)
})
succeed
@ -375,7 +381,12 @@ class CommandTrackerFlowTest
_ <- completionStreamMock.send(successfulCompletion(commandId))
_ = results.request(1)
_ = results.expectNext(
Ctx(context, Completion(commandId, Some(Status()), submitRequest.value.traceContext)))
Ctx(
context,
Completion(
commandId,
Some(Status()),
traceContext = submitRequest.value.traceContext)))
} yield ()
}

View File

@ -4,7 +4,7 @@
package com.digitalasset.platform.server.api.validation
import com.digitalasset.ledger.api.v1.command_service.CommandServiceGrpc.CommandService
import com.digitalasset.ledger.api.v1.command_service.{CommandServiceGrpc, SubmitAndWaitRequest}
import com.digitalasset.ledger.api.v1.command_service._
import com.digitalasset.platform.api.grpc.GrpcApiService
import com.digitalasset.platform.common.util.DirectExecutionContext
import com.digitalasset.platform.server.api.ProxyCloseable
@ -31,6 +31,24 @@ class CommandServiceValidation(
validation.fold(Future.failed, _ => service.submitAndWait(request))
}
override def submitAndWaitForTransactionId(
request: SubmitAndWaitRequest): Future[SubmitAndWaitForTransactionIdResponse] = {
val validation = requirePresence(request.commands, "commands").flatMap(validateCommands)
validation.fold(Future.failed, _ => service.submitAndWaitForTransactionId(request))
}
override def submitAndWaitForTransaction(
request: SubmitAndWaitRequest): Future[SubmitAndWaitForTransactionResponse] = {
val validation = requirePresence(request.commands, "commands").flatMap(validateCommands)
validation.fold(Future.failed, _ => service.submitAndWaitForTransaction(request))
}
override def submitAndWaitForTransactionTree(
request: SubmitAndWaitRequest): Future[SubmitAndWaitForTransactionTreeResponse] = {
val validation = requirePresence(request.commands, "commands").flatMap(validateCommands)
validation.fold(Future.failed, _ => service.submitAndWaitForTransactionTree(request))
}
override def bindService(): ServerServiceDefinition =
CommandServiceGrpc.bindService(this, DirectExecutionContext)

View File

@ -16,7 +16,14 @@ import com.digitalasset.ledger.api.v1.command_completion_service.{
}
import com.digitalasset.ledger.api.v1.command_service._
import com.digitalasset.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.CommandSubmissionService
import com.digitalasset.ledger.api.v1.command_submission_service.{SubmitRequest}
import com.digitalasset.ledger.api.v1.command_submission_service.SubmitRequest
import com.digitalasset.ledger.api.v1.completion.Completion
import com.digitalasset.ledger.api.v1.transaction_service.TransactionServiceGrpc.TransactionService
import com.digitalasset.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
GetTransactionByIdRequest,
GetTransactionResponse
}
import com.digitalasset.ledger.client.configuration.CommandClientConfiguration
import com.digitalasset.ledger.client.services.commands.{
CommandClient,
@ -85,61 +92,113 @@ class ReferenceCommandService private (
)
@SuppressWarnings(Array("org.wartremover.warts.Any"))
override def submitAndWait(request: SubmitAndWaitRequest): Future[Empty] = {
private def submitAndWaitInternal(request: SubmitAndWaitRequest): Future[Completion] = {
val appId = request.getCommands.applicationId
val submitter = TrackerMap.Key(application = appId, party = request.getCommands.party)
if (running) {
submissionTracker.track(submitter, request) {
for {
trackingFlow <- {
lowLevelCommandServiceAccess match {
case LowLevelCommandServiceAccess.RemoteServices(submission, completion) =>
val client = commandClient(appId, submission, completion)
if (configuration.limitMaxCommandsInFlight)
client.trackCommands[Promise[Empty]](List(submitter.party))
else
client.trackCommandsUnbounded[Promise[Empty]](List(submitter.party))
case LowLevelCommandServiceAccess.LocalServices(
submissionFlow,
getCompletionSource,
getCompletionEnd) =>
for {
ledgerEnd <- getCompletionEnd().map(_.getOffset)
} yield {
val tracker =
CommandTrackerFlow[Promise[Empty], NotUsed](
submissionFlow,
offset =>
getCompletionSource(
CompletionStreamRequest(
configuration.ledgerId,
appId,
List(submitter.party),
Some(offset)))
.mapConcat(CommandCompletionSource.toStreamElements),
ledgerEnd
)
submissionTracker
.track(submitter, request) {
for {
trackingFlow <- {
lowLevelCommandServiceAccess match {
case LowLevelCommandServiceAccess.RemoteServices(submission, completion, _) =>
val client = commandClient(appId, submission, completion)
if (configuration.limitMaxCommandsInFlight)
MaxInFlight(configuration.maxCommandsInFlight).joinMat(tracker)(Keep.right)
else tracker
}
client.trackCommands[Promise[Completion]](List(submitter.party))
else
client.trackCommandsUnbounded[Promise[Completion]](List(submitter.party))
case LowLevelCommandServiceAccess.LocalServices(
submissionFlow,
getCompletionSource,
getCompletionEnd,
_,
_) =>
for {
ledgerEnd <- getCompletionEnd().map(_.getOffset)
} yield {
val tracker =
CommandTrackerFlow[Promise[Completion], NotUsed](
submissionFlow,
offset =>
getCompletionSource(
CompletionStreamRequest(
configuration.ledgerId,
appId,
List(submitter.party),
Some(offset)))
.mapConcat(CommandCompletionSource.toStreamElements),
ledgerEnd
)
if (configuration.limitMaxCommandsInFlight)
MaxInFlight(configuration.maxCommandsInFlight).joinMat(tracker)(Keep.right)
else tracker
}
}
}
} yield {
TrackerImpl(trackingFlow, configuration.inputBufferSize, configuration.historySize)
}
} yield {
TrackerImpl(trackingFlow, configuration.inputBufferSize, configuration.historySize)
}
}
} else {
Future.failed(
new ApiException(Status.UNAVAILABLE.withDescription("Service has been shut down.")))
}
}
override def submitAndWait(request: SubmitAndWaitRequest): Future[Empty] = {
submitAndWaitInternal(request).map(_ => Empty.defaultInstance)
}
override def submitAndWaitForTransactionId(
request: SubmitAndWaitRequest): Future[SubmitAndWaitForTransactionIdResponse] = {
submitAndWaitInternal(request).map { compl =>
SubmitAndWaitForTransactionIdResponse(compl.transactionId)
}
}
override def submitAndWaitForTransaction(
request: SubmitAndWaitRequest): Future[SubmitAndWaitForTransactionResponse] = {
submitAndWaitInternal(request).flatMap { resp =>
val txRequest = GetTransactionByIdRequest(
request.getCommands.ledgerId,
resp.transactionId,
List(request.getCommands.party))
flatById(txRequest).map(resp => SubmitAndWaitForTransactionResponse(resp.transaction))
}
}
override def submitAndWaitForTransactionTree(
request: SubmitAndWaitRequest): Future[SubmitAndWaitForTransactionTreeResponse] = {
submitAndWaitInternal(request).flatMap { resp =>
val txRequest = GetTransactionByIdRequest(
request.getCommands.ledgerId,
resp.transactionId,
List(request.getCommands.party))
treeById(txRequest).map(resp => SubmitAndWaitForTransactionTreeResponse(resp.transaction))
}
}
override def toString: String = ReferenceCommandService.getClass.getSimpleName
private val (treeById, flatById) = {
lowLevelCommandServiceAccess match {
case LowLevelCommandServiceAccess.RemoteServices(_, _, transaction) =>
(transaction.getTransactionById _, transaction.getFlatTransactionById _)
case LowLevelCommandServiceAccess.LocalServices(
_,
_,
_,
getTransactionById,
getFlatTransactionById) =>
(getTransactionById, getFlatTransactionById)
}
}
}
object ReferenceCommandService {
@ -170,16 +229,19 @@ object ReferenceCommandService {
final case class RemoteServices(
submissionStub: CommandSubmissionService,
completionStub: CommandCompletionService)
completionStub: CommandCompletionService,
transactionService: TransactionService)
extends LowLevelCommandServiceAccess
final case class LocalServices(
submissionFlow: Flow[
Ctx[(Promise[Empty], String), SubmitRequest],
Ctx[(Promise[Empty], String), Try[Empty]],
Ctx[(Promise[Completion], String), SubmitRequest],
Ctx[(Promise[Completion], String), Try[Empty]],
NotUsed],
getCompletionSource: CompletionStreamRequest => Source[CompletionStreamResponse, NotUsed],
getCompletionEnd: () => Future[CompletionEndResponse])
getCompletionEnd: () => Future[CompletionEndResponse],
getTransactionById: GetTransactionByIdRequest => Future[GetTransactionResponse],
getFlatTransactionById: GetTransactionByIdRequest => Future[GetFlatTransactionResponse])
extends LowLevelCommandServiceAccess
}

View File

@ -4,13 +4,13 @@
package com.digitalasset.platform.server.services.command
import com.digitalasset.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.google.protobuf.empty.Empty
import com.digitalasset.ledger.api.v1.completion.Completion
import scala.concurrent.{ExecutionContext, Future}
trait Tracker extends AutoCloseable {
def track(request: SubmitAndWaitRequest)(implicit ec: ExecutionContext): Future[Empty]
def track(request: SubmitAndWaitRequest)(implicit ec: ExecutionContext): Future[Completion]
}
object Tracker {
@ -24,7 +24,7 @@ object Tracker {
def getLastSubmission: Long = lastSubmission
override def track(request: SubmitAndWaitRequest)(
implicit ec: ExecutionContext): Future[Empty] = {
implicit ec: ExecutionContext): Future[Completion] = {
lastSubmission = System.nanoTime()
delegate.track(request)
}

View File

@ -13,10 +13,9 @@ import com.digitalasset.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.digitalasset.ledger.api.v1.command_submission_service.SubmitRequest
import com.digitalasset.ledger.api.v1.completion.Completion
import com.digitalasset.ledger.client.services.commands.CommandTrackerFlow.Materialized
import com.digitalasset.platform.server.api.ApiException
import com.digitalasset.platform.common.util.DirectExecutionContext
import com.digitalasset.platform.server.api.ApiException
import com.digitalasset.util.Ctx
import com.google.protobuf.empty.Empty
import com.google.rpc.status.Status
import io.grpc.{Status => GrpcStatus}
import org.slf4j.LoggerFactory
@ -37,14 +36,14 @@ class TrackerImpl(queue: SourceQueueWithComplete[TrackerImpl.QueueInput], histor
require(historySize > 0, " History size must be a positive integer.")
private val knownResults: util.Map[String, Future[Empty]] =
private val knownResults: util.Map[String, Future[Completion]] =
Collections.synchronizedMap(new SizeCappedMap(historySize / 2, historySize))
override def track(request: SubmitAndWaitRequest)(
implicit ec: ExecutionContext): Future[Empty] = {
implicit ec: ExecutionContext): Future[Completion] = {
logger.trace(
s"tracking command for party: ${request.getCommands.party}, commandId: ${request.getCommands.commandId}")
val promise = Promise[Empty]
val promise = Promise[Completion]
val storedResult =
knownResults.putIfAbsent(request.getCommands.commandId, promise.future)
@ -55,8 +54,8 @@ class TrackerImpl(queue: SourceQueueWithComplete[TrackerImpl.QueueInput], histor
}
}
private def submitNewRequest(request: SubmitAndWaitRequest, promise: Promise[Empty])(
implicit ec: ExecutionContext): Future[Empty] = {
private def submitNewRequest(request: SubmitAndWaitRequest, promise: Promise[Completion])(
implicit ec: ExecutionContext): Future[Completion] = {
queue
.offer(
@ -86,9 +85,9 @@ object TrackerImpl {
def apply(
tracker: Flow[
Ctx[Promise[Empty], SubmitRequest],
Ctx[Promise[Empty], Completion],
Materialized[NotUsed, Promise[Empty]]],
Ctx[Promise[Completion], SubmitRequest],
Ctx[Promise[Completion], Completion],
Materialized[NotUsed, Promise[Completion]]],
inputBufferSize: Int,
historySize: Int)(implicit materializer: Materializer): TrackerImpl = {
val ((queue, mat), foreachMat) = Source
@ -97,10 +96,10 @@ object TrackerImpl {
.toMat(Sink.foreach {
case Ctx(promise, result) =>
result match {
case Completion(_, Some(Status(0, _, _)), _) =>
case compl @ Completion(_, Some(Status(0, _, _)), _, _) =>
logger.trace("Completing promise with success")
promise.trySuccess(Empty())
case Completion(_, statusO, _) =>
promise.trySuccess(compl)
case Completion(_, statusO, _, _) =>
val status = statusO
.map(
status =>
@ -140,5 +139,5 @@ object TrackerImpl {
new TrackerImpl(queue, historySize)
}
type QueueInput = Ctx[Promise[Empty], SubmitRequest]
type QueueInput = Ctx[Promise[Completion], SubmitRequest]
}

View File

@ -6,16 +6,16 @@ package com.digitalasset.platform.server.services.command
import java.util.concurrent.atomic.AtomicReference
import com.digitalasset.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.digitalasset.ledger.api.v1.completion.Completion
import com.digitalasset.platform.common.util.DirectExecutionContext
import com.digitalasset.platform.server.services.command.TrackerMap.{AsyncResource, Key}
import com.google.protobuf.empty.Empty
import com.github.ghik.silencer.silent
import com.typesafe.scalalogging.LazyLogging
import scala.collection.immutable.HashMap
import scala.concurrent.duration.{FiniteDuration, _}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import com.github.ghik.silencer.silent
/**
* A map for [[Tracker]]s with thread-safe tracking methods and automatic cleanup. A tracker tracker, if you will.
@ -54,7 +54,7 @@ class TrackerMap(retentionPeriod: FiniteDuration) extends AutoCloseable with Laz
}
def track(submitter: Key, request: SubmitAndWaitRequest)(newTracker: => Future[Tracker])(
implicit ec: ExecutionContext): Future[Empty] =
implicit ec: ExecutionContext): Future[Completion] =
// double-checked locking
trackerBySubmitter
.getOrElse(

View File

@ -15,8 +15,9 @@ import com.digitalasset.ledger.api.testing.utils.{
}
import com.digitalasset.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.digitalasset.ledger.api.v1.commands.Commands
import com.digitalasset.ledger.api.v1.completion.Completion
import com.digitalasset.platform.common.util.DirectExecutionContext
import com.google.protobuf.empty.Empty
import com.google.rpc.status.{Status => RpcStatus}
import io.grpc.Status
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterEach, Matchers, Succeeded, WordSpec}
@ -40,7 +41,7 @@ class TrackerImplTest
val (q, sink) = Source
.queue[TrackerImpl.QueueInput](1, OverflowStrategy.dropNew)
.map { in =>
in.context.success(Empty())
in.context.success(Completion(in.value.getCommands.commandId, Some(RpcStatus())))
NotUsed
}
.toMat(TestSink.probe[NotUsed])(Keep.both)

View File

@ -15,7 +15,6 @@ import com.digitalasset.ledger.api.v1.command_completion_service.{
Checkpoint,
CompletionStreamRequest
}
import com.digitalasset.ledger.api.v1.completion.Completion
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset.LedgerBoundary.LEDGER_BEGIN
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset.Value.Boundary
@ -28,7 +27,6 @@ import com.digitalasset.platform.apitesting.LedgerContextExtensions._
import com.digitalasset.platform.apitesting.MultiLedgerFixture
import com.digitalasset.platform.services.time.TimeProviderType.WallClock
import com.digitalasset.util.Ctx
import com.google.rpc.status.Status
import org.scalatest.{AsyncWordSpec, Matchers}
import scala.concurrent.duration._
@ -62,8 +60,8 @@ class CommandCompletionServiceIT
commands = configuredParties.map(p => Ctx(p, ctx.command(p, Nil)))
result <- Source(commands).via(tracker).runWith(Sink.seq)
} yield {
val expected = configuredParties.map(p => Ctx(p, Completion(p, Some(Status(0)))))
result should contain theSameElementsAs expected
val expected = configuredParties.map(p => (p, 0))
result.map(ctx => (ctx.value.commandId, ctx.value.getStatus.code)) should contain theSameElementsAs expected
}
}
@ -88,8 +86,7 @@ class CommandCompletionServiceIT
.take(2)
.runWith(Sink.seq)
.map { noOfHeartBeats =>
noOfHeartBeats.foreach(_ should be >= 2) // should be fine for 1Hz
succeed
all(noOfHeartBeats) should be >= 2 // should be fine for 1Hz
}
}
}

View File

@ -16,6 +16,7 @@ import com.google.protobuf.empty.Empty
import io.grpc.Status
import org.scalatest.{AsyncWordSpec, Matchers}
@SuppressWarnings(Array("org.wartremover.warts.Any"))
class CommandServiceIT
extends AsyncWordSpec
with AkkaBeforeAndAfterAll
@ -39,6 +40,24 @@ class CommandServiceIT
}
}
"return the transaction id if successful" in allFixtures { ctx =>
ctx.commandService.submitAndWaitForTransactionId(request(ctx)) map {
_.transactionId should not be empty
}
}
"return the flat transaction if successful" in allFixtures { ctx =>
ctx.commandService.submitAndWaitForTransaction(request(ctx)) map {
_.transaction should not be empty
}
}
"return the transaction tree if successful" in allFixtures { ctx =>
ctx.commandService.submitAndWaitForTransactionTree(request(ctx)) map {
_.transaction should not be empty
}
}
"complete with an empty response if resending a successful command" in allFixtures { ctx =>
val commandId = UUID.randomUUID().toString
ctx.commandService.submitAndWait(request(ctx, id = commandId)) map {
@ -49,6 +68,37 @@ class CommandServiceIT
}
}
"return the transaction id if resending a successful command" in allFixtures { ctx =>
val commandId = UUID.randomUUID().toString
ctx.commandService.submitAndWaitForTransactionId(request(ctx, id = commandId)) map {
_.transactionId should not be empty
}
ctx.commandService.submitAndWaitForTransactionId(request(ctx, id = commandId)) map {
_.transactionId should not be empty
}
}
"return the flat transaction if resending a successful command" in allFixtures { ctx =>
val commandId = UUID.randomUUID().toString
ctx.commandService.submitAndWaitForTransaction(request(ctx, id = commandId)) map {
_.transaction should not be empty
}
ctx.commandService.submitAndWaitForTransaction(request(ctx, id = commandId)) map {
_.transaction should not be empty
}
}
"return the transaction tree if resending a successful command" in allFixtures { ctx =>
val commandId = UUID.randomUUID().toString
val req = request(ctx, id = commandId)
ctx.commandService.submitAndWaitForTransactionTree(req) map {
_.transaction should not be empty
}
ctx.commandService.submitAndWaitForTransactionTree(request(ctx, id = commandId)) map {
_.transaction should not be empty
}
}
"fail with not found if ledger id is invalid" in allFixtures { ctx =>
ctx.commandService
.submitAndWait(request(ctx, ledgerId = UUID.randomUUID().toString))
@ -57,6 +107,33 @@ class CommandServiceIT
}
}
"fail SubmitAndWaitForTransactionId with not found if ledger id is invalid" in allFixtures {
ctx =>
ctx.commandService
.submitAndWaitForTransactionId(request(ctx, ledgerId = UUID.randomUUID().toString))
.failed map {
IsStatusException(Status.NOT_FOUND)(_)
}
}
"fail SubmitAndWaitForTransaction with not found if ledger id is invalid" in allFixtures {
ctx =>
ctx.commandService
.submitAndWaitForTransaction(request(ctx, ledgerId = UUID.randomUUID().toString))
.failed map {
IsStatusException(Status.NOT_FOUND)(_)
}
}
"fail SubmitAndWaitForTransactionTree with not found if ledger id is invalid" in allFixtures {
ctx =>
ctx.commandService
.submitAndWaitForTransactionTree(request(ctx, ledgerId = UUID.randomUUID().toString))
.failed map {
IsStatusException(Status.NOT_FOUND)(_)
}
}
}
}

View File

@ -3,22 +3,29 @@
package com.digitalasset.platform.tests.integration.ledger.api.commands
import com.digitalasset.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.digitalasset.ledger.api.v1.command_service.{
SubmitAndWaitForTransactionIdResponse,
SubmitAndWaitRequest
}
import com.digitalasset.ledger.api.v1.command_submission_service.SubmitRequest
import com.digitalasset.ledger.api.v1.completion.Completion
import com.digitalasset.platform.apitesting.{CommandTransactionChecks, LedgerContext}
import com.google.protobuf.empty.Empty
import com.google.rpc.status.Status
import io.grpc.StatusRuntimeException
import scala.concurrent.Future
class CommandTransactionChecksHighLevelIT extends CommandTransactionChecks {
private[this] def emptyToCompletion(
private[this] def responseToCompletion(
commandId: String,
emptyF: Future[Empty]): Future[Completion] =
emptyF
.map(_ => Completion(commandId, Some(Status(io.grpc.Status.OK.getCode.value(), ""))))
txF: Future[SubmitAndWaitForTransactionIdResponse]): Future[Completion] =
txF
.map(
tx =>
Completion(
commandId,
Some(Status(io.grpc.Status.OK.getCode.value(), "")),
tx.transactionId))
.recover {
case sre: StatusRuntimeException =>
Completion(
@ -29,9 +36,10 @@ class CommandTransactionChecksHighLevelIT extends CommandTransactionChecks {
override protected def submitCommand(
ctx: LedgerContext,
submitRequest: SubmitRequest): Future[Completion] = {
emptyToCompletion(
responseToCompletion(
submitRequest.commands.value.commandId,
ctx.commandService.submitAndWait(
SubmitAndWaitRequest(submitRequest.commands, submitRequest.traceContext)))
ctx.commandService.submitAndWaitForTransactionId(
SubmitAndWaitRequest(submitRequest.commands, submitRequest.traceContext))
)
}
}

View File

@ -75,7 +75,7 @@ class FailingCommandsIT
.runWith(Sink.head)
} yield {
result.value should matchPattern {
case Completion(`failingCommandId`, Some(status), _) if status.code == 3 =>
case Completion(`failingCommandId`, Some(status), _, _) if status.code == 3 =>
}
}
}

View File

@ -61,7 +61,7 @@ class SemanticTestAdapter(
: Future[Event.Events[String, Value.AbsoluteContractId, TxValue[Value.AbsoluteContractId]]] = {
for {
tx <- LedgerTestingHelpers
.sync(lc.commandService.submitAndWait, lc)
.sync(lc.commandService.submitAndWaitForTransactionId, lc)
.submitAndListenForSingleTreeResultOfCommand(
SubmitRequest(Some(apiCommand(submitterName.underlyingString, cmds))),
TransactionFilter(parties.map(_ -> Filters.defaultInstance)(breakOut)),

View File

@ -7,8 +7,13 @@ import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.digitalasset.ledger.api.testing.utils.{MockMessages => M}
import com.digitalasset.ledger.api.v1.command_service.{
SubmitAndWaitForTransactionIdResponse,
SubmitAndWaitRequest
}
import com.digitalasset.ledger.api.v1.command_submission_service.SubmitRequest
import com.digitalasset.ledger.api.v1.commands.{CreateCommand, ExerciseCommand}
import com.digitalasset.ledger.api.v1.completion.Completion
import com.digitalasset.ledger.api.v1.event.Event.Event.{Archived, Created}
import com.digitalasset.ledger.api.v1.event.{ArchivedEvent, CreatedEvent, Event, ExercisedEvent}
@ -17,17 +22,14 @@ import com.digitalasset.ledger.api.v1.testing.time_service.TimeServiceGrpc.TimeS
import com.digitalasset.ledger.api.v1.transaction.{Transaction, TransactionTree, TreeEvent}
import com.digitalasset.ledger.api.v1.transaction_filter.{Filters, TransactionFilter}
import com.digitalasset.ledger.api.v1.value.{Identifier, Record, RecordField, Value}
import com.digitalasset.ledger.client.services.commands.CompletionStreamElement
import com.digitalasset.ledger.client.services.testing.time.StaticTime
import com.google.protobuf.empty.Empty
import com.digitalasset.platform.apitesting.LedgerContext
import com.digitalasset.platform.participant.util.ValueConversions._
import com.google.rpc.code.Code
import com.google.rpc.status.Status
import io.grpc.StatusRuntimeException
import org.scalatest.{Assertion, Inside, Matchers, OptionValues}
import com.digitalasset.ledger.api.testing.utils.{MockMessages => M}
import com.digitalasset.ledger.api.v1.commands.{CreateCommand, ExerciseCommand}
import com.digitalasset.platform.participant.util.ValueConversions._
import com.digitalasset.platform.apitesting.LedgerContext
import com.digitalasset.ledger.client.services.commands.CompletionStreamElement
import scala.collection.{breakOut, immutable}
import scala.concurrent.duration._
@ -590,7 +592,9 @@ class LedgerTestingHelpers(
}
object LedgerTestingHelpers extends OptionValues {
def sync(submitCommand: SubmitAndWaitRequest => Future[Empty], context: LedgerContext)(
def sync(
submitCommand: SubmitAndWaitRequest => Future[SubmitAndWaitForTransactionIdResponse],
context: LedgerContext)(
implicit ec: ExecutionContext,
mat: ActorMaterializer): LedgerTestingHelpers =
async(helper(submitCommand), context)
@ -616,10 +620,15 @@ object LedgerTestingHelpers extends OptionValues {
mat: ActorMaterializer): LedgerTestingHelpers =
new LedgerTestingHelpers(submitCommand, context)
def emptyToCompletion(commandId: String, emptyF: Future[Empty])(
def responseToCompletion(commandId: String, respF: Future[SubmitAndWaitForTransactionIdResponse])(
implicit ec: ExecutionContext): Future[Completion] =
emptyF
.map(_ => Completion(commandId, Some(Status(io.grpc.Status.OK.getCode.value(), ""))))
respF
.map(
tx =>
Completion(
commandId,
Some(Status(io.grpc.Status.OK.getCode.value(), "")),
tx.transactionId))
.recover {
case sre: StatusRuntimeException =>
Completion(
@ -627,9 +636,10 @@ object LedgerTestingHelpers extends OptionValues {
Some(Status(sre.getStatus.getCode.value(), sre.getStatus.getDescription)))
}
private def helper(submitCommand: SubmitAndWaitRequest => Future[Empty])(
private def helper(
submitCommand: SubmitAndWaitRequest => Future[SubmitAndWaitForTransactionIdResponse])(
implicit ec: ExecutionContext) = { req: SubmitRequest =>
emptyToCompletion(
responseToCompletion(
req.commands.value.commandId,
submitCommand(SubmitAndWaitRequest(req.commands, req.traceContext)))
}

View File

@ -69,8 +69,9 @@ trait TransactionServiceHelpers extends Matchers {
} yield c)
.via(trackingFlow)
.runWith(Sink.foreach {
case Ctx(i, Completion(_, Some(status), _)) =>
case Ctx(i, Completion(_, Some(status), transactionId, _)) =>
status should have('code (0))
transactionId should not be empty
()
})
}

View File

@ -289,7 +289,9 @@ final case class ReferenceIndexService(
case (offset, (acceptedTx, _blindingInfo)) =>
acceptedTx.optSubmitterInfo.flatMap { sinfo =>
if (sinfo.applicationId == applicationId) {
Some(CompletionEvent.CommandAccepted(offset, sinfo.commandId))
Some(
CompletionEvent
.CommandAccepted(offset, sinfo.commandId, acceptedTx.transactionId))
} else {
None
}

View File

@ -55,7 +55,11 @@ package object v1 {
}
object CompletionEvent {
final case class Checkpoint(offset: Offset, recordTime: Timestamp) extends CompletionEvent
final case class CommandAccepted(offset: Offset, commandId: CommandId) extends CompletionEvent
final case class CommandAccepted(
offset: Offset,
commandId: CommandId,
transactionId: TransactionId)
extends CompletionEvent
final case class CommandRejected(offset: Offset, commandId: CommandId, reason: RejectionReason)
extends CompletionEvent
}

View File

@ -129,7 +129,9 @@ object LedgerApiServer {
completionService.service
.asInstanceOf[SandboxCommandCompletionService]
.completionStreamSource(r),
() => completionService.completionEnd(CompletionEndRequest(ledgerBackend.ledgerId))
() => completionService.completionEnd(CompletionEndRequest(ledgerBackend.ledgerId)),
transactionService.getTransactionById,
transactionService.getFlatTransactionById
)
)

View File

@ -88,7 +88,8 @@ class SandboxCommandCompletionService private (
tx.submitter) =>
CompletionStreamResponse(
checkpoint,
tx.commandId.fold(List.empty[Completion])(c => List(Completion(c, Some(Status())))))
tx.commandId.fold(List.empty[Completion])(c =>
List(Completion(c, Some(Status()), tx.transactionId))))
case err: LedgerSyncEvent.RejectedCommand
if isRequested(
requestedApplicationId,
@ -127,7 +128,7 @@ class SandboxCommandCompletionService private (
case RejectionReason.DuplicateCommandId(description) => Code.INVALID_ARGUMENT
}
Completion(commandId, Some(Status(code.value(), error.description)), None)
Completion(commandId, Some(Status(code.value(), error.description)), traceContext = None)
}
override def close(): Unit = {