From c38703ec841bf5b176b36cfb8f5086745e3f41d5 Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Wed, 18 Aug 2021 19:59:14 +0200 Subject: [PATCH] participant-integration-api: Store a status gRPC protobuf. [KVL-1005] (#10600) * participant-integration-api: Construct completions in one place. * sandbox-classic: Inline `CompletionFromTransaction#apply`. It's only used here; there's no reason to keep it in the _participant-integration-api_. * participant-integration-api: Store a status gRPC protobuf. Instead of storing the status code and message, we store a serialized `google.rpc.Status` protocol buffers message. This allows us to pass through any additional information reported by the driver `ReadService`. The migration is only done for the append-only database, and preserves old data in the existing columns. New data will only be written to the new column. CHANGELOG_BEGIN CHANGELOG_END * participant-integration-api: Improve comments in migrations. Co-authored-by: Fabio Tudone * participant-integration-api: Further improvements to migrations. * participant-integration-api: Store the rejection status as 3 columns. Serializing the details but keeping the code and message columns populated. * participant-integration-api: Publish the indexer protobuf to Maven. Co-authored-by: Fabio Tudone --- .../participant-integration-api/BUILD.bazel | 15 ++++ .../main/protobuf/daml/platform/index.proto | 22 ++++++ .../V1__Append_only_schema.sha256 | 2 +- .../V1__Append_only_schema.sql | 10 ++- .../V1__Append_only_schema.sha256 | 2 +- .../V1__Append_only_schema.sql | 17 +++-- ...__add_rejection_status_proto_column.sha256 | 1 + ...106__add_rejection_status_proto_column.sql | 15 ++++ .../store/CompletionFromTransaction.scala | 74 +++++++------------ .../scala/platform/store/backend/DbDto.scala | 5 +- .../store/backend/UpdateToDbDto.scala | 12 ++- .../CompletionStorageBackendTemplate.scala | 41 +++++----- .../store/backend/common/Schema.scala | 5 +- .../store/dao/CommandCompletionsTable.scala | 16 ++-- .../store/backend/UpdateToDbDtoSpec.scala | 58 +++++++++------ .../ledger/inmemory/InMemoryLedger.scala | 52 +++++++++++-- release/artifacts.yaml | 2 + 17 files changed, 222 insertions(+), 127 deletions(-) create mode 100644 ledger/participant-integration-api/src/main/protobuf/daml/platform/index.proto create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql diff --git a/ledger/participant-integration-api/BUILD.bazel b/ledger/participant-integration-api/BUILD.bazel index ed24df7206..e96c3c8357 100644 --- a/ledger/participant-integration-api/BUILD.bazel +++ b/ledger/participant-integration-api/BUILD.bazel @@ -4,6 +4,7 @@ load("@oracle//:index.bzl", "oracle_testing") load("@os_info//:os_info.bzl", "is_windows") load("@scala_version//:index.bzl", "scala_major_version", "scala_major_version_suffix") +load("//bazel_tools:proto.bzl", "proto_jars") load( "//bazel_tools:scala.bzl", "da_scala_binary", @@ -16,7 +17,20 @@ load( load("//bazel_tools:pom_file.bzl", "pom_file") load("//rules_daml:daml.bzl", "daml_compile") +proto_jars( + name = "participant-integration-api-proto", + srcs = glob(["src/main/protobuf/**/*.proto"]), + maven_artifact_prefix = "participant-integration-api", + maven_group = "com.daml", + strip_import_prefix = "src/main/protobuf", + visibility = ["//visibility:public"], + deps = [ + "@com_google_protobuf//:any_proto", + ], +) + compile_deps = [ + ":participant-integration-api-proto_scala", "//daml-lf/archive:daml_lf_archive_reader", "//daml-lf/archive:daml_lf_dev_archive_proto_java", "//daml-lf/data", @@ -238,6 +252,7 @@ da_scala_test_suite( ], deps = [ ":participant-integration-api", + ":participant-integration-api-proto_scala", ":participant-integration-api-tests-lib", "//bazel_tools/runfiles:scala_runfiles", "//daml-lf/archive:daml_lf_archive_reader", diff --git a/ledger/participant-integration-api/src/main/protobuf/daml/platform/index.proto b/ledger/participant-integration-api/src/main/protobuf/daml/platform/index.proto new file mode 100644 index 0000000000..089a9f529a --- /dev/null +++ b/ledger/participant-integration-api/src/main/protobuf/daml/platform/index.proto @@ -0,0 +1,22 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +// Serialization format for Protocol Buffers values stored in the index. +// +// WARNING: +// As all messages declared here represent values stored to the index database, we MUST ensure that +// they remain backwards-compatible forever. + +syntax = "proto3"; + +package daml.platform.index; + +option java_package = "com.daml.platform.index"; + +import "google/protobuf/any.proto"; + +// Serialized status details, conveyed from the driver `ReadService` to the ledger API client. +// To be combined with a status code and message. +message StatusDetails { + repeated google.protobuf.Any details = 1; +} diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 index e4020ae8be..4c3c9ff059 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 @@ -1 +1 @@ -0d8451829143e22581afc5b31930bb0ff8c5efab89a3eaca0ed2e57454fc6823 +e560dcc7af3b3b333a3d61668c351c7c1a1a0ac088bf83b59e0b4699a8073951 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql index b7adac9311..ba9d4b1264 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql @@ -121,9 +121,15 @@ CREATE TABLE participant_command_completions ( application_id VARCHAR NOT NULL, submitters ARRAY NOT NULL, command_id VARCHAR NOT NULL, + -- The transaction ID is `NULL` for rejected transactions. transaction_id VARCHAR, - status_code INTEGER, - status_message VARCHAR + -- The three columns below are `NULL` if the completion is for an accepted transaction. + -- The `rejection_status_details` column contains a Protocol-Buffers-serialized message of type + -- `daml.platform.index.StatusDetails`, containing the code, message, and further details + -- (decided by the ledger driver), and may be `NULL` even if the other two columns are set. + rejection_status_code INTEGER, + rejection_status_message VARCHAR, + rejection_status_details BYTEA ); CREATE INDEX participant_command_completion_offset_application_idx ON participant_command_completions (completion_offset, application_id); diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 index 8bfc6d49db..3cb235847d 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 @@ -1 +1 @@ -c6e4c74b1c854a51d9cd79d3c614b60a372ca60e88b8d69cbd90a7a674389080 +16adf83956d0f884d1d8886f317b3c0929839a9b0c385b5cb40c6e42cf328ef4 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql index 1570b88d6b..41fd17fe1a 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql @@ -124,16 +124,17 @@ CREATE INDEX idx_party_entries ON party_entries(submission_id); CREATE TABLE participant_command_completions ( - completion_offset VARCHAR2(4000) not null, - record_time TIMESTAMP not null, + completion_offset VARCHAR2(4000) NOT NULL, + record_time TIMESTAMP NOT NULL, - application_id NVARCHAR2(1000) not null, - submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON), - command_id NVARCHAR2(1000) not null, + application_id NVARCHAR2(1000) NOT NULL, + submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON), + command_id NVARCHAR2(1000) NOT NULL, - transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints - status_code INTEGER, -- null for successful command and checkpoints - status_message CLOB -- null for successful command and checkpoints + transaction_id NVARCHAR2(1000), -- null for rejected transactions and checkpoints + rejection_status_code INTEGER, -- null for accepted transactions and checkpoints + rejection_status_message CLOB, -- null for accepted transactions and checkpoints + rejection_status_details BLOB -- null for accepted transactions and checkpoints ); CREATE INDEX participant_command_completions_idx ON participant_command_completions(completion_offset, application_id); diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 new file mode 100644 index 0000000000..d30ded888f --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 @@ -0,0 +1 @@ +824b012135f72b24d8393c412535432e4b2c669075e012971220f14eeb7bdaae diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql new file mode 100644 index 0000000000..0c4befb837 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql @@ -0,0 +1,15 @@ +-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + +-- Adds a column to store extra details for the rejection status. +-- +-- The `rejection_status_details` column contains a Protocol-Buffers-serialized message of type +-- `daml.platform.index.StatusDetails`, containing the code, message, and further details +-- (decided by the ledger driver), and may be `NULL` even if the other two columns are set. + +ALTER TABLE participant_command_completions + RENAME COLUMN status_code TO rejection_status_code; +ALTER TABLE participant_command_completions + RENAME COLUMN status_message TO rejection_status_message; +ALTER TABLE participant_command_completions + ADD COLUMN rejection_status_details BYTEA; diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/CompletionFromTransaction.scala b/ledger/participant-integration-api/src/main/scala/platform/store/CompletionFromTransaction.scala index b2250d5ea4..e51780b710 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/CompletionFromTransaction.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/CompletionFromTransaction.scala @@ -10,60 +10,42 @@ import com.daml.ledger.api.v1.command_completion_service.{Checkpoint, Completion import com.daml.ledger.api.v1.completion.Completion import com.daml.ledger.api.v1.ledger_offset.LedgerOffset import com.daml.ledger.offset.Offset -import com.daml.lf.data.Ref import com.daml.platform.ApiOffset.ApiOffsetConverter -import com.daml.platform.store.Conversions.RejectionReasonOps -import com.daml.platform.store.entries.LedgerEntry -import com.google.rpc.status.Status +import com.google.rpc.status.{Status => StatusProto} +import io.grpc.Status // Turn a stream of transactions into a stream of completions for a given application and set of parties // TODO Restrict the scope of this to com.daml.platform.store.dao when // TODO - the in-memory sandbox is gone private[platform] object CompletionFromTransaction { + private val OkStatus = StatusProto.of(Status.Code.OK.value(), "", Seq.empty) + private val RejectionTransactionId = "" - def toApiCheckpoint(recordTime: Instant, offset: Offset): Some[Checkpoint] = - Some( - Checkpoint( - recordTime = Some(fromInstant(recordTime)), - offset = Some(LedgerOffset(LedgerOffset.Value.Absolute(offset.toApiString))), - ) + def acceptedCompletion( + recordTime: Instant, + offset: Offset, + commandId: String, + transactionId: String, + ): CompletionStreamResponse = + CompletionStreamResponse.of( + checkpoint = Some(toApiCheckpoint(recordTime, offset)), + completions = Seq(Completion.of(commandId, Some(OkStatus), transactionId)), ) - // Filter completions for transactions for which we have the full submitter information: appId, submitter, cmdId - // This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package). - // But for an api server that is part of a distributed ledger network, we might see - // transactions that originated from some other api server. These transactions don't contain the submitter information, - // and therefore we don't emit CommandAccepted completions for those - def apply( - appId: Ref.ApplicationId, - parties: Set[Ref.Party], - ): PartialFunction[(Offset, LedgerEntry), (Offset, CompletionStreamResponse)] = { - case ( - offset, - LedgerEntry.Transaction( - Some(commandId), - transactionId, - Some(`appId`), - _, - actAs, - _, - _, - recordTime, - _, - _, - ), - ) if actAs.exists(parties) => - offset -> CompletionStreamResponse( - checkpoint = toApiCheckpoint(recordTime, offset), - Seq(Completion(commandId, Some(Status()), transactionId)), - ) + def rejectedCompletion( + recordTime: Instant, + offset: Offset, + commandId: String, + status: StatusProto, + ): CompletionStreamResponse = + CompletionStreamResponse.of( + checkpoint = Some(toApiCheckpoint(recordTime, offset)), + completions = Seq(Completion.of(commandId, Some(status), RejectionTransactionId)), + ) - case (offset, LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason)) - if actAs.exists(parties) => - val status = reason.toParticipantStateRejectionReason.status - offset -> CompletionStreamResponse( - checkpoint = toApiCheckpoint(recordTime, offset), - Seq(Completion(commandId, Some(status))), - ) - } + private def toApiCheckpoint(recordTime: Instant, offset: Offset): Checkpoint = + Checkpoint.of( + recordTime = Some(fromInstant(recordTime)), + offset = Some(LedgerOffset.of(LedgerOffset.Value.Absolute(offset.toApiString))), + ) } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala index 09a5c1b4e1..ac25caa191 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala @@ -133,8 +133,9 @@ object DbDto { submitters: Set[String], command_id: String, transaction_id: Option[String], - status_code: Option[Int], - status_message: Option[String], + rejection_status_code: Option[Int], + rejection_status_message: Option[String], + rejection_status_details: Option[Array[Byte]], ) extends DbDto final case class CommandDeduplication(deduplication_key: String) extends DbDto diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala index 005a7f68b5..01c35fdb2e 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala @@ -12,6 +12,7 @@ import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf.data.Ref import com.daml.lf.engine.Blinding import com.daml.lf.ledger.EventId +import com.daml.platform.index.index.StatusDetails import com.daml.platform.store.appendonlydao.JdbcLedgerDao import com.daml.platform.store.appendonlydao.events._ import com.daml.platform.store.dao.DeduplicationKeyMaker @@ -34,8 +35,10 @@ object UpdateToDbDto { submitters = u.completionInfo.actAs.toSet, command_id = u.completionInfo.commandId, transaction_id = None, - status_code = Some(u.reasonTemplate.code), - status_message = Some(u.reasonTemplate.message), + rejection_status_code = Some(u.reasonTemplate.code), + rejection_status_message = Some(u.reasonTemplate.message), + rejection_status_details = + Some(StatusDetails.of(u.reasonTemplate.status.details).toByteArray), ), DbDto.CommandDeduplication( DeduplicationKeyMaker.make( @@ -269,8 +272,9 @@ object UpdateToDbDto { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(u.transactionId), - status_code = None, - status_message = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ) } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala index 0e58bdcb42..659c91c218 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala @@ -6,42 +6,44 @@ package com.daml.platform.store.backend.common import java.sql.Connection import java.time.Instant +import anorm.SqlParser.{binaryStream, int, str} import anorm.{RowParser, ~} -import anorm.SqlParser.{int, str} import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse -import com.daml.ledger.api.v1.completion.Completion import com.daml.ledger.offset.Offset import com.daml.lf.data.Ref import com.daml.lf.data.Ref.Party -import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint +import com.daml.platform.index.index.StatusDetails +import com.daml.platform.store.CompletionFromTransaction import com.daml.platform.store.Conversions.{instant, offset} import com.daml.platform.store.backend.CompletionStorageBackend -import com.google.rpc.status.Status +import com.google.rpc.status.{Status => StatusProto} trait CompletionStorageBackendTemplate extends CompletionStorageBackend { def queryStrategy: QueryStrategy - private val sharedCompletionColumns: RowParser[Offset ~ Instant ~ String] = + private val sharedColumns: RowParser[Offset ~ Instant ~ String] = offset("completion_offset") ~ instant("record_time") ~ str("command_id") private val acceptedCommandParser: RowParser[CompletionStreamResponse] = - sharedCompletionColumns ~ str("transaction_id") map { + sharedColumns ~ str("transaction_id") map { case offset ~ recordTime ~ commandId ~ transactionId => - CompletionStreamResponse( - checkpoint = toApiCheckpoint(recordTime, offset), - completions = Seq(Completion(commandId, Some(Status()), transactionId)), - ) + CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId) } private val rejectedCommandParser: RowParser[CompletionStreamResponse] = - sharedCompletionColumns ~ int("status_code") ~ str("status_message") map { - case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage => - CompletionStreamResponse( - checkpoint = toApiCheckpoint(recordTime, offset), - completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))), - ) - } + sharedColumns ~ + int("rejection_status_code") ~ + str("rejection_status_message") ~ + binaryStream("rejection_status_details").? map { + case offset ~ recordTime ~ commandId ~ + rejectionStatusCode ~ rejectionStatusMessage ~ rejectionStatusDetails => + val details = rejectionStatusDetails + .map(stream => StatusDetails.parseFrom(stream).details) + .getOrElse(Seq.empty) + val status = StatusProto.of(rejectionStatusCode, rejectionStatusMessage, details) + CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status) + } private val completionParser: RowParser[CompletionStreamResponse] = acceptedCommandParser | rejectedCommandParser @@ -61,8 +63,9 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend { record_time, command_id, transaction_id, - status_code, - status_message + rejection_status_code, + rejection_status_message, + rejection_status_details FROM participant_command_completions WHERE diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala index 5214c23ad3..e76a5d2ac6 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala @@ -225,8 +225,9 @@ private[backend] object AppendOnlySchema { "submitters" -> fieldStrategy.stringArray(_.submitters), "command_id" -> fieldStrategy.string(_.command_id), "transaction_id" -> fieldStrategy.stringOptional(_.transaction_id), - "status_code" -> fieldStrategy.intOptional(_.status_code), - "status_message" -> fieldStrategy.stringOptional(_.status_message), + "rejection_status_code" -> fieldStrategy.intOptional(_.rejection_status_code), + "rejection_status_message" -> fieldStrategy.stringOptional(_.rejection_status_message), + "rejection_status_details" -> fieldStrategy.byteaOptional(_.rejection_status_details), ) val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] = diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/dao/CommandCompletionsTable.scala b/ledger/participant-integration-api/src/main/scala/platform/store/dao/CommandCompletionsTable.scala index 5e106df1de..ae1dca0a71 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/dao/CommandCompletionsTable.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/dao/CommandCompletionsTable.scala @@ -7,13 +7,12 @@ import java.time.Instant import anorm.{Row, RowParser, SimpleSql, SqlParser, SqlStringInterpolation, ~} import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse -import com.daml.ledger.api.v1.completion.Completion import com.daml.ledger.offset.Offset import com.daml.lf.data.Ref -import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint +import com.daml.platform.store.CompletionFromTransaction import com.daml.platform.store.Conversions._ import com.daml.platform.store.dao.events.SqlFunctions -import com.google.rpc.status.Status +import com.google.rpc.status.{Status => StatusProto} private[platform] object CommandCompletionsTable { @@ -25,19 +24,14 @@ private[platform] object CommandCompletionsTable { private val acceptedCommandParser: RowParser[CompletionStreamResponse] = sharedColumns ~ str("transaction_id") map { case offset ~ recordTime ~ commandId ~ transactionId => - CompletionStreamResponse( - checkpoint = toApiCheckpoint(recordTime, offset), - completions = Seq(Completion(commandId, Some(Status()), transactionId)), - ) + CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId) } private val rejectedCommandParser: RowParser[CompletionStreamResponse] = sharedColumns ~ int("status_code") ~ str("status_message") map { case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage => - CompletionStreamResponse( - checkpoint = toApiCheckpoint(recordTime, offset), - completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))), - ) + val status = StatusProto.of(statusCode, statusMessage, Seq.empty) + CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status) } val parser: RowParser[CompletionStreamResponse] = acceptedCommandParser | rejectedCommandParser diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala index 9edeb5afa2..5a69df559e 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala @@ -18,6 +18,7 @@ import com.daml.lf.transaction.BlindingInfo import com.daml.lf.transaction.test.TransactionBuilder import com.daml.lf.value.Value import com.daml.logging.LoggingContext +import com.daml.platform.index.index.StatusDetails import com.daml.platform.store.appendonlydao.JdbcLedgerDao import com.daml.platform.store.appendonlydao.events.Raw.TreeEvent import com.daml.platform.store.appendonlydao.events.{ @@ -31,7 +32,7 @@ import com.daml.platform.store.appendonlydao.events.{ } import com.daml.platform.store.dao.DeduplicationKeyMaker import com.google.protobuf.ByteString -import com.google.rpc.status.{Status => RpcStatus} +import com.google.rpc.status.{Status => StatusProto} import io.grpc.Status import org.scalactic.TripleEquals._ import org.scalatest.matchers.should.Matchers @@ -258,12 +259,11 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { optDeduplicationPeriod = None, submissionId = someSubmissionId, ) + val status = StatusProto.of(Status.Code.ABORTED.value(), "test reason", Seq.empty) val update = state.Update.CommandRejected( someRecordTime, completionInfo, - new state.Update.CommandRejected.FinalReason( - RpcStatus.of(Status.Code.ABORTED.value(), "test reason", Seq.empty) - ), + new state.Update.CommandRejected.FinalReason(status), ) val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)( someOffset @@ -277,8 +277,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = Set(someParty), command_id = someCommandId, transaction_id = None, - status_code = Some(Status.Code.ABORTED.value()), - status_message = Some("test reason"), + rejection_status_code = Some(status.code), + rejection_status_message = Some(status.message), + rejection_status_details = Some(StatusDetails.of(status.details).toByteArray), ), DbDto.CommandDeduplication( DeduplicationKeyMaker.make( @@ -348,8 +349,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -415,8 +417,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -494,8 +497,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -573,8 +577,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -735,8 +740,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -831,8 +837,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -951,8 +958,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -1048,8 +1056,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -1122,8 +1131,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } diff --git a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala index a88abd9c8d..269dfb2e2f 100644 --- a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala +++ b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala @@ -56,6 +56,7 @@ import com.daml.platform.sandbox.stores.ledger.inmemory.InMemoryLedger._ import com.daml.platform.sandbox.stores.ledger.{Ledger, Rejection} import com.daml.platform.store.CompletionFromTransaction import com.daml.platform.store.Contract.ActiveContract +import com.daml.platform.store.Conversions.RejectionReasonOps import com.daml.platform.store.entries.{ ConfigurationEntry, LedgerEntry, @@ -166,13 +167,50 @@ private[sandbox] final class InMemoryLedger( endInclusive: Option[Offset], applicationId: ApplicationId, parties: Set[Ref.Party], - )(implicit loggingContext: LoggingContext): Source[(Offset, CompletionStreamResponse), NotUsed] = - entries - .getSource(startExclusive, endInclusive) - .collect { case (offset, InMemoryLedgerEntry(entry)) => - (offset, entry) - } - .collect(CompletionFromTransaction(applicationId.unwrap, parties)) + )(implicit + loggingContext: LoggingContext + ): Source[(Offset, CompletionStreamResponse), NotUsed] = { + val appId = applicationId.unwrap + entries.getSource(startExclusive, endInclusive).collect { + case ( + offset, + InMemoryLedgerEntry( + LedgerEntry.Transaction( + Some(commandId), + transactionId, + Some(`appId`), + _, + actAs, + _, + _, + recordTime, + _, + _, + ) + ), + ) if actAs.exists(parties) => + offset -> CompletionFromTransaction.acceptedCompletion( + recordTime, + offset, + commandId, + transactionId, + ) + + case ( + offset, + InMemoryLedgerEntry( + LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason) + ), + ) if actAs.exists(parties) => + val status = reason.toParticipantStateRejectionReason.status + offset -> CompletionFromTransaction.rejectedCompletion( + recordTime, + offset, + commandId, + status, + ) + } + } override def ledgerEnd()(implicit loggingContext: LoggingContext): Offset = entries.ledgerEnd diff --git a/release/artifacts.yaml b/release/artifacts.yaml index 006ce9174b..ce9d6bae69 100644 --- a/release/artifacts.yaml +++ b/release/artifacts.yaml @@ -155,6 +155,8 @@ type: jar-scala - target: //ledger/participant-integration-api:participant-integration-api type: jar-scala +- target: //ledger/participant-integration-api:participant-integration-api-proto_scala + type: jar-scala - target: //ledger/participant-state:participant-state type: jar-scala - target: //ledger/participant-state/kvutils:daml_kvutils_proto_jar