Add TransactionService methods for looking up flat transactions (#830)

This change is needed in preparation of #406, where we want to return a
transaction tree and flat transaction after a SubmitAndWaitForTransaction(Tree).
This commit is contained in:
Gerolf Seitz 2019-05-03 09:03:12 +02:00 committed by GitHub
parent 0b5630bbe6
commit f4d8e134e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 598 additions and 211 deletions

View File

@ -16,6 +16,8 @@ HEAD — ongoing
--------------------
- Fix an issue with Postgres of potentially not stopping the transaction stream at required ceiling offset. See more `here <https://github.com/digital-asset/daml/pull/802>`
- Ledger API: You can now look up flat transactions with the new TransactionService methods
``GetFlatTransactionByEventId`` and ``GetFlatTransactionById``.
0.12.12 - 2019-04-30
--------------------

View File

@ -118,7 +118,7 @@ getTransactionStream h party = do
forkIO_ tag $
withGRPCClient (config port) $ \client -> do
rpcs <- TS.transactionServiceClient client
let (TS.TransactionService rpc1 _ _ _ _) = rpcs
let (TS.TransactionService rpc1 _ _ _ _ _ _) = rpcs
sendToChan request fromServiceResponse chan rpc1
return $ ResponseStream{chan}

View File

@ -29,6 +29,10 @@ public interface TransactionsClient {
Single<TransactionTree> getTransactionById(String transactionId, Set<String> requestingParties);
Single<Transaction> getFlatTransactionByEventId(String eventId, Set<String> requestingParties);
Single<Transaction> getFlatTransactionById(String transactionId, Set<String> requestingParties);
Single<LedgerOffset> getLedgerEnd();
}

View File

@ -96,6 +96,33 @@ public class TransactionClientImpl implements TransactionsClient {
.map(GetTransactionResponse::getTransaction);
}
@Override
public Single<Transaction> getFlatTransactionByEventId(String eventId, Set<String> requestingParties) {
TransactionServiceOuterClass.GetTransactionByEventIdRequest request = TransactionServiceOuterClass.GetTransactionByEventIdRequest.newBuilder()
.setLedgerId(ledgerId)
.setEventId(eventId)
.addAllRequestingParties(requestingParties)
.build();
return extractTransaction(serviceFutureStub.getFlatTransactionByEventId(request));
}
@Override
public Single<Transaction> getFlatTransactionById(String transactionId, Set<String> requestingParties) {
TransactionServiceOuterClass.GetTransactionByIdRequest request = TransactionServiceOuterClass.GetTransactionByIdRequest.newBuilder()
.setLedgerId(ledgerId)
.setTransactionId(transactionId)
.addAllRequestingParties(requestingParties)
.build();
return extractTransaction(serviceFutureStub.getFlatTransactionById(request));
}
private Single<Transaction> extractTransaction(Future<TransactionServiceOuterClass.GetFlatTransactionResponse> future) {
return Single
.fromFuture(future)
.map(GetFlatTransactionResponse::fromProto)
.map(GetFlatTransactionResponse::getTransaction);
}
@Override
public Single<LedgerOffset> getLedgerEnd() {
TransactionServiceOuterClass.GetLedgerEndRequest request = TransactionServiceOuterClass.GetLedgerEndRequest.newBuilder().setLedgerId(ledgerId).build();

View File

@ -383,7 +383,7 @@ class BotTest extends FlatSpec with Matchers with DataLayerHelpers {
client.connect()
/* The bot is wired here and inside wire is where the race condition can happen. We catch the possible
* error to pretty-print it. This try-catch depends on the implementation of `TRansactionServiceImpl.getTransaction()`
* error to pretty-print it. This try-catch depends on the implementation of `TransactionServiceImpl.getTransactionTree()`
*/
try {
Bot.wireSimple(

View File

@ -51,7 +51,7 @@ class LedgerViewFlowableSpec extends FlatSpec with Matchers {
)
ledgerViewFlowable
.timeout(10, TimeUnit.MILLISECONDS)
.timeout(1, TimeUnit.SECONDS)
.blockingFirst() shouldBe initialLedgerView
}

View File

@ -79,6 +79,14 @@ class DummyLedgerClient(
requestingParties: util.Set[String]): Single[TransactionTree] =
???
override def getFlatTransactionByEventId(
eventId: String,
requestingParties: util.Set[String]): Single[Transaction] = ???
override def getFlatTransactionById(
transactionId: String,
requestingParties: util.Set[String]): Single[Transaction] = ???
override def getLedgerEnd: Single[LedgerOffset] = Single.just(ledgerEnd)
}

View File

@ -0,0 +1,53 @@
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.javaapi.data;
import com.digitalasset.ledger.api.v1.TransactionServiceOuterClass;
import org.checkerframework.checker.nullness.qual.NonNull;
import java.util.Objects;
public class GetFlatTransactionResponse {
private final Transaction transaction;
public GetFlatTransactionResponse(@NonNull Transaction transaction) {
this.transaction = transaction;
}
public static GetFlatTransactionResponse fromProto(TransactionServiceOuterClass.GetFlatTransactionResponse response) {
return new GetFlatTransactionResponse(Transaction.fromProto(response.getTransaction()));
}
public TransactionServiceOuterClass.GetFlatTransactionResponse toProto() {
return TransactionServiceOuterClass.GetFlatTransactionResponse.newBuilder()
.setTransaction(this.transaction.toProto())
.build();
}
public Transaction getTransaction() {
return transaction;
}
@Override
public String toString() {
return "GetFlatTransactionResponse{" +
"transaction=" + transaction +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GetFlatTransactionResponse that = (GetFlatTransactionResponse) o;
return Objects.equals(transaction, that.transaction);
}
@Override
public int hashCode() {
return Objects.hash(transaction);
}
}

View File

@ -35,6 +35,8 @@ class TransactionServiceImpl(ledgerContent: Observable[LedgerItem]) extends Tran
val lastTransactionsTreesRequest = new AtomicReference[GetTransactionsRequest]()
val lastTransactionByEventIdRequest = new AtomicReference[GetTransactionByEventIdRequest]()
val lastTransactionByIdRequest = new AtomicReference[GetTransactionByIdRequest]()
val lastFlatTransactionByEventIdRequest = new AtomicReference[GetTransactionByEventIdRequest]()
val lastFlatTransactionByIdRequest = new AtomicReference[GetTransactionByIdRequest]()
val lastLedgerEndRequest = new AtomicReference[GetLedgerEndRequest]()
override def getTransactions(
@ -88,6 +90,22 @@ class TransactionServiceImpl(ledgerContent: Observable[LedgerItem]) extends Tran
new StatusRuntimeException(Status.UNIMPLEMENTED.withDescription("Pending"))
}
override def getFlatTransactionByEventId(
request: GetTransactionByEventIdRequest): Future[GetFlatTransactionResponse] =
Future.failed[GetFlatTransactionResponse] {
lastFlatTransactionByEventIdRequest.set(request)
// TODO DEL-6007
new StatusRuntimeException(Status.UNIMPLEMENTED.withDescription("Pending"))
}
override def getFlatTransactionById(
request: GetTransactionByIdRequest): Future[GetFlatTransactionResponse] =
Future.failed[GetFlatTransactionResponse] {
lastFlatTransactionByIdRequest.set(request)
// TODO DEL-6007
new StatusRuntimeException(Status.UNIMPLEMENTED.withDescription("Pending"))
}
override def getLedgerEnd(request: GetLedgerEndRequest): Future[GetLedgerEndResponse] = {
lastLedgerEndRequest.set(request)

View File

@ -20,16 +20,26 @@ service TransactionService {
// Read the ledger's filtered transaction stream for a set of parties.
rpc GetTransactions (GetTransactionsRequest) returns (stream GetTransactionsResponse);
// Read the ledger's complete transaction stream for a set of parties.
// Read the ledger's complete transaction tree stream for a set of parties.
rpc GetTransactionTrees (GetTransactionsRequest) returns (stream GetTransactionTreesResponse);
// Lookup a transaction tree by the ID of an event that appears within it.
// Returns ``NOT_FOUND`` if no such transaction exists.
// For looking up a transaction instead of a transaction tree, please see GetFlatTransactionByEventId
rpc GetTransactionByEventId (GetTransactionByEventIdRequest) returns (GetTransactionResponse);
// Lookup a transaction tree by its ID.
// Returns ``NOT_FOUND`` if no such transaction exists.
// For looking up a transaction instead of a transaction tree, please see GetFlatTransactionById
rpc GetTransactionById (GetTransactionByIdRequest) returns (GetTransactionResponse);
// Lookup a transaction by the ID of an event that appears within it.
// Returns ``NOT_FOUND`` if no such transaction exists.
rpc GetTransactionByEventId (GetTransactionByEventIdRequest) returns (GetTransactionResponse);
rpc GetFlatTransactionByEventId (GetTransactionByEventIdRequest) returns (GetFlatTransactionResponse);
// Lookup a transaction by its ID.
// Returns ``NOT_FOUND`` if no such transaction exists.
rpc GetTransactionById (GetTransactionByIdRequest) returns (GetTransactionResponse);
rpc GetFlatTransactionById (GetTransactionByIdRequest) returns (GetFlatTransactionResponse);
// Get the current ledger end.
// Subscriptions started with the returned offset will serve transactions created after this RPC was called.
@ -121,6 +131,10 @@ message GetTransactionResponse {
TransactionTree transaction = 1;
}
message GetFlatTransactionResponse {
Transaction transaction = 1;
}
message GetLedgerEndRequest {
// Must correspond to the ledger ID reported by the Ledger Identification Service.
// Required

View File

@ -27,6 +27,7 @@ import com.digitalasset.ledger.api.v1.transaction_service.{
}
import com.digitalasset.ledger.api.validation.PartyNameChecker
import com.digitalasset.platform.participant.util.EventFilter
import com.digitalasset.platform.participant.util.EventFilter.TemplateAwareFilter
import com.digitalasset.platform.server.api._
import com.digitalasset.platform.server.api.services.domain.TransactionService
import com.digitalasset.platform.server.api.services.grpc.GrpcTransactionService
@ -85,52 +86,67 @@ class DamlOnXTransactionService private (val indexService: IndexService, paralle
runTransactionPipeline(ledgerId, request.begin, request.end, request.filter)
.mapConcat {
case (offset, (trans, blindingInfo)) =>
val transactionWithEventIds =
trans.transaction.mapNodeId(nodeIdToEventId(trans.transactionId, _))
val events =
TransactionConversion
.genToFlatTransaction(
transactionWithEventIds,
blindingInfo.explicitDisclosure.map {
case (nodeId, parties) =>
nodeIdToEventId(trans.transactionId, nodeId) -> parties.map(_.underlyingString)
},
request.verbose
)
acceptedToFlat(offset, trans, blindingInfo, request.verbose, eventFilter) match {
case Some(transaction) =>
val response = GetTransactionsResponse(Seq(transaction))
logger.debug(
"Serving item {} (offset: {}) in transaction subscription {} to client",
transaction.transactionId,
transaction.offset,
subscriptionId)
List(response)
val submitterIsSubscriber =
trans.optSubmitterInfo
.map(_.submitter)
.fold(false)(eventFilter.isSubmitterSubscriber)
if (events.nonEmpty || submitterIsSubscriber) {
val transaction = PTransaction(
transactionId = trans.transactionId,
commandId =
if (submitterIsSubscriber)
trans.optSubmitterInfo.map(_.commandId).getOrElse("")
else "",
workflowId = trans.transactionMeta.workflowId,
effectiveAt = Some(fromInstant(trans.transactionMeta.ledgerEffectiveTime.toInstant)), // FIXME(JM): conversion
events = events,
offset = offset.toString,
)
val response = GetTransactionsResponse(Seq(transaction))
logger.debug(
"Serving item {} (offset: {}) in transaction subscription {} to client",
transaction.transactionId,
transaction.offset,
subscriptionId)
List(response)
} else {
logger.trace(
"Not serving item {} for transaction subscription {} as no events are visible",
trans.transactionId,
subscriptionId: Any)
Nil
case None =>
logger.trace(
"Not serving item {} for transaction subscription {} as no events are visible",
trans.transactionId,
subscriptionId: Any)
Nil
}
}
}
private def acceptedToFlat(
offset: Offset,
trans: TransactionAccepted,
blindingInfo: BlindingInfo,
verbose: Boolean,
eventFilter: TemplateAwareFilter) = {
val transactionWithEventIds =
trans.transaction.mapNodeId(nodeIdToEventId(trans.transactionId, _))
val events =
TransactionConversion
.genToFlatTransaction(
transactionWithEventIds,
blindingInfo.explicitDisclosure.map {
case (nodeId, parties) =>
nodeIdToEventId(trans.transactionId, nodeId) -> parties.map(_.underlyingString)
},
verbose
)
val submitterIsSubscriber =
trans.optSubmitterInfo
.map(_.submitter)
.fold(false)(eventFilter.isSubmitterSubscriber)
if (events.nonEmpty || submitterIsSubscriber) {
Some(
PTransaction(
transactionId = trans.transactionId,
commandId =
if (submitterIsSubscriber)
trans.optSubmitterInfo.map(_.commandId).getOrElse("")
else "",
workflowId = trans.transactionMeta.workflowId,
effectiveAt = Some(fromInstant(trans.transactionMeta.ledgerEffectiveTime.toInstant)), // FIXME(JM): conversion
events = events,
offset = offset.toString,
))
} else {
None
}
}
override def getTransactionTrees(request: GetTransactionTreesRequest)
: Source[WithOffset[String, VisibleTransaction], NotUsed] = {
logger.debug("Received {}", request)
@ -168,20 +184,34 @@ class DamlOnXTransactionService private (val indexService: IndexService, paralle
eventIdToTransactionId(request.eventId) match {
case None => Future.successful(None)
case Some(txId) =>
lookupTransactionById(request.ledgerId.unwrap, txId, request.requestingParties)
lookupTransactionTreeById(request.ledgerId.unwrap, txId, request.requestingParties)
}
}
def getTransactionById(request: GetTransactionByIdRequest): Future[Option[VisibleTransaction]] = {
logger.debug("Received {}", request)
lookupTransactionById(
lookupTransactionTreeById(
request.ledgerId.unwrap,
request.transactionId.unwrap,
request.requestingParties)
}
private def lookupTransactionById(
override def getFlatTransactionByEventId(
request: GetTransactionByEventIdRequest): Future[Option[PTransaction]] = {
eventIdToTransactionId(request.eventId) match {
case None => Future.successful(None)
case Some(txId) =>
lookupFlatTransactionById(request.ledgerId.unwrap, txId, request.requestingParties)
}
}
override def getFlatTransactionById(
req: GetTransactionByIdRequest): Future[Option[PTransaction]] = {
lookupFlatTransactionById(req.ledgerId.unwrap, req.transactionId.unwrap, req.requestingParties)
}
private def lookupTransactionTreeById[A](
ledgerId: String,
txId: String,
requestingParties: Set[Party]): Future[Option[VisibleTransaction]] = {
@ -189,37 +219,61 @@ class DamlOnXTransactionService private (val indexService: IndexService, paralle
// FIXME(JM): Move to IndexService
consumeAsyncResult(
indexService
.getAcceptedTransactions(
ledgerId = Ref.SimpleString.assertFromString(ledgerId),
beginAfter = None,
endAt = None,
filter = filter
)).flatMap { txs =>
txs
.collect {
case (offset: Offset, (t: TransactionAccepted, _)) if t.transactionId == txId =>
t
}
.runWith(Sink.headOption)
.flatMap {
case Some(trans) =>
val result = VisibleTransaction.toVisibleTransaction(
filter,
toTransactionWithMeta(trans)
)
Future.successful(result)
runTransactionPipeline(
Ref.SimpleString.assertFromString(ledgerId),
LedgerOffset.LedgerBegin,
Some(LedgerOffset.LedgerEnd),
filter)
.collect {
case (o, (t: TransactionAccepted, bi)) if t.transactionId == txId => (o, t, bi)
}
.runWith(Sink.headOption)
.flatMap {
case Some((_, trans, _)) =>
val result = VisibleTransaction.toVisibleTransaction(
filter,
toTransactionWithMeta(trans)
)
Future.successful(result)
case None =>
Future.failed(
Status.INVALID_ARGUMENT
.withDescription(s"$txId could not be found")
.asRuntimeException)
case None =>
Future.failed(
Status.INVALID_ARGUMENT
.withDescription(s"$txId could not be found")
.asRuntimeException)
}
}
}
private def lookupFlatTransactionById[A](
ledgerId: String,
txId: String,
requestingParties: Set[Party]): Future[Option[PTransaction]] = {
val filter = TransactionFilter.allForParties(requestingParties)
}
// FIXME(JM): Move to IndexService
runTransactionPipeline(
Ref.SimpleString.assertFromString(ledgerId),
LedgerOffset.LedgerBegin,
Some(LedgerOffset.LedgerEnd),
filter)
.collect {
case (o, (t: TransactionAccepted, bi)) if t.transactionId == txId => (o, t, bi)
}
.runWith(Sink.headOption)
.flatMap {
case Some((offset, trans, blindingInfo)) =>
val eventFilter = EventFilter.byTemplates(
TransactionFilter(requestingParties.map(_ -> Filters.noFilter)(breakOut)))
Future.successful(
acceptedToFlat(offset, trans, blindingInfo, verbose = true, eventFilter))
case None =>
Future.failed(
Status.INVALID_ARGUMENT
.withDescription(s"$txId could not be found")
.asRuntimeException)
}
}
override def getLedgerEnd(ledgerId: String): Future[LedgerOffset.Absolute] =

View File

@ -52,6 +52,17 @@ final class TransactionClient(ledgerId: String, transactionService: TransactionS
transactionService
.getTransactionByEventId(GetTransactionByEventIdRequest(ledgerId, eventId, parties))
def getFlatTransactionById(transactionId: String, parties: Seq[String])(
implicit ec: ExecutionContext): Future[GetFlatTransactionResponse] = {
transactionService
.getFlatTransactionById(GetTransactionByIdRequest(ledgerId, transactionId, parties))
}
def getFlatTransactionByEventId(eventId: String, parties: Seq[String])(
implicit ec: ExecutionContext): Future[GetFlatTransactionResponse] =
transactionService
.getFlatTransactionByEventId(GetTransactionByEventIdRequest(ledgerId, eventId, parties))
def getLedgerEnd: Future[GetLedgerEndResponse] =
transactionService.getLedgerEnd(GetLedgerEndRequest(ledgerId))
}

View File

@ -13,6 +13,7 @@ import com.digitalasset.ledger.api.messages.transaction.{
GetTransactionsRequest
}
import com.digitalasset.ledger.api.v1.transaction_service.GetTransactionsResponse
import com.digitalasset.ledger.api.v1.transaction.Transaction
import com.digitalasset.platform.server.api.WithOffset
import com.digitalasset.platform.server.services.transaction.VisibleTransaction
@ -34,4 +35,8 @@ trait TransactionService {
def getTransactionByEventId(
req: GetTransactionByEventIdRequest): Future[Option[VisibleTransaction]]
def getFlatTransactionById(req: GetTransactionByIdRequest): Future[Option[Transaction]]
def getFlatTransactionByEventId(req: GetTransactionByEventIdRequest): Future[Option[Transaction]]
}

View File

@ -9,7 +9,7 @@ import akka.stream.scaladsl.Source
import com.digitalasset.api.util.TimestampConversion
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset
import com.digitalasset.ledger.api.v1.transaction.TransactionTree
import com.digitalasset.ledger.api.v1.transaction.{Transaction, TransactionTree}
import com.digitalasset.ledger.api.v1.transaction_service.TransactionServiceGrpc.{
TransactionService => ApiTransactionService
}
@ -104,6 +104,12 @@ class GrpcTransactionService(
)
}
private def optionalTransactionToApiResponse(txO: Option[Transaction]) =
txO.fold {
throw new ApiException(
Status.INVALID_ARGUMENT.withDescription("Transaction not found, or not visible."))
}(t => GetFlatTransactionResponse(Some(t)))
override protected def getTransactionTreesSource(
request: GetTransactionsRequest): Source[GetTransactionTreesResponse, NotUsed] = {
logger.debug("Received new transaction tree request {}", request)
@ -154,6 +160,32 @@ class GrpcTransactionService(
.map(opt => optionalVisibleToApiTx(opt))(DirectExecutionContext))
}
override def getFlatTransactionByEventId(
request: GetTransactionByEventIdRequest): Future[GetFlatTransactionResponse] = {
val validation = validator.validateTransactionByEventId(request)
validation.fold(
Future.failed,
txId =>
service
.getFlatTransactionByEventId(txId)
.map(optionalTransactionToApiResponse)(DirectExecutionContext)
)
}
override def getFlatTransactionById(
request: GetTransactionByIdRequest): Future[GetFlatTransactionResponse] = {
val validation = validator.validateTransactionById(request)
validation.fold(
Future.failed,
txId =>
service
.getFlatTransactionById(txId)
.map(optionalTransactionToApiResponse)(DirectExecutionContext)
)
}
override def getLedgerEnd(request: GetLedgerEndRequest): Future[GetLedgerEndResponse] = {
val validation = validator.validateLedgerEnd(request)

View File

@ -67,7 +67,11 @@ object TransactionFiltration {
}
)
if (filteredPartiesByNode.exists(_._2.nonEmpty)) {
// We currently allow composite commands without any actual commands and
// emit empty flat transactions. To be consistent with that behavior,
// we check for filteredPartiesByNode.isEmpty so that we also emit empty
// transaction trees.
if (filteredPartiesByNode.exists(_._2.nonEmpty) || filteredPartiesByNode.isEmpty) {
val nodeIdToParty: Map[String, immutable.Set[Party]] = filteredPartiesByNode.map {
case (k, v) => (nidToString(k), v)
}(breakOut)

View File

@ -857,29 +857,35 @@ class TransactionServiceIT
}
}
"asking for historical transactions by id" should {
"asking for historical transaction trees by id" should {
"return the transaction if it exists, and the party can see it" in allFixtures { context =>
val beginOffset =
LedgerOffset(LedgerOffset.Value.Boundary(LedgerOffset.LedgerBoundary.LEDGER_BEGIN))
for {
_ <- insertCommands(getTrackerFlow(context), "provenance-by-id", 1, config.getLedgerId)
firstTransaction <- context.transactionClient
.getTransactions(beginOffset, None, transactionFilter)
.runWith(Sink.head)
transactionId = firstTransaction.transactionId
response <- context.transactionClient.getTransactionById(transactionId, List("party"))
notVisibleError <- context.transactionClient
.getTransactionById(transactionId, List("Alice"))
.failed
} yield {
response.transaction should not be empty
inside(notVisibleError) {
case sre: StatusRuntimeException =>
sre.getStatus.getCode shouldEqual Status.INVALID_ARGUMENT.getCode
sre.getStatus.getDescription shouldEqual "Transaction not found, or not visible."
"return the transaction tree if it exists, and the party can see it" in allFixtures {
context =>
val beginOffset =
LedgerOffset(LedgerOffset.Value.Boundary(LedgerOffset.LedgerBoundary.LEDGER_BEGIN))
for {
_ <- insertCommands(
getTrackerFlow(context),
"tree-provenance-by-id",
1,
config.getLedgerId)
firstTransaction <- context.transactionClient
.getTransactions(beginOffset, None, transactionFilter)
.runWith(Sink.head)
transactionId = firstTransaction.transactionId
response <- context.transactionClient
.getTransactionById(transactionId, List("party"))
notVisibleError <- context.transactionClient
.getTransactionById(transactionId, List("Alice"))
.failed
} yield {
response.transaction should not be empty
inside(notVisibleError) {
case sre: StatusRuntimeException =>
sre.getStatus.getCode shouldEqual Status.INVALID_ARGUMENT.getCode
sre.getStatus.getDescription shouldEqual "Transaction not found, or not visible."
}
}
}
}
"return INVALID_ARGUMENT if it does not exist" in allFixtures { context =>
@ -950,14 +956,86 @@ class TransactionServiceIT
}
}
"asking for historical transactions by event id" should {
"return the transaction if it exists" in allFixtures { context =>
"asking for historical flat transactions by id" should {
"return the flat transaction if it exists, and the party can see it" in allFixtures {
context =>
val beginOffset =
LedgerOffset(LedgerOffset.Value.Boundary(LedgerOffset.LedgerBoundary.LEDGER_BEGIN))
for {
_ <- insertCommands(
getTrackerFlow(context),
"flat-provenance-by-id",
1,
config.getLedgerId)
firstTransaction <- context.transactionClient
.getTransactions(beginOffset, None, transactionFilter)
.runWith(Sink.head)
transactionId = firstTransaction.transactionId
response <- context.transactionClient
.getFlatTransactionById(transactionId, List("party"))
notVisibleError <- context.transactionClient
.getFlatTransactionById(transactionId, List("Alice"))
.failed
} yield {
response.transaction should not be empty
inside(notVisibleError) {
case sre: StatusRuntimeException =>
sre.getStatus.getCode shouldEqual Status.INVALID_ARGUMENT.getCode
sre.getStatus.getDescription shouldEqual "Transaction not found, or not visible."
}
}
}
"return INVALID_ARGUMENT if it does not exist" in allFixtures { context =>
context.transactionClient
.getFlatTransactionById(
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
List("party"))
.failed
.map(IsStatusException(Status.INVALID_ARGUMENT))
}
"fail with the expected status on a ledger Id mismatch" in allFixtures { context =>
newClient(context.transactionService, "not" + config.getLedgerId)
.getFlatTransactionById(transactionId, List("party"))
.failed
.map(IsStatusException(Status.NOT_FOUND))
}
"fail with INVALID_ARGUMENT status if the requesting parties field is empty" in allFixtures {
context =>
context.transactionClient
.getFlatTransactionById(transactionId, Nil)
.failed
.map(IsStatusException(Status.INVALID_ARGUMENT))
}
"return the same events for each tx as the transaction stream itself" in allFixtures {
context =>
val requestingParties = transactionFilter.filtersByParty.keySet
context.transactionClient
.getTransactions(ledgerBegin, Some(ledgerEnd), transactionFilter, true)
.mapAsyncUnordered(16) { tx =>
context.transactionClient
.getFlatTransactionById(tx.transactionId, requestingParties.toList)
.map(tx -> _.getTransaction)
}
.runFold(succeed) {
case (acc, (original, byId)) =>
byId shouldBe original
}
}
}
"asking for historical transaction trees by event id" should {
"return the transaction tree if it exists" in allFixtures { context =>
val beginOffset =
LedgerOffset(LedgerOffset.Value.Boundary(LedgerOffset.LedgerBoundary.LEDGER_BEGIN))
for {
_ <- insertCommands(
getTrackerFlow(context),
"provenance-by-event-id",
"tree provenance-by-event-id",
1,
config.getLedgerId)
tx <- context.transactionClient
@ -973,13 +1051,12 @@ class TransactionServiceIT
.value
result <- context.transactionClient
.getTransactionByEventId(eventId, Seq(party))
.map(_.transaction)
notVisibleError <- context.transactionClient
.getTransactionByEventId(eventId, List("Alice"))
.failed
} yield {
result should not be empty
result.transaction should not be empty
inside(notVisibleError) {
case sre: StatusRuntimeException =>
@ -1021,6 +1098,76 @@ class TransactionServiceIT
}
}
"asking for historical flat transactions by event id" should {
"return the flat transaction if it exists" in allFixtures { context =>
val beginOffset =
LedgerOffset(LedgerOffset.Value.Boundary(LedgerOffset.LedgerBoundary.LEDGER_BEGIN))
for {
_ <- insertCommands(
getTrackerFlow(context),
"flat-provenance-by-event-id",
1,
config.getLedgerId)
tx <- context.transactionClient
.getTransactions(beginOffset, None, transactionFilter)
.runWith(Sink.head)
eventId = tx.events.headOption
.map(_.event match {
case Archived(v) => v.eventId
case Created(v) => v.eventId
case Event.Event.Exercised(v) => v.eventId
case Event.Event.Empty => fail(s"Received empty event in $tx")
})
.value
result <- context.transactionClient
.getFlatTransactionByEventId(eventId, Seq(party))
notVisibleError <- context.transactionClient
.getFlatTransactionByEventId(eventId, List("Alice"))
.failed
} yield {
result.transaction should not be empty
inside(notVisibleError) {
case sre: StatusRuntimeException =>
sre.getStatus.getCode shouldEqual Status.INVALID_ARGUMENT.getCode
sre.getStatus.getDescription shouldEqual "Transaction not found, or not visible."
}
}
}
"return INVALID_ARGUMENT for invalid event IDs" in allFixtures { context =>
context.transactionClient
.getFlatTransactionByEventId("don't worry, be happy", List("party"))
.failed
.map(IsStatusException(Status.INVALID_ARGUMENT))
}
"return INVALID_ARGUMENT if it does not exist" in allFixtures { context =>
context.transactionClient
.getFlatTransactionByEventId(
"#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa:000",
List("party"))
.failed
.map(IsStatusException(Status.INVALID_ARGUMENT))
}
"fail with the expected status on a ledger Id mismatch" in allFixtures { context =>
newClient(context.transactionService, "not" + config.getLedgerId)
.getFlatTransactionByEventId("#42:0", List("party"))
.failed
.map(IsStatusException(Status.NOT_FOUND))
}
"fail with INVALID_ARGUMENT status if the requesting parties field is empty" in allFixtures {
context =>
context.transactionClient
.getFlatTransactionByEventId(transactionId, Nil)
.failed
.map(IsStatusException(Status.INVALID_ARGUMENT))
}
}
"reading transactions events " should {
def validateStream(getEvents: () => Future[Seq[Event.Event]]) =

View File

@ -9,14 +9,7 @@ import com.digitalasset.ledger.api.logging.FailingServerFixture.Exceptions.{
InternalGrpc
}
import com.digitalasset.ledger.api.v1.command_completion_service.CommandCompletionServiceGrpc.CommandCompletionService
import com.digitalasset.ledger.api.v1.command_completion_service.{
CommandCompletionServiceGrpc,
CommandCompletionServiceLogging,
CompletionEndRequest,
CompletionEndResponse,
CompletionStreamRequest,
CompletionStreamResponse
}
import com.digitalasset.ledger.api.v1.command_completion_service._
import com.digitalasset.ledger.api.v1.ledger_identity_service.LedgerIdentityServiceGrpc.LedgerIdentityService
import com.digitalasset.ledger.api.v1.ledger_identity_service.{
GetLedgerIdentityRequest,
@ -24,39 +17,11 @@ import com.digitalasset.ledger.api.v1.ledger_identity_service.{
LedgerIdentityServiceGrpc,
LedgerIdentityServiceLogging
}
import com.digitalasset.ledger.api.v1.package_service.{
GetPackageRequest,
GetPackageResponse,
GetPackageStatusRequest,
GetPackageStatusResponse,
ListPackagesRequest,
ListPackagesResponse,
PackageServiceGrpc,
PackageServiceLogging
}
import com.digitalasset.ledger.api.v1.package_service._
import com.digitalasset.ledger.api.v1.package_service.PackageServiceGrpc.PackageService
import com.digitalasset.ledger.api.v1.transaction_service.{
GetLedgerEndRequest,
GetLedgerEndResponse,
GetTransactionByEventIdRequest,
GetTransactionByIdRequest,
GetTransactionResponse,
GetTransactionTreesResponse,
GetTransactionsRequest,
GetTransactionsResponse,
TransactionServiceGrpc,
TransactionServiceLogging
}
import com.digitalasset.ledger.api.v1.transaction_service._
import com.digitalasset.ledger.api.v1.transaction_service.TransactionServiceGrpc.TransactionService
import io.grpc.{
BindableService,
Channel,
ManagedChannel,
Server,
ServerServiceDefinition,
Status,
StatusRuntimeException
}
import io.grpc._
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
import io.grpc.stub.StreamObserver
import org.scalatest.{BeforeAndAfterAll, Suite}
@ -137,6 +102,12 @@ object FailingServerFixture {
override def getTransactionById(
request: GetTransactionByIdRequest): Future[GetTransactionResponse] =
Future.failed(AbortedGrpc)
override def getFlatTransactionByEventId(
request: GetTransactionByEventIdRequest): Future[GetFlatTransactionResponse] =
Future.failed(AbortedGrpc)
override def getFlatTransactionById(
request: GetTransactionByIdRequest): Future[GetFlatTransactionResponse] =
Future.failed(AbortedGrpc)
override def getLedgerEnd(request: GetLedgerEndRequest): Future[GetLedgerEndResponse] =
throw AbortedGrpc
override def bindService(): ServerServiceDefinition =

View File

@ -22,6 +22,7 @@ import com.digitalasset.ledger.api.validation.PartyNameChecker
import com.digitalasset.ledger.backend.api.v1.LedgerBackend
import com.digitalasset.ledger.backend.api.v1.LedgerSyncEvent.AcceptedTransaction
import com.digitalasset.platform.participant.util.EventFilter
import com.digitalasset.platform.participant.util.EventFilter.TemplateAwareFilter
import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter.TransactionIdWithIndex
import com.digitalasset.platform.server.api._
import com.digitalasset.platform.server.api.services.domain.TransactionService
@ -65,7 +66,8 @@ class SandboxTransactionService private (val ledgerBackend: LedgerBackend, paral
private val transactionPipeline = TransactionPipeline(ledgerBackend)
@SuppressWarnings(Array("org.wartremover.warts.Option2Iterable"))
def getTransactions(request: GetTransactionsRequest): Source[GetTransactionsResponse, NotUsed] = {
override def getTransactions(
request: GetTransactionsRequest): Source[GetTransactionsResponse, NotUsed] = {
val subscriptionId = subscriptionIdCounter.incrementAndGet().toString
logger.debug(
"Received request for transaction subscription {}: {}",
@ -78,45 +80,54 @@ class SandboxTransactionService private (val ledgerBackend: LedgerBackend, paral
transactionPipeline
.run(request.begin, request.end)
.mapConcat { trans =>
val events =
TransactionConversion
.genToFlatTransaction(
trans.transaction,
trans.explicitDisclosure.mapValues(set => set.map(_.underlyingString)),
request.verbose)
.flatMap(eventFilter.filterEvent _)
val submitterIsSubscriber =
trans.submitter
.map(SimpleString.assertFromString)
.fold(false)(eventFilter.isSubmitterSubscriber)
if (events.nonEmpty || submitterIsSubscriber) {
val transaction = PTransaction(
transactionId = trans.transactionId,
commandId = if (submitterIsSubscriber) trans.commandId.getOrElse("") else "",
workflowId = trans.workflowId,
effectiveAt = Some(fromInstant(trans.recordTime)),
events = events,
offset = trans.offset
)
val response = GetTransactionsResponse(Seq(transaction))
logger.debug(
"Serving item {} (offset: {}) in transaction subscription {} to client",
transaction.transactionId,
transaction.offset,
subscriptionId)
List(response)
} else {
logger.trace(
"Not serving item {} for transaction subscription {} as no events are visible",
trans.transactionId,
subscriptionId: Any)
Nil
acceptedToFlat(trans, request.verbose, eventFilter) match {
case Some(transaction) =>
val response = GetTransactionsResponse(Seq(transaction))
logger.debug(
"Serving item {} (offset: {}) in transaction subscription {} to client",
transaction.transactionId,
transaction.offset,
subscriptionId)
List(response)
case None =>
logger.trace(
"Not serving item {} for transaction subscription {} as no events are visible",
trans.transactionId,
subscriptionId: Any)
Nil
}
}
}
private def acceptedToFlat(
trans: AcceptedTransaction,
verbose: Boolean,
eventFilter: TemplateAwareFilter): Option[PTransaction] = {
val events =
TransactionConversion
.genToFlatTransaction(
trans.transaction,
trans.explicitDisclosure.mapValues(set => set.map(_.underlyingString)),
verbose)
.flatMap(eventFilter.filterEvent(_).toList)
val submitterIsSubscriber =
trans.submitter
.map(SimpleString.assertFromString)
.fold(false)(eventFilter.isSubmitterSubscriber)
if (events.nonEmpty || submitterIsSubscriber) {
Some(
PTransaction(
transactionId = trans.transactionId,
commandId = if (submitterIsSubscriber) trans.commandId.getOrElse("") else "",
workflowId = trans.workflowId,
effectiveAt = Some(fromInstant(trans.recordTime)),
events = events,
offset = trans.offset
))
} else None
}
override def getTransactionTrees(request: GetTransactionTreesRequest)
: Source[WithOffset[String, VisibleTransaction], NotUsed] = {
logger.debug("Received {}", request)
@ -126,25 +137,20 @@ class SandboxTransactionService private (val ledgerBackend: LedgerBackend, paral
request.end
)
.mapConcat { trans =>
toResponseIfVisible(request, request.parties, trans.offset, trans)
toResponseIfVisible(request.parties, trans)
.fold(List.empty[WithOffset[String, VisibleTransaction]])(e =>
List(WithOffset(trans.offset, e)))
}
}
private def toResponseIfVisible(
request: GetTransactionTreesRequest,
subscribingParties: Set[Party],
offset: String,
trans: AcceptedTransaction) = {
val eventFilter = TransactionFilter(request.parties.map(_ -> Filters.noFilter)(breakOut))
private def toResponseIfVisible(subscribingParties: Set[Party], trans: AcceptedTransaction) = {
val eventFilter = TransactionFilter(subscribingParties.map(_ -> Filters.noFilter)(breakOut))
val withMeta = toTransactionWithMeta(trans)
VisibleTransaction.toVisibleTransaction(eventFilter, withMeta)
}
def getTransactionByEventId(
override def getTransactionByEventId(
request: GetTransactionByEventIdRequest): Future[Option[VisibleTransaction]] = {
logger.debug("Received {}", request)
SandboxEventIdFormatter
@ -155,46 +161,76 @@ class SandboxTransactionService private (val ledgerBackend: LedgerBackend, paral
.withDescription(s"invalid eventId: ${request.eventId}")
.asRuntimeException())) {
case TransactionIdWithIndex(transactionId, index) =>
ledgerBackend.getCurrentLedgerEnd.flatMap(
le =>
lookUpByTransactionId(
TransactionId(transactionId),
request.requestingParties,
le,
true))
lookUpTreeByTransactionId(TransactionId(transactionId), request.requestingParties)
}
}
def getTransactionById(request: GetTransactionByIdRequest): Future[Option[VisibleTransaction]] = {
override def getTransactionById(
request: GetTransactionByIdRequest): Future[Option[VisibleTransaction]] = {
logger.debug("Received {}", request)
ledgerBackend.getCurrentLedgerEnd.flatMap(le =>
lookUpByTransactionId(request.transactionId, request.requestingParties, le, true))
lookUpTreeByTransactionId(request.transactionId, request.requestingParties)
}
def getLedgerEnd(ledgerId: String): Future[LedgerOffset.Absolute] =
override def getFlatTransactionByEventId(
request: GetTransactionByEventIdRequest): Future[Option[PTransaction]] = {
SandboxEventIdFormatter
.split(request.eventId.unwrap)
.fold(
Future.failed[Option[PTransaction]](
Status.INVALID_ARGUMENT
.withDescription(s"invalid eventId: ${request.eventId}")
.asRuntimeException())) {
case TransactionIdWithIndex(transactionId, index) =>
lookUpFlatByTransactionId(TransactionId(transactionId), request.requestingParties)
}
}
override def getFlatTransactionById(
request: GetTransactionByIdRequest): Future[Option[PTransaction]] = {
lookUpFlatByTransactionId(request.transactionId, request.requestingParties)
}
override def getLedgerEnd(ledgerId: String): Future[LedgerOffset.Absolute] =
ledgerBackend.getCurrentLedgerEnd.map(LedgerOffset.Absolute)
override lazy val offsetOrdering: Ordering[LedgerOffset.Absolute] =
Ordering.by(abs => BigInt(abs.value))
private def lookUpByTransactionId(
private def lookUpTreeByTransactionId(
transactionId: TransactionId,
requestingParties: Set[Party],
ledgerEnd: String,
verbose: Boolean): Future[Option[VisibleTransaction]] =
//TODO: very inefficient especially with Postgres, see https://github.com/digital-asset/daml/issues/831
requestingParties: Set[Party]): Future[Option[VisibleTransaction]] = {
transactionPipeline
.run(LedgerOffset.LedgerBegin, Some(LedgerOffset.Absolute(ledgerEnd)))
.run(LedgerOffset.LedgerBegin, Some(LedgerOffset.LedgerEnd))
.collect {
case t: AcceptedTransaction if t.transactionId == transactionId => t
}
.runWith(Sink.headOption)
.flatMap {
case Some(trans) =>
val result = VisibleTransaction.toVisibleTransaction(
TransactionFilter.allForParties(requestingParties),
toTransactionWithMeta(trans)
)
Future.successful(toResponseIfVisible(requestingParties, trans))
case None =>
Future.failed(
Status.INVALID_ARGUMENT
.withDescription(s"$transactionId could not be found")
.asRuntimeException())
}
}
private def lookUpFlatByTransactionId(
transactionId: TransactionId,
requestingParties: Set[Party]): Future[Option[PTransaction]] = {
transactionPipeline
.run(LedgerOffset.LedgerBegin, Some(LedgerOffset.LedgerEnd))
.collect {
case t: AcceptedTransaction if t.transactionId == transactionId => t
}
.runWith(Sink.headOption)
.flatMap {
case Some(trans) =>
val eventFilter = EventFilter.byTemplates(
TransactionFilter(requestingParties.map(_ -> Filters.noFilter)(breakOut)))
val result = acceptedToFlat(trans, verbose = true, eventFilter)
Future.successful(result)
case None =>
@ -203,6 +239,7 @@ class SandboxTransactionService private (val ledgerBackend: LedgerBackend, paral
.withDescription(s"$transactionId could not be found")
.asRuntimeException())
}
}
private def toTransactionWithMeta(trans: AcceptedTransaction) =
TransactionWithMeta(