The completion stream RPC defaults to the ledger end as offset (#1961)

* The completion stream RPC defaults to the ledger end as offset

Fixes #1913

Relevant changes are propagated to the Java bindings (including
deprecating a method that would now return a nullable ledger end).

* Refactor completionStream method

* Address review comments

- ignore command creation results
(https://github.com/digital-asset/daml/pull/1961#discussion_r299089539)
- avoid re-connecting to the client for every command
(https://github.com/digital-asset/daml/pull/1961#discussion_r299092328)
- move offset field optionality to domain object
(https://github.com/digital-asset/daml/pull/1961#discussion_r299090451)

* Improve tests
This commit is contained in:
Stefano Baghino 2019-07-02 12:02:43 +02:00 committed by mergify[bot]
parent 6bef12f98b
commit 4774e75eb8
14 changed files with 154 additions and 32 deletions

View File

@ -17,6 +17,7 @@ import java.util.Set;
public interface CommandCompletionClient {
Flowable<CompletionStreamResponse> completionStream(String applicationId, LedgerOffset offset, Set<String> parties);
Flowable<CompletionStreamResponse> completionStream(String applicationId, Set<String> parties);
Single<CompletionEndResponse> completionEnd();
}

View File

@ -177,7 +177,7 @@ public final class DamlLedgerClient implements LedgerClient {
pool.close();
}
public static void main(String[] args) throws SSLException {
public static void main(String[] args) {
DamlLedgerClient ledgerClient = DamlLedgerClient.forHostWithLedgerIdDiscovery("localhost", 6865, Optional.empty());
ledgerClient.connect();
String ledgerId = ledgerClient.ledgerIdentityClient.getLedgerIdentity().blockingGet();

View File

@ -32,12 +32,20 @@ public class CommandCompletionClientImpl implements CommandCompletionClient {
serviceFutureStub = CommandCompletionServiceGrpc.newFutureStub(channel);
}
private Flowable<CompletionStreamResponse> completionStream(CompletionStreamRequest request) {
return ClientPublisherFlowable
.create(request.toProto(), serviceStub::completionStream, sequencerFactory)
.map(CompletionStreamResponse::fromProto);
}
@Override
public Flowable<CompletionStreamResponse> completionStream(String applicationId, LedgerOffset offset, Set<String> parties) {
CommandCompletionServiceOuterClass.CompletionStreamRequest request = new CompletionStreamRequest(ledgerId, applicationId, parties, offset).toProto();
return ClientPublisherFlowable
.create(request, serviceStub::completionStream, sequencerFactory)
.map(CompletionStreamResponse::fromProto);
return completionStream(new CompletionStreamRequest(ledgerId, applicationId, parties, offset));
}
@Override
public Flowable<CompletionStreamResponse> completionStream(String applicationId, Set<String> parties) {
return completionStream(new CompletionStreamRequest(ledgerId, applicationId, parties));
}
@Override

View File

@ -96,6 +96,10 @@ class DummyLedgerClient(
offset: LedgerOffset,
parties: util.Set[String]): Flowable[CompletionStreamResponse] =
commandCompletions
override def completionStream(
applicationId: String,
parties: util.Set[String]): Flowable[CompletionStreamResponse] =
commandCompletions
override def completionEnd(): Single[CompletionEndResponse] = ???
}

View File

@ -7,6 +7,7 @@ import com.digitalasset.ledger.api.v1.CommandCompletionServiceOuterClass;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
public class CompletionStreamRequest {
@ -17,7 +18,7 @@ public class CompletionStreamRequest {
private final Set<String> parties;
private final LedgerOffset offset;
private final Optional<LedgerOffset> offset;
public static CompletionStreamRequest fromProto(CommandCompletionServiceOuterClass.CompletionStreamRequest request) {
String ledgerId = request.getLedgerId();
@ -28,12 +29,13 @@ public class CompletionStreamRequest {
}
public CommandCompletionServiceOuterClass.CompletionStreamRequest toProto() {
return CommandCompletionServiceOuterClass.CompletionStreamRequest.newBuilder()
.setLedgerId(this.ledgerId)
.setApplicationId(this.applicationId)
.addAllParties(this.parties)
.setOffset(this.offset.toProto())
.build();
CommandCompletionServiceOuterClass.CompletionStreamRequest.Builder protoBuilder =
CommandCompletionServiceOuterClass.CompletionStreamRequest.newBuilder()
.setLedgerId(this.ledgerId)
.setApplicationId(this.applicationId)
.addAllParties(this.parties);
this.offset.ifPresent(offset -> protoBuilder.setOffset(offset.toProto()));
return protoBuilder.build();
}
@Override
@ -76,15 +78,27 @@ public class CompletionStreamRequest {
return parties;
}
/**
* @deprecated Legacy, nullable version of {@link #getLedgerOffset()}, which should be used instead.
*/
@Deprecated
public LedgerOffset getOffset() {
return offset;
return offset.orElse(null);
}
public CompletionStreamRequest(String ledgerId, String applicationId, Set<String> parties, LedgerOffset offset) {
public Optional<LedgerOffset> getLedgerOffset() { return offset; }
public CompletionStreamRequest(String ledgerId, String applicationId, Set<String> parties) {
this.ledgerId = ledgerId;
this.applicationId = applicationId;
this.parties = parties;
this.offset = offset;
this.offset = Optional.empty();
}
public CompletionStreamRequest(String ledgerId, String applicationId, Set<String> parties, LedgerOffset offset) {
this.ledgerId = ledgerId;
this.applicationId = applicationId;
this.parties = parties;
this.offset = Optional.of(offset);
}
}

View File

@ -16,8 +16,8 @@ import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset.Value.Boundary
import com.digitalasset.ledger.api.v1.trace_context.TraceContext
import com.digitalasset.ledger.api.v1.transaction.{Transaction, TransactionTree, TreeEvent}
import com.digitalasset.ledger.api.v1.transaction_filter.{Filters, TransactionFilter}
import com.digitalasset.ledger.api.v1.value.{Identifier, Value}
import com.digitalasset.ledger.api.v1.value.Value.Sum.Text
import com.digitalasset.ledger.api.v1.value.{Identifier, Value}
import com.google.protobuf.timestamp.Timestamp
import scala.util.Random

View File

@ -32,8 +32,7 @@ class CompletionServiceRequestValidator(ledgerId: LedgerId, partyNameChecker: Pa
.map(invalidField("application_id", _))
nonEmptyParties <- requireNonEmpty(request.parties, "parties")
knownParties <- partyValidator.requireKnownParties(nonEmptyParties)
offset <- FieldValidations.requirePresence(request.offset, "offset")
convertedOffset <- LedgerOffsetValidator.validate(offset, "offset")
convertedOffset <- LedgerOffsetValidator.validateOptional(request.offset, "offset")
} yield
CompletionStreamRequest(
ledgerId,

View File

@ -17,6 +17,14 @@ object LedgerOffsetValidator {
private val boundary = "boundary"
def validateOptional(
ledgerOffset: Option[LedgerOffset],
fieldName: String): Either[StatusRuntimeException, Option[domain.LedgerOffset]] =
ledgerOffset
.map(validate(_, fieldName))
.fold[Either[StatusRuntimeException, Option[domain.LedgerOffset]]](Right(None))(
_.map(Some(_)))
def validate(
ledgerOffset: LedgerOffset,
fieldName: String): Either[StatusRuntimeException, domain.LedgerOffset] = {

View File

@ -55,9 +55,7 @@ class TransactionServiceRequestValidator(
filter <- requirePresence(req.filter, "filter")
requiredBegin <- requirePresence(req.begin, "begin")
convertedBegin <- LedgerOffsetValidator.validate(requiredBegin, "begin")
convertedEnd <- req.end
.fold[Result[Option[domain.LedgerOffset]]](rightNone)(end =>
LedgerOffsetValidator.validate(end, "end").map(Some(_)))
convertedEnd <- LedgerOffsetValidator.validateOptional(req.end, "end")
knownParties <- partyValidator.requireKnownParties(req.getFilter.filtersByParty.keySet)
} yield
PartialValidation(

View File

@ -19,8 +19,26 @@ import com.google.rpc.status.Status
import io.grpc.Status.Code
import scalaz.syntax.tag._
import com.digitalasset.ledger.api.messages.command.completion.{
CompletionStreamRequest => ValidatedCompletionStreamRequest
}
import scala.concurrent.Future
object GrpcCommandCompletionService {
private[this] val completionStreamDefaultOffset = Some(domain.LedgerOffset.LedgerEnd)
private def fillInWithDefaults(
request: ValidatedCompletionStreamRequest): ValidatedCompletionStreamRequest =
if (request.offset.isDefined) {
request
} else {
request.copy(offset = completionStreamDefaultOffset)
}
}
class GrpcCommandCompletionService(
ledgerId: LedgerId,
service: CommandCompletionService,
@ -28,19 +46,23 @@ class GrpcCommandCompletionService(
)(implicit protected val esf: ExecutionSequencerFactory, protected val mat: Materializer)
extends CommandCompletionServiceAkkaGrpc {
import GrpcCommandCompletionService.fillInWithDefaults
private val validator = new CompletionServiceRequestValidator(ledgerId, partyNameChecker)
override def completionStreamSource(
request: CompletionStreamRequest): Source[CompletionStreamResponse, akka.NotUsed] =
request: CompletionStreamRequest): Source[CompletionStreamResponse, akka.NotUsed] = {
validator
.validateCompletionStreamRequest(request)
.fold(
Source.failed[CompletionStreamResponse],
validatedRequest =>
validatedRequest => {
service
.completionStreamSource(validatedRequest)
.completionStreamSource(fillInWithDefaults(validatedRequest))
.map(toApiCompletion)
}
)
}
override def completionEnd(request: CompletionEndRequest): Future[CompletionEndResponse] =
validator

View File

@ -10,5 +10,5 @@ case class CompletionStreamRequest(
ledgerId: LedgerId,
applicationId: ApplicationId,
parties: Set[Ref.Party],
offset: LedgerOffset
offset: Option[LedgerOffset]
)

View File

@ -14,22 +14,31 @@ import com.digitalasset.ledger.api.testing.utils.{
import com.digitalasset.ledger.api.v1.command_completion_service.CommandCompletionServiceGrpc.CommandCompletionService
import com.digitalasset.ledger.api.v1.command_completion_service.{
Checkpoint,
CompletionStreamRequest
CompletionStreamRequest,
CompletionStreamResponse
}
import com.digitalasset.ledger.api.v1.commands.{
Command => ProtoCommand,
CreateCommand => ProtoCreateCommand
}
import com.digitalasset.ledger.api.v1.completion.Completion
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset.LedgerBoundary.LEDGER_BEGIN
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset.Value.Boundary
import com.digitalasset.ledger.api.v1.transaction_service.GetLedgerEndRequest
import com.digitalasset.ledger.api.v1.value.{Record, RecordField, Value}
import com.digitalasset.ledger.client.services.commands.CompletionStreamElement.{
CheckpointElement,
CompletionElement
}
import com.digitalasset.ledger.client.services.commands.{
CommandClient,
CommandCompletionSource,
CompletionStreamElement
}
import com.digitalasset.platform.apitesting.LedgerContextExtensions._
import com.digitalasset.platform.apitesting.MultiLedgerFixture
import com.digitalasset.platform.apitesting.{LedgerContext, MultiLedgerFixture, TestTemplateIds}
import com.digitalasset.platform.sandbox.utils.FirstElementObserver
import com.digitalasset.platform.services.time.TimeProviderType.WallClock
import com.digitalasset.util.Ctx
import org.scalatest.{AsyncWordSpec, Matchers}
@ -46,14 +55,16 @@ class CommandCompletionServiceIT
with MultiLedgerFixture
with SuiteResourceManagementAroundAll {
private[this] val templateIds = new TestTemplateIds(config).templateIds
private def completionSource(
completionService: CommandCompletionService,
ledgerId: domain.LedgerId,
applicationId: String,
parties: Seq[String],
offset: LedgerOffset): Source[CompletionStreamElement, NotUsed] =
offset: Option[LedgerOffset]): Source[CompletionStreamElement, NotUsed] =
CommandCompletionSource(
CompletionStreamRequest(ledgerId.unwrap, applicationId, parties, Some(offset)),
CompletionStreamRequest(ledgerId.unwrap, applicationId, parties, offset),
completionService.completionStream)
"Commands Completion Service" when {
@ -79,7 +90,7 @@ class CommandCompletionServiceIT
ctx.ledgerId,
applicationId,
Seq(party),
LedgerOffset(Boundary(LEDGER_BEGIN)))
Some(LedgerOffset(Boundary(LEDGER_BEGIN))))
val recordTimes = completionService.collect {
case CheckpointElement(Checkpoint(Some(recordTime), _)) => recordTime
@ -107,7 +118,7 @@ class CommandCompletionServiceIT
ctx.ledgerId,
applicationId,
configuredParties,
offset)
Some(offset))
.sliding(2, 1)
.map(_.toList)
.collect {
@ -154,6 +165,58 @@ class CommandCompletionServiceIT
commandIds3 should not contain (commandIds1(1))
}
}
"implicitly tail the stream if no offset is passed" in allFixtures { ctx =>
// A command to create a Test.Dummy contract (see //ledger/sandbox/src/main/resources/damls/Test.daml)
val createDummyCommand = ProtoCommand(
ProtoCommand.Command.Create(
ProtoCreateCommand(
templateId = Some(templateIds.dummy),
createArguments = Some(
Record(
fields = Seq(
RecordField("operator", Some(Value(Value.Sum.Party(party))))
)
))
)))
def trackDummyCreation(client: CommandClient, context: LedgerContext, id: String) =
client.trackSingleCommand(context.command(id, Seq(createDummyCommand)))
def trackDummyCreations(client: CommandClient, context: LedgerContext, ids: List[String]) =
Future.sequence(ids.map(trackDummyCreation(client, context, _)))
// Don't use the `completionSource` to ensure that the RPC lands on the server before anything else happens
def tailCompletions(
context: LedgerContext
): Future[Completion] = {
val (streamObserver, future) = FirstElementObserver[CompletionStreamResponse]
context.commandCompletionService.completionStream(
CompletionStreamRequest(
context.ledgerId.unwrap,
applicationId,
Seq(party),
offset = None),
streamObserver)
future.map(_.get).collect {
case CompletionStreamResponse(_, completion +: _) => completion
}
}
val arbitraryCommandIds = List.tabulate(10)(_.toString)
val expectedCommandId = "the-one"
for {
client <- ctx.commandClient(ctx.ledgerId)
_ <- trackDummyCreations(client, ctx, arbitraryCommandIds)
futureCompletion = tailCompletions(ctx) // this action and the following have to run concurrently
_ = trackDummyCreation(client, ctx, expectedCommandId) // concurrent to previous action
completion <- futureCompletion
} yield {
completion.commandId shouldBe expectedCommandId
}
}
}
}

View File

@ -45,8 +45,10 @@ class ApiCommandCompletionService private (
subscriptionId: Any,
request)
val offset = request.offset.getOrElse(LedgerOffset.LedgerEnd)
completionsService
.getCompletions(request.offset, request.applicationId, request.parties)
.getCompletions(offset, request.applicationId, request.parties)
.via(Slf4JLog(logger, s"Serving response for completion subscription $subscriptionId"))
}

View File

@ -9,6 +9,9 @@ This page contains release notes for the SDK.
HEAD — ongoing
--------------
- [Sandbox] The completion stream method of the command completion service uses the ledger end as a default value for the offset. See `#1913 <https://github.com/digital-asset/daml/issues/1913>`__.
- [Java bindings] Added overloads to the Java bindings ``CompletionStreamRequest`` constructor and the ``CommandCompletionClient`` to accept a request without an explicit ledger offset. See `#1913 <https://github.com/digital-asset/daml/issues/1913>`__.
- [Java bindings] **DEPRECATION**: the ``CompletionStreamRequest#getOffset`` method is deprecated in favor of the non-nullable ``CompletionStreamRequest#getLedgerOffset``. See `#1913 <https://github.com/digital-asset/daml/issues/1913>`__.
- [Scala bindings] Contract keys are exposed on CreatedEvent. See `#1681 <https://github.com/digital-asset/daml/issues/1681>`__.
- [Navigator] Contract keys are show in the contract details page. See `#1681 <https://github.com/digital-asset/daml/issues/1681>`__.
- [DAML Standard Library] **BREAKING CHANGE**: Remove the deprecated modules ``DA.Map``, ``DA.Set``, ``DA.Experimental.Map`` and ``DA.Experimental.Set``. Please use ``DA.Next.Map`` and ``DA.Next.Set`` instead.