mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
participant-integration-api: Store a status gRPC protobuf. [KVL-1005] (#10600)
* participant-integration-api: Construct completions in one place. * sandbox-classic: Inline `CompletionFromTransaction#apply`. It's only used here; there's no reason to keep it in the _participant-integration-api_. * participant-integration-api: Store a status gRPC protobuf. Instead of storing the status code and message, we store a serialized `google.rpc.Status` protocol buffers message. This allows us to pass through any additional information reported by the driver `ReadService`. The migration is only done for the append-only database, and preserves old data in the existing columns. New data will only be written to the new column. CHANGELOG_BEGIN CHANGELOG_END * participant-integration-api: Improve comments in migrations. Co-authored-by: Fabio Tudone <fabio.tudone@digitalasset.com> * participant-integration-api: Further improvements to migrations. * participant-integration-api: Store the rejection status as 3 columns. Serializing the details but keeping the code and message columns populated. * participant-integration-api: Publish the indexer protobuf to Maven. Co-authored-by: Fabio Tudone <fabio.tudone@digitalasset.com>
This commit is contained in:
parent
0af5b49484
commit
c38703ec84
@ -4,6 +4,7 @@
|
||||
load("@oracle//:index.bzl", "oracle_testing")
|
||||
load("@os_info//:os_info.bzl", "is_windows")
|
||||
load("@scala_version//:index.bzl", "scala_major_version", "scala_major_version_suffix")
|
||||
load("//bazel_tools:proto.bzl", "proto_jars")
|
||||
load(
|
||||
"//bazel_tools:scala.bzl",
|
||||
"da_scala_binary",
|
||||
@ -16,7 +17,20 @@ load(
|
||||
load("//bazel_tools:pom_file.bzl", "pom_file")
|
||||
load("//rules_daml:daml.bzl", "daml_compile")
|
||||
|
||||
proto_jars(
|
||||
name = "participant-integration-api-proto",
|
||||
srcs = glob(["src/main/protobuf/**/*.proto"]),
|
||||
maven_artifact_prefix = "participant-integration-api",
|
||||
maven_group = "com.daml",
|
||||
strip_import_prefix = "src/main/protobuf",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"@com_google_protobuf//:any_proto",
|
||||
],
|
||||
)
|
||||
|
||||
compile_deps = [
|
||||
":participant-integration-api-proto_scala",
|
||||
"//daml-lf/archive:daml_lf_archive_reader",
|
||||
"//daml-lf/archive:daml_lf_dev_archive_proto_java",
|
||||
"//daml-lf/data",
|
||||
@ -238,6 +252,7 @@ da_scala_test_suite(
|
||||
],
|
||||
deps = [
|
||||
":participant-integration-api",
|
||||
":participant-integration-api-proto_scala",
|
||||
":participant-integration-api-tests-lib",
|
||||
"//bazel_tools/runfiles:scala_runfiles",
|
||||
"//daml-lf/archive:daml_lf_archive_reader",
|
||||
|
@ -0,0 +1,22 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
// Serialization format for Protocol Buffers values stored in the index.
|
||||
//
|
||||
// WARNING:
|
||||
// As all messages declared here represent values stored to the index database, we MUST ensure that
|
||||
// they remain backwards-compatible forever.
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package daml.platform.index;
|
||||
|
||||
option java_package = "com.daml.platform.index";
|
||||
|
||||
import "google/protobuf/any.proto";
|
||||
|
||||
// Serialized status details, conveyed from the driver `ReadService` to the ledger API client.
|
||||
// To be combined with a status code and message.
|
||||
message StatusDetails {
|
||||
repeated google.protobuf.Any details = 1;
|
||||
}
|
@ -1 +1 @@
|
||||
0d8451829143e22581afc5b31930bb0ff8c5efab89a3eaca0ed2e57454fc6823
|
||||
e560dcc7af3b3b333a3d61668c351c7c1a1a0ac088bf83b59e0b4699a8073951
|
||||
|
@ -121,9 +121,15 @@ CREATE TABLE participant_command_completions (
|
||||
application_id VARCHAR NOT NULL,
|
||||
submitters ARRAY NOT NULL,
|
||||
command_id VARCHAR NOT NULL,
|
||||
-- The transaction ID is `NULL` for rejected transactions.
|
||||
transaction_id VARCHAR,
|
||||
status_code INTEGER,
|
||||
status_message VARCHAR
|
||||
-- The three columns below are `NULL` if the completion is for an accepted transaction.
|
||||
-- The `rejection_status_details` column contains a Protocol-Buffers-serialized message of type
|
||||
-- `daml.platform.index.StatusDetails`, containing the code, message, and further details
|
||||
-- (decided by the ledger driver), and may be `NULL` even if the other two columns are set.
|
||||
rejection_status_code INTEGER,
|
||||
rejection_status_message VARCHAR,
|
||||
rejection_status_details BYTEA
|
||||
);
|
||||
|
||||
CREATE INDEX participant_command_completion_offset_application_idx ON participant_command_completions (completion_offset, application_id);
|
||||
|
@ -1 +1 @@
|
||||
c6e4c74b1c854a51d9cd79d3c614b60a372ca60e88b8d69cbd90a7a674389080
|
||||
16adf83956d0f884d1d8886f317b3c0929839a9b0c385b5cb40c6e42cf328ef4
|
||||
|
@ -124,16 +124,17 @@ CREATE INDEX idx_party_entries ON party_entries(submission_id);
|
||||
|
||||
CREATE TABLE participant_command_completions
|
||||
(
|
||||
completion_offset VARCHAR2(4000) not null,
|
||||
record_time TIMESTAMP not null,
|
||||
completion_offset VARCHAR2(4000) NOT NULL,
|
||||
record_time TIMESTAMP NOT NULL,
|
||||
|
||||
application_id NVARCHAR2(1000) not null,
|
||||
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
|
||||
command_id NVARCHAR2(1000) not null,
|
||||
application_id NVARCHAR2(1000) NOT NULL,
|
||||
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
|
||||
command_id NVARCHAR2(1000) NOT NULL,
|
||||
|
||||
transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints
|
||||
status_code INTEGER, -- null for successful command and checkpoints
|
||||
status_message CLOB -- null for successful command and checkpoints
|
||||
transaction_id NVARCHAR2(1000), -- null for rejected transactions and checkpoints
|
||||
rejection_status_code INTEGER, -- null for accepted transactions and checkpoints
|
||||
rejection_status_message CLOB, -- null for accepted transactions and checkpoints
|
||||
rejection_status_details BLOB -- null for accepted transactions and checkpoints
|
||||
);
|
||||
|
||||
CREATE INDEX participant_command_completions_idx ON participant_command_completions(completion_offset, application_id);
|
||||
|
@ -0,0 +1 @@
|
||||
824b012135f72b24d8393c412535432e4b2c669075e012971220f14eeb7bdaae
|
@ -0,0 +1,15 @@
|
||||
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
-- SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
-- Adds a column to store extra details for the rejection status.
|
||||
--
|
||||
-- The `rejection_status_details` column contains a Protocol-Buffers-serialized message of type
|
||||
-- `daml.platform.index.StatusDetails`, containing the code, message, and further details
|
||||
-- (decided by the ledger driver), and may be `NULL` even if the other two columns are set.
|
||||
|
||||
ALTER TABLE participant_command_completions
|
||||
RENAME COLUMN status_code TO rejection_status_code;
|
||||
ALTER TABLE participant_command_completions
|
||||
RENAME COLUMN status_message TO rejection_status_message;
|
||||
ALTER TABLE participant_command_completions
|
||||
ADD COLUMN rejection_status_details BYTEA;
|
@ -10,60 +10,42 @@ import com.daml.ledger.api.v1.command_completion_service.{Checkpoint, Completion
|
||||
import com.daml.ledger.api.v1.completion.Completion
|
||||
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import com.daml.ledger.offset.Offset
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.platform.ApiOffset.ApiOffsetConverter
|
||||
import com.daml.platform.store.Conversions.RejectionReasonOps
|
||||
import com.daml.platform.store.entries.LedgerEntry
|
||||
import com.google.rpc.status.Status
|
||||
import com.google.rpc.status.{Status => StatusProto}
|
||||
import io.grpc.Status
|
||||
|
||||
// Turn a stream of transactions into a stream of completions for a given application and set of parties
|
||||
// TODO Restrict the scope of this to com.daml.platform.store.dao when
|
||||
// TODO - the in-memory sandbox is gone
|
||||
private[platform] object CompletionFromTransaction {
|
||||
private val OkStatus = StatusProto.of(Status.Code.OK.value(), "", Seq.empty)
|
||||
private val RejectionTransactionId = ""
|
||||
|
||||
def toApiCheckpoint(recordTime: Instant, offset: Offset): Some[Checkpoint] =
|
||||
Some(
|
||||
Checkpoint(
|
||||
recordTime = Some(fromInstant(recordTime)),
|
||||
offset = Some(LedgerOffset(LedgerOffset.Value.Absolute(offset.toApiString))),
|
||||
)
|
||||
def acceptedCompletion(
|
||||
recordTime: Instant,
|
||||
offset: Offset,
|
||||
commandId: String,
|
||||
transactionId: String,
|
||||
): CompletionStreamResponse =
|
||||
CompletionStreamResponse.of(
|
||||
checkpoint = Some(toApiCheckpoint(recordTime, offset)),
|
||||
completions = Seq(Completion.of(commandId, Some(OkStatus), transactionId)),
|
||||
)
|
||||
|
||||
// Filter completions for transactions for which we have the full submitter information: appId, submitter, cmdId
|
||||
// This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package).
|
||||
// But for an api server that is part of a distributed ledger network, we might see
|
||||
// transactions that originated from some other api server. These transactions don't contain the submitter information,
|
||||
// and therefore we don't emit CommandAccepted completions for those
|
||||
def apply(
|
||||
appId: Ref.ApplicationId,
|
||||
parties: Set[Ref.Party],
|
||||
): PartialFunction[(Offset, LedgerEntry), (Offset, CompletionStreamResponse)] = {
|
||||
case (
|
||||
offset,
|
||||
LedgerEntry.Transaction(
|
||||
Some(commandId),
|
||||
transactionId,
|
||||
Some(`appId`),
|
||||
_,
|
||||
actAs,
|
||||
_,
|
||||
_,
|
||||
recordTime,
|
||||
_,
|
||||
_,
|
||||
),
|
||||
) if actAs.exists(parties) =>
|
||||
offset -> CompletionStreamResponse(
|
||||
checkpoint = toApiCheckpoint(recordTime, offset),
|
||||
Seq(Completion(commandId, Some(Status()), transactionId)),
|
||||
)
|
||||
def rejectedCompletion(
|
||||
recordTime: Instant,
|
||||
offset: Offset,
|
||||
commandId: String,
|
||||
status: StatusProto,
|
||||
): CompletionStreamResponse =
|
||||
CompletionStreamResponse.of(
|
||||
checkpoint = Some(toApiCheckpoint(recordTime, offset)),
|
||||
completions = Seq(Completion.of(commandId, Some(status), RejectionTransactionId)),
|
||||
)
|
||||
|
||||
case (offset, LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason))
|
||||
if actAs.exists(parties) =>
|
||||
val status = reason.toParticipantStateRejectionReason.status
|
||||
offset -> CompletionStreamResponse(
|
||||
checkpoint = toApiCheckpoint(recordTime, offset),
|
||||
Seq(Completion(commandId, Some(status))),
|
||||
)
|
||||
}
|
||||
private def toApiCheckpoint(recordTime: Instant, offset: Offset): Checkpoint =
|
||||
Checkpoint.of(
|
||||
recordTime = Some(fromInstant(recordTime)),
|
||||
offset = Some(LedgerOffset.of(LedgerOffset.Value.Absolute(offset.toApiString))),
|
||||
)
|
||||
}
|
||||
|
@ -133,8 +133,9 @@ object DbDto {
|
||||
submitters: Set[String],
|
||||
command_id: String,
|
||||
transaction_id: Option[String],
|
||||
status_code: Option[Int],
|
||||
status_message: Option[String],
|
||||
rejection_status_code: Option[Int],
|
||||
rejection_status_message: Option[String],
|
||||
rejection_status_details: Option[Array[Byte]],
|
||||
) extends DbDto
|
||||
|
||||
final case class CommandDeduplication(deduplication_key: String) extends DbDto
|
||||
|
@ -12,6 +12,7 @@ import com.daml.ledger.participant.state.{v2 => state}
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.lf.engine.Blinding
|
||||
import com.daml.lf.ledger.EventId
|
||||
import com.daml.platform.index.index.StatusDetails
|
||||
import com.daml.platform.store.appendonlydao.JdbcLedgerDao
|
||||
import com.daml.platform.store.appendonlydao.events._
|
||||
import com.daml.platform.store.dao.DeduplicationKeyMaker
|
||||
@ -34,8 +35,10 @@ object UpdateToDbDto {
|
||||
submitters = u.completionInfo.actAs.toSet,
|
||||
command_id = u.completionInfo.commandId,
|
||||
transaction_id = None,
|
||||
status_code = Some(u.reasonTemplate.code),
|
||||
status_message = Some(u.reasonTemplate.message),
|
||||
rejection_status_code = Some(u.reasonTemplate.code),
|
||||
rejection_status_message = Some(u.reasonTemplate.message),
|
||||
rejection_status_details =
|
||||
Some(StatusDetails.of(u.reasonTemplate.status.details).toByteArray),
|
||||
),
|
||||
DbDto.CommandDeduplication(
|
||||
DeduplicationKeyMaker.make(
|
||||
@ -269,8 +272,9 @@ object UpdateToDbDto {
|
||||
submitters = completionInfo.actAs.toSet,
|
||||
command_id = completionInfo.commandId,
|
||||
transaction_id = Some(u.transactionId),
|
||||
status_code = None,
|
||||
status_message = None,
|
||||
rejection_status_code = None,
|
||||
rejection_status_message = None,
|
||||
rejection_status_details = None,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -6,42 +6,44 @@ package com.daml.platform.store.backend.common
|
||||
import java.sql.Connection
|
||||
import java.time.Instant
|
||||
|
||||
import anorm.SqlParser.{binaryStream, int, str}
|
||||
import anorm.{RowParser, ~}
|
||||
import anorm.SqlParser.{int, str}
|
||||
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
|
||||
import com.daml.ledger.api.v1.completion.Completion
|
||||
import com.daml.ledger.offset.Offset
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.lf.data.Ref.Party
|
||||
import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint
|
||||
import com.daml.platform.index.index.StatusDetails
|
||||
import com.daml.platform.store.CompletionFromTransaction
|
||||
import com.daml.platform.store.Conversions.{instant, offset}
|
||||
import com.daml.platform.store.backend.CompletionStorageBackend
|
||||
import com.google.rpc.status.Status
|
||||
import com.google.rpc.status.{Status => StatusProto}
|
||||
|
||||
trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
|
||||
|
||||
def queryStrategy: QueryStrategy
|
||||
|
||||
private val sharedCompletionColumns: RowParser[Offset ~ Instant ~ String] =
|
||||
private val sharedColumns: RowParser[Offset ~ Instant ~ String] =
|
||||
offset("completion_offset") ~ instant("record_time") ~ str("command_id")
|
||||
|
||||
private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
|
||||
sharedCompletionColumns ~ str("transaction_id") map {
|
||||
sharedColumns ~ str("transaction_id") map {
|
||||
case offset ~ recordTime ~ commandId ~ transactionId =>
|
||||
CompletionStreamResponse(
|
||||
checkpoint = toApiCheckpoint(recordTime, offset),
|
||||
completions = Seq(Completion(commandId, Some(Status()), transactionId)),
|
||||
)
|
||||
CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId)
|
||||
}
|
||||
|
||||
private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
|
||||
sharedCompletionColumns ~ int("status_code") ~ str("status_message") map {
|
||||
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
|
||||
CompletionStreamResponse(
|
||||
checkpoint = toApiCheckpoint(recordTime, offset),
|
||||
completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))),
|
||||
)
|
||||
}
|
||||
sharedColumns ~
|
||||
int("rejection_status_code") ~
|
||||
str("rejection_status_message") ~
|
||||
binaryStream("rejection_status_details").? map {
|
||||
case offset ~ recordTime ~ commandId ~
|
||||
rejectionStatusCode ~ rejectionStatusMessage ~ rejectionStatusDetails =>
|
||||
val details = rejectionStatusDetails
|
||||
.map(stream => StatusDetails.parseFrom(stream).details)
|
||||
.getOrElse(Seq.empty)
|
||||
val status = StatusProto.of(rejectionStatusCode, rejectionStatusMessage, details)
|
||||
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
|
||||
}
|
||||
|
||||
private val completionParser: RowParser[CompletionStreamResponse] =
|
||||
acceptedCommandParser | rejectedCommandParser
|
||||
@ -61,8 +63,9 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
|
||||
record_time,
|
||||
command_id,
|
||||
transaction_id,
|
||||
status_code,
|
||||
status_message
|
||||
rejection_status_code,
|
||||
rejection_status_message,
|
||||
rejection_status_details
|
||||
FROM
|
||||
participant_command_completions
|
||||
WHERE
|
||||
|
@ -225,8 +225,9 @@ private[backend] object AppendOnlySchema {
|
||||
"submitters" -> fieldStrategy.stringArray(_.submitters),
|
||||
"command_id" -> fieldStrategy.string(_.command_id),
|
||||
"transaction_id" -> fieldStrategy.stringOptional(_.transaction_id),
|
||||
"status_code" -> fieldStrategy.intOptional(_.status_code),
|
||||
"status_message" -> fieldStrategy.stringOptional(_.status_message),
|
||||
"rejection_status_code" -> fieldStrategy.intOptional(_.rejection_status_code),
|
||||
"rejection_status_message" -> fieldStrategy.stringOptional(_.rejection_status_message),
|
||||
"rejection_status_details" -> fieldStrategy.byteaOptional(_.rejection_status_details),
|
||||
)
|
||||
|
||||
val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] =
|
||||
|
@ -7,13 +7,12 @@ import java.time.Instant
|
||||
|
||||
import anorm.{Row, RowParser, SimpleSql, SqlParser, SqlStringInterpolation, ~}
|
||||
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
|
||||
import com.daml.ledger.api.v1.completion.Completion
|
||||
import com.daml.ledger.offset.Offset
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint
|
||||
import com.daml.platform.store.CompletionFromTransaction
|
||||
import com.daml.platform.store.Conversions._
|
||||
import com.daml.platform.store.dao.events.SqlFunctions
|
||||
import com.google.rpc.status.Status
|
||||
import com.google.rpc.status.{Status => StatusProto}
|
||||
|
||||
private[platform] object CommandCompletionsTable {
|
||||
|
||||
@ -25,19 +24,14 @@ private[platform] object CommandCompletionsTable {
|
||||
private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
|
||||
sharedColumns ~ str("transaction_id") map {
|
||||
case offset ~ recordTime ~ commandId ~ transactionId =>
|
||||
CompletionStreamResponse(
|
||||
checkpoint = toApiCheckpoint(recordTime, offset),
|
||||
completions = Seq(Completion(commandId, Some(Status()), transactionId)),
|
||||
)
|
||||
CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId)
|
||||
}
|
||||
|
||||
private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
|
||||
sharedColumns ~ int("status_code") ~ str("status_message") map {
|
||||
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
|
||||
CompletionStreamResponse(
|
||||
checkpoint = toApiCheckpoint(recordTime, offset),
|
||||
completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))),
|
||||
)
|
||||
val status = StatusProto.of(statusCode, statusMessage, Seq.empty)
|
||||
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
|
||||
}
|
||||
|
||||
val parser: RowParser[CompletionStreamResponse] = acceptedCommandParser | rejectedCommandParser
|
||||
|
@ -18,6 +18,7 @@ import com.daml.lf.transaction.BlindingInfo
|
||||
import com.daml.lf.transaction.test.TransactionBuilder
|
||||
import com.daml.lf.value.Value
|
||||
import com.daml.logging.LoggingContext
|
||||
import com.daml.platform.index.index.StatusDetails
|
||||
import com.daml.platform.store.appendonlydao.JdbcLedgerDao
|
||||
import com.daml.platform.store.appendonlydao.events.Raw.TreeEvent
|
||||
import com.daml.platform.store.appendonlydao.events.{
|
||||
@ -31,7 +32,7 @@ import com.daml.platform.store.appendonlydao.events.{
|
||||
}
|
||||
import com.daml.platform.store.dao.DeduplicationKeyMaker
|
||||
import com.google.protobuf.ByteString
|
||||
import com.google.rpc.status.{Status => RpcStatus}
|
||||
import com.google.rpc.status.{Status => StatusProto}
|
||||
import io.grpc.Status
|
||||
import org.scalactic.TripleEquals._
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
@ -258,12 +259,11 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
|
||||
optDeduplicationPeriod = None,
|
||||
submissionId = someSubmissionId,
|
||||
)
|
||||
val status = StatusProto.of(Status.Code.ABORTED.value(), "test reason", Seq.empty)
|
||||
val update = state.Update.CommandRejected(
|
||||
someRecordTime,
|
||||
completionInfo,
|
||||
new state.Update.CommandRejected.FinalReason(
|
||||
RpcStatus.of(Status.Code.ABORTED.value(), "test reason", Seq.empty)
|
||||
),
|
||||
new state.Update.CommandRejected.FinalReason(status),
|
||||
)
|
||||
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
|
||||
someOffset
|
||||
@ -277,8 +277,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
|
||||
submitters = Set(someParty),
|
||||
command_id = someCommandId,
|
||||
transaction_id = None,
|
||||
status_code = Some(Status.Code.ABORTED.value()),
|
||||
status_message = Some("test reason"),
|
||||
rejection_status_code = Some(status.code),
|
||||
rejection_status_message = Some(status.message),
|
||||
rejection_status_details = Some(StatusDetails.of(status.details).toByteArray),
|
||||
),
|
||||
DbDto.CommandDeduplication(
|
||||
DeduplicationKeyMaker.make(
|
||||
@ -348,8 +349,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
|
||||
submitters = completionInfo.actAs.toSet,
|
||||
command_id = completionInfo.commandId,
|
||||
transaction_id = Some(update.transactionId),
|
||||
status_code = None,
|
||||
status_message = None,
|
||||
rejection_status_code = None,
|
||||
rejection_status_message = None,
|
||||
rejection_status_details = None,
|
||||
),
|
||||
)
|
||||
}
|
||||
@ -415,8 +417,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
|
||||
submitters = completionInfo.actAs.toSet,
|
||||
command_id = completionInfo.commandId,
|
||||
transaction_id = Some(update.transactionId),
|
||||
status_code = None,
|
||||
status_message = None,
|
||||
rejection_status_code = None,
|
||||
rejection_status_message = None,
|
||||
rejection_status_details = None,
|
||||
),
|
||||
)
|
||||
}
|
||||
@ -494,8 +497,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
|
||||
submitters = completionInfo.actAs.toSet,
|
||||
command_id = completionInfo.commandId,
|
||||
transaction_id = Some(update.transactionId),
|
||||
status_code = None,
|
||||
status_message = None,
|
||||
rejection_status_code = None,
|
||||
rejection_status_message = None,
|
||||
rejection_status_details = None,
|
||||
),
|
||||
)
|
||||
}
|
||||
@ -573,8 +577,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
|
||||
submitters = completionInfo.actAs.toSet,
|
||||
command_id = completionInfo.commandId,
|
||||
transaction_id = Some(update.transactionId),
|
||||
status_code = None,
|
||||
status_message = None,
|
||||
rejection_status_code = None,
|
||||
rejection_status_message = None,
|
||||
rejection_status_details = None,
|
||||
),
|
||||
)
|
||||
}
|
||||
@ -735,8 +740,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
|
||||
submitters = completionInfo.actAs.toSet,
|
||||
command_id = completionInfo.commandId,
|
||||
transaction_id = Some(update.transactionId),
|
||||
status_code = None,
|
||||
status_message = None,
|
||||
rejection_status_code = None,
|
||||
rejection_status_message = None,
|
||||
rejection_status_details = None,
|
||||
),
|
||||
)
|
||||
}
|
||||
@ -831,8 +837,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
|
||||
submitters = completionInfo.actAs.toSet,
|
||||
command_id = completionInfo.commandId,
|
||||
transaction_id = Some(update.transactionId),
|
||||
status_code = None,
|
||||
status_message = None,
|
||||
rejection_status_code = None,
|
||||
rejection_status_message = None,
|
||||
rejection_status_details = None,
|
||||
),
|
||||
)
|
||||
}
|
||||
@ -951,8 +958,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
|
||||
submitters = completionInfo.actAs.toSet,
|
||||
command_id = completionInfo.commandId,
|
||||
transaction_id = Some(update.transactionId),
|
||||
status_code = None,
|
||||
status_message = None,
|
||||
rejection_status_code = None,
|
||||
rejection_status_message = None,
|
||||
rejection_status_details = None,
|
||||
),
|
||||
)
|
||||
}
|
||||
@ -1048,8 +1056,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
|
||||
submitters = completionInfo.actAs.toSet,
|
||||
command_id = completionInfo.commandId,
|
||||
transaction_id = Some(update.transactionId),
|
||||
status_code = None,
|
||||
status_message = None,
|
||||
rejection_status_code = None,
|
||||
rejection_status_message = None,
|
||||
rejection_status_details = None,
|
||||
),
|
||||
)
|
||||
}
|
||||
@ -1122,8 +1131,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
|
||||
submitters = completionInfo.actAs.toSet,
|
||||
command_id = completionInfo.commandId,
|
||||
transaction_id = Some(update.transactionId),
|
||||
status_code = None,
|
||||
status_message = None,
|
||||
rejection_status_code = None,
|
||||
rejection_status_message = None,
|
||||
rejection_status_details = None,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
@ -56,6 +56,7 @@ import com.daml.platform.sandbox.stores.ledger.inmemory.InMemoryLedger._
|
||||
import com.daml.platform.sandbox.stores.ledger.{Ledger, Rejection}
|
||||
import com.daml.platform.store.CompletionFromTransaction
|
||||
import com.daml.platform.store.Contract.ActiveContract
|
||||
import com.daml.platform.store.Conversions.RejectionReasonOps
|
||||
import com.daml.platform.store.entries.{
|
||||
ConfigurationEntry,
|
||||
LedgerEntry,
|
||||
@ -166,13 +167,50 @@ private[sandbox] final class InMemoryLedger(
|
||||
endInclusive: Option[Offset],
|
||||
applicationId: ApplicationId,
|
||||
parties: Set[Ref.Party],
|
||||
)(implicit loggingContext: LoggingContext): Source[(Offset, CompletionStreamResponse), NotUsed] =
|
||||
entries
|
||||
.getSource(startExclusive, endInclusive)
|
||||
.collect { case (offset, InMemoryLedgerEntry(entry)) =>
|
||||
(offset, entry)
|
||||
}
|
||||
.collect(CompletionFromTransaction(applicationId.unwrap, parties))
|
||||
)(implicit
|
||||
loggingContext: LoggingContext
|
||||
): Source[(Offset, CompletionStreamResponse), NotUsed] = {
|
||||
val appId = applicationId.unwrap
|
||||
entries.getSource(startExclusive, endInclusive).collect {
|
||||
case (
|
||||
offset,
|
||||
InMemoryLedgerEntry(
|
||||
LedgerEntry.Transaction(
|
||||
Some(commandId),
|
||||
transactionId,
|
||||
Some(`appId`),
|
||||
_,
|
||||
actAs,
|
||||
_,
|
||||
_,
|
||||
recordTime,
|
||||
_,
|
||||
_,
|
||||
)
|
||||
),
|
||||
) if actAs.exists(parties) =>
|
||||
offset -> CompletionFromTransaction.acceptedCompletion(
|
||||
recordTime,
|
||||
offset,
|
||||
commandId,
|
||||
transactionId,
|
||||
)
|
||||
|
||||
case (
|
||||
offset,
|
||||
InMemoryLedgerEntry(
|
||||
LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason)
|
||||
),
|
||||
) if actAs.exists(parties) =>
|
||||
val status = reason.toParticipantStateRejectionReason.status
|
||||
offset -> CompletionFromTransaction.rejectedCompletion(
|
||||
recordTime,
|
||||
offset,
|
||||
commandId,
|
||||
status,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override def ledgerEnd()(implicit loggingContext: LoggingContext): Offset = entries.ledgerEnd
|
||||
|
||||
|
@ -155,6 +155,8 @@
|
||||
type: jar-scala
|
||||
- target: //ledger/participant-integration-api:participant-integration-api
|
||||
type: jar-scala
|
||||
- target: //ledger/participant-integration-api:participant-integration-api-proto_scala
|
||||
type: jar-scala
|
||||
- target: //ledger/participant-state:participant-state
|
||||
type: jar-scala
|
||||
- target: //ledger/participant-state/kvutils:daml_kvutils_proto_jar
|
||||
|
Loading…
Reference in New Issue
Block a user