ActiveContractsService stream ow always returns at least 1 element (#2799)

This removes the need for clients to handle the special case where the
stream might be empty.
Now the clients can always assume that they receive at least one
response element in the stream.
This commit is contained in:
Gerolf Seitz 2019-09-18 13:16:56 +02:00 committed by mergify[bot]
parent 96642d9b19
commit b70e289303
6 changed files with 17 additions and 10 deletions

View File

@ -17,7 +17,9 @@ option csharp_namespace = "Com.DigitalAsset.Ledger.Api.V1";
// Allows clients to initialize themselves according to a fairly recent state of the ledger without reading through all transactions that were committed since the ledger's creation.
service ActiveContractsService {
// Returns a stream of the latest snapshot of active contracts. Getting an empty stream means that the active contracts set is empty and the client should listen to transactions using ``LEDGER_BEGIN``.
// Returns a stream of the latest snapshot of active contracts.
// If there are no active contracts, the stream returns a single GetActiveContractsResponse message with the offset at which the snapshot has been taken.
// Clients SHOULD use the offset in the last GetActiveContractsResponse message to continue streaming transactions with the transaction service.
// Clients SHOULD NOT assume that the set of active contracts they receive reflects the state at the ledger end.
rpc GetActiveContracts (GetActiveContractsRequest) returns (stream GetActiveContractsResponse);

View File

@ -3,7 +3,8 @@
package com.digitalasset.http
import akka.stream.scaladsl.{Keep, Source}
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.http.util.FutureUtil.toFuture
import com.digitalasset.jwt.domain.Jwt
@ -15,7 +16,6 @@ import com.digitalasset.ledger.api.v1.command_service.{
import com.digitalasset.ledger.api.v1.transaction_filter.TransactionFilter
import com.digitalasset.ledger.client.LedgerClient
import com.digitalasset.ledger.client.configuration.LedgerClientConfiguration
import com.digitalasset.util.akkastreams.ExtractMaterializedValue
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
import io.grpc.stub.MetadataUtils
import io.grpc.{Channel, ClientInterceptors, Metadata}
@ -29,7 +29,7 @@ object LedgerClientJwt {
(Jwt, SubmitAndWaitRequest) => Future[SubmitAndWaitForTransactionResponse]
type GetActiveContracts =
(Jwt, TransactionFilter, Boolean) => Source[GetActiveContractsResponse, Future[String]]
(Jwt, TransactionFilter, Boolean) => Source[GetActiveContractsResponse, NotUsed]
def singleHostChannel(
hostIp: String,
@ -84,9 +84,6 @@ object LedgerClientJwt {
(jwt, req) =>
forChannel(jwt, config, channel)
.flatMap(_.commandServiceClient.submitAndWaitForTransaction(req))
private val exMat = new ExtractMaterializedValue((_: LedgerClient) => None: Option[String])
@SuppressWarnings(Array("org.wartremover.warts.Any"))
def getActiveContracts(config: LedgerClientConfiguration, channel: io.grpc.Channel)(
implicit ec: ExecutionContext,
@ -94,6 +91,5 @@ object LedgerClientJwt {
(jwt, filter, flag) =>
Source
.fromFuture(forChannel(jwt, config, channel))
.viaMat(exMat)(Keep.right)
.flatMapConcat(client => client.activeContractSetClient.getActiveContracts(filter, flag))
}

View File

@ -19,6 +19,13 @@ import scalaz.syntax.tag._
class ActiveContractSetClient(ledgerId: LedgerId, activeContractsService: ActiveContractsService)(
implicit esf: ExecutionSequencerFactory) {
/*
Returns a stream of GetActiveContractsResponse messages. The materialized value will
be resolved to the offset that can be used as a starting offset for streaming transactions
via the transaction service.
If the stream completes before the offset can be set, the materialized future will
be failed with an exception.
*/
def getActiveContracts(
filter: TransactionFilter,
verbose: Boolean = false): Source[GetActiveContractsResponse, Future[String]] = {

View File

@ -25,6 +25,7 @@ object ActiveContractSetSource {
ClientAdapter
.serverStreaming(request, stub)
.viaMat(new ExtractMaterializedValue(r => Some(r.offset)))(Keep.right)
.viaMat(new ExtractMaterializedValue(r => if (r.offset.nonEmpty) Some(r.offset) else None))(
Keep.right)
}
}

View File

@ -38,7 +38,7 @@ class ExtractSingleMaterializedValueTest
}
"there are multiple valid values" should {
"extract the first" in {
"extract the first matching element" in {
val elemToExtract = -1
val otherCandidateShuffledIn = -2

View File

@ -14,3 +14,4 @@ HEAD — ongoing
+ [Sandbox] Dramatically increased performance of the ActiveContractService by only loading the contracts that the parties in the transaction filter are allowed to see.
+ [DAML-LF] change signature of MUL_NUMERIC and DIV_NUMERIC
+ [DAML Integration Kit] fix contract key uniqueness check in kvutils.
+ [Ledger API] ActiveContractsService now specifies to always return at least one message with the offset. This removes a special case where clients would need to check if the stream was empty or not.