mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-19 16:57:40 +03:00
Extract the envelope from daml_kvutils.proto [KVL-1090] (#11192)
CHANGELOG_BEGIN CHANGELOG_END
This commit is contained in:
parent
eb834907d7
commit
d045ffaf1f
@ -24,35 +24,6 @@ import "com/daml/lf/transaction.proto";
|
||||
import "com/daml/ledger/configuration/ledger_configuration.proto";
|
||||
import "com/daml/ledger/participant/state/kvutils/store/state.proto";
|
||||
import "com/daml/ledger/participant/state/kvutils/store/events/configuration.proto";
|
||||
// Envelope with which we wrap all kvutils messages that are sent over the network
|
||||
// or persisted on disk. The envelope specifies the kvutils version that defines how
|
||||
// a message is decoded and processed. Optionally the message payload may be stored
|
||||
// compressed.
|
||||
message Envelope {
|
||||
enum MessageKind {
|
||||
SUBMISSION = 0;
|
||||
LOG_ENTRY = 1;
|
||||
STATE_VALUE = 2;
|
||||
SUBMISSION_BATCH = 3;
|
||||
}
|
||||
|
||||
enum CompressionSchema {
|
||||
NONE = 0;
|
||||
GZIP = 1;
|
||||
}
|
||||
|
||||
// Kvutils version number
|
||||
int64 version = 1;
|
||||
|
||||
// Kind of message contained within.
|
||||
MessageKind kind = 2;
|
||||
|
||||
// Compression schema, if any, used to compress the message.
|
||||
CompressionSchema compression = 3;
|
||||
|
||||
// The enclosed, potentially compressed, message
|
||||
bytes message = 4;
|
||||
}
|
||||
|
||||
// A log entry for a committed Daml submission.
|
||||
// Produced by [[KeyValueCommitting]] from the `DamlSubmission` message.
|
||||
|
@ -0,0 +1,48 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
//
|
||||
// Protocol buffer messages used by the participant state key-value utilities
|
||||
// for implementing a Daml ledger backed by a key-value store.
|
||||
//
|
||||
// These messages should only be produced and consumed by the methods in
|
||||
// `KeyValueCommitting`, `KeyValueConsumption` and `KeyValueSubmission` objects.
|
||||
//
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils.envelope;
|
||||
|
||||
option java_package = "com.daml.ledger.participant.state.kvutils.envelope";
|
||||
option java_multiple_files = true;
|
||||
option csharp_namespace = "Com.Daml.Ledger.Participant.State.KVUtils.Envelope";
|
||||
|
||||
// Envelope with which we wrap all kvutils messages that are sent over the network
|
||||
// or persisted on disk. The envelope specifies the kvutils version that defines how
|
||||
// a message is decoded and processed. Optionally the message payload may be stored
|
||||
// compressed.
|
||||
message Envelope {
|
||||
enum MessageKind {
|
||||
SUBMISSION = 0;
|
||||
LOG_ENTRY = 1;
|
||||
STATE_VALUE = 2;
|
||||
SUBMISSION_BATCH = 3;
|
||||
}
|
||||
|
||||
enum CompressionSchema {
|
||||
NONE = 0;
|
||||
GZIP = 1;
|
||||
}
|
||||
|
||||
// Kvutils version number
|
||||
int64 version = 1;
|
||||
|
||||
// Kind of message contained within.
|
||||
MessageKind kind = 2;
|
||||
|
||||
// Compression schema, if any, used to compress the message.
|
||||
CompressionSchema compression = 3;
|
||||
|
||||
// The enclosed, potentially compressed, message
|
||||
bytes message = 4;
|
||||
}
|
@ -6,7 +6,7 @@ package com.daml.ledger.participant.state.kvutils
|
||||
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
|
||||
|
||||
import com.daml.ledger.participant.state.kvutils.store.DamlStateValue
|
||||
import com.daml.ledger.participant.state.kvutils.{DamlKvutils => Proto}
|
||||
import com.daml.ledger.participant.state.kvutils.{DamlKvutils => Proto, envelope => proto}
|
||||
import com.google.protobuf.ByteString
|
||||
|
||||
import scala.util.Try
|
||||
@ -30,20 +30,20 @@ object Envelope {
|
||||
private val DefaultCompression = true
|
||||
|
||||
private def enclose(
|
||||
kind: Proto.Envelope.MessageKind,
|
||||
kind: proto.Envelope.MessageKind,
|
||||
bytes: ByteString,
|
||||
compression: Boolean,
|
||||
): Raw.Envelope =
|
||||
Raw.Envelope(
|
||||
Proto.Envelope.newBuilder
|
||||
proto.Envelope.newBuilder
|
||||
.setVersion(Version.version)
|
||||
.setKind(kind)
|
||||
.setMessage(if (compression) compress(bytes) else bytes)
|
||||
.setCompression(
|
||||
if (compression)
|
||||
Proto.Envelope.CompressionSchema.GZIP
|
||||
proto.Envelope.CompressionSchema.GZIP
|
||||
else
|
||||
Proto.Envelope.CompressionSchema.NONE
|
||||
proto.Envelope.CompressionSchema.NONE
|
||||
)
|
||||
.build
|
||||
)
|
||||
@ -52,60 +52,60 @@ object Envelope {
|
||||
enclose(sub, compression = DefaultCompression)
|
||||
|
||||
def enclose(sub: wire.DamlSubmission, compression: Boolean): Raw.Envelope =
|
||||
enclose(Proto.Envelope.MessageKind.SUBMISSION, sub.toByteString, compression)
|
||||
enclose(proto.Envelope.MessageKind.SUBMISSION, sub.toByteString, compression)
|
||||
|
||||
def enclose(logEntry: Proto.DamlLogEntry): Raw.Envelope =
|
||||
enclose(logEntry, compression = DefaultCompression)
|
||||
|
||||
def enclose(logEntry: Proto.DamlLogEntry, compression: Boolean): Raw.Envelope =
|
||||
enclose(Proto.Envelope.MessageKind.LOG_ENTRY, logEntry.toByteString, compression)
|
||||
enclose(proto.Envelope.MessageKind.LOG_ENTRY, logEntry.toByteString, compression)
|
||||
|
||||
def enclose(stateValue: DamlStateValue): Raw.Envelope =
|
||||
enclose(stateValue, compression = DefaultCompression)
|
||||
|
||||
def enclose(stateValue: DamlStateValue, compression: Boolean): Raw.Envelope =
|
||||
enclose(Proto.Envelope.MessageKind.STATE_VALUE, stateValue.toByteString, compression)
|
||||
enclose(proto.Envelope.MessageKind.STATE_VALUE, stateValue.toByteString, compression)
|
||||
|
||||
def enclose(batch: wire.DamlSubmissionBatch): Raw.Envelope =
|
||||
enclose(Proto.Envelope.MessageKind.SUBMISSION_BATCH, batch.toByteString, compression = false)
|
||||
enclose(proto.Envelope.MessageKind.SUBMISSION_BATCH, batch.toByteString, compression = false)
|
||||
|
||||
def open(envelopeBytes: Raw.Envelope): Either[String, Message] =
|
||||
openWithParser(() => Proto.Envelope.parseFrom(envelopeBytes.bytes))
|
||||
openWithParser(() => proto.Envelope.parseFrom(envelopeBytes.bytes))
|
||||
|
||||
def open(envelopeBytes: Array[Byte]): Either[String, Message] =
|
||||
openWithParser(() => Proto.Envelope.parseFrom(envelopeBytes))
|
||||
openWithParser(() => proto.Envelope.parseFrom(envelopeBytes))
|
||||
|
||||
private def openWithParser(parseEnvelope: () => Proto.Envelope): Either[String, Message] =
|
||||
private def openWithParser(parseEnvelope: () => proto.Envelope): Either[String, Message] =
|
||||
for {
|
||||
envelope <- Try(parseEnvelope()).toEither.left.map(_.getMessage)
|
||||
parsedEnvelope <- Try(parseEnvelope()).toEither.left.map(_.getMessage)
|
||||
_ <- Either.cond(
|
||||
envelope.getVersion == Version.version,
|
||||
parsedEnvelope.getVersion == Version.version,
|
||||
(),
|
||||
s"Unsupported version ${envelope.getVersion}",
|
||||
s"Unsupported version ${parsedEnvelope.getVersion}",
|
||||
)
|
||||
uncompressedMessage <- envelope.getCompression match {
|
||||
case Proto.Envelope.CompressionSchema.GZIP =>
|
||||
parseMessageSafe(() => decompress(envelope.getMessage))
|
||||
case Proto.Envelope.CompressionSchema.NONE =>
|
||||
Right(envelope.getMessage)
|
||||
case Proto.Envelope.CompressionSchema.UNRECOGNIZED =>
|
||||
Left(s"Unrecognized compression schema: ${envelope.getCompressionValue}")
|
||||
uncompressedMessage <- parsedEnvelope.getCompression match {
|
||||
case proto.Envelope.CompressionSchema.GZIP =>
|
||||
parseMessageSafe(() => decompress(parsedEnvelope.getMessage))
|
||||
case proto.Envelope.CompressionSchema.NONE =>
|
||||
Right(parsedEnvelope.getMessage)
|
||||
case proto.Envelope.CompressionSchema.UNRECOGNIZED =>
|
||||
Left(s"Unrecognized compression schema: ${parsedEnvelope.getCompressionValue}")
|
||||
}
|
||||
message <- envelope.getKind match {
|
||||
case Proto.Envelope.MessageKind.LOG_ENTRY =>
|
||||
message <- parsedEnvelope.getKind match {
|
||||
case proto.Envelope.MessageKind.LOG_ENTRY =>
|
||||
parseMessageSafe(() => Proto.DamlLogEntry.parseFrom(uncompressedMessage))
|
||||
.map(LogEntryMessage)
|
||||
case Proto.Envelope.MessageKind.SUBMISSION =>
|
||||
case proto.Envelope.MessageKind.SUBMISSION =>
|
||||
parseMessageSafe(() => wire.DamlSubmission.parseFrom(uncompressedMessage))
|
||||
.map(SubmissionMessage)
|
||||
case Proto.Envelope.MessageKind.STATE_VALUE =>
|
||||
case proto.Envelope.MessageKind.STATE_VALUE =>
|
||||
parseMessageSafe(() => DamlStateValue.parseFrom(uncompressedMessage))
|
||||
.map(StateValueMessage)
|
||||
case Proto.Envelope.MessageKind.SUBMISSION_BATCH =>
|
||||
case proto.Envelope.MessageKind.SUBMISSION_BATCH =>
|
||||
parseMessageSafe(() => wire.DamlSubmissionBatch.parseFrom(uncompressedMessage))
|
||||
.map(SubmissionBatchMessage)
|
||||
case Proto.Envelope.MessageKind.UNRECOGNIZED =>
|
||||
Left(s"Unrecognized message kind: ${envelope.getKind}")
|
||||
case proto.Envelope.MessageKind.UNRECOGNIZED =>
|
||||
Left(s"Unrecognized message kind: ${parsedEnvelope.getKind}")
|
||||
}
|
||||
} yield message
|
||||
|
||||
|
@ -59,8 +59,8 @@ object Raw {
|
||||
final case class Envelope(override val bytes: ByteString) extends Value
|
||||
|
||||
object Envelope extends Companion[Envelope] {
|
||||
def apply(envelope: DamlKvutils.Envelope): Envelope =
|
||||
apply(envelope.toByteString)
|
||||
def apply(protoEnvelope: envelope.Envelope): Envelope =
|
||||
apply(protoEnvelope.toByteString)
|
||||
}
|
||||
|
||||
type LogEntry = (LogEntryId, Envelope)
|
||||
|
@ -9,15 +9,15 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
|
||||
DamlLogEntryId,
|
||||
DamlPartyAllocationEntry,
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.store.events.DamlConfigurationEntry
|
||||
import com.daml.ledger.participant.state.kvutils.store.{
|
||||
DamlPartyAllocation,
|
||||
DamlStateKey,
|
||||
DamlStateValue,
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.store.events.DamlConfigurationEntry
|
||||
import com.daml.ledger.participant.state.kvutils.tools.integritycheck.RawWriteSetComparisonSpec._
|
||||
import com.daml.ledger.participant.state.kvutils.tools.integritycheck.WriteSetComparison.rawHexString
|
||||
import com.daml.ledger.participant.state.kvutils.{DamlKvutils, Envelope, Raw, Version}
|
||||
import com.daml.ledger.participant.state.kvutils.{Envelope, Raw, Version, envelope => protoEnvelope}
|
||||
import com.daml.ledger.validator.StateKeySerializationStrategy
|
||||
import com.google.protobuf.{ByteString, Empty}
|
||||
import org.scalatest.Inside
|
||||
@ -55,7 +55,7 @@ final class RawWriteSetComparisonSpec extends AsyncWordSpec with Matchers with I
|
||||
|
||||
"fail on an unknown entry" in {
|
||||
val envelope = Raw.Envelope(
|
||||
DamlKvutils.Envelope.newBuilder
|
||||
protoEnvelope.Envelope.newBuilder
|
||||
.setVersion(Version.version)
|
||||
.build
|
||||
)
|
||||
@ -66,9 +66,9 @@ final class RawWriteSetComparisonSpec extends AsyncWordSpec with Matchers with I
|
||||
|
||||
"fail on a submission entry" in {
|
||||
val envelope = Raw.Envelope(
|
||||
DamlKvutils.Envelope.newBuilder
|
||||
protoEnvelope.Envelope.newBuilder
|
||||
.setVersion(Version.version)
|
||||
.setKind(DamlKvutils.Envelope.MessageKind.SUBMISSION)
|
||||
.setKind(protoEnvelope.Envelope.MessageKind.SUBMISSION)
|
||||
.build
|
||||
)
|
||||
inside(writeSetComparison.checkEntryIsReadable(noKey, envelope)) { case Left(message) =>
|
||||
@ -78,9 +78,9 @@ final class RawWriteSetComparisonSpec extends AsyncWordSpec with Matchers with I
|
||||
|
||||
"fail on a submission batch entry" in {
|
||||
val envelope = Raw.Envelope(
|
||||
DamlKvutils.Envelope.newBuilder
|
||||
protoEnvelope.Envelope.newBuilder
|
||||
.setVersion(Version.version)
|
||||
.setKind(DamlKvutils.Envelope.MessageKind.SUBMISSION_BATCH)
|
||||
.setKind(protoEnvelope.Envelope.MessageKind.SUBMISSION_BATCH)
|
||||
.build
|
||||
)
|
||||
inside(writeSetComparison.checkEntryIsReadable(noKey, envelope)) { case Left(message) =>
|
||||
|
Loading…
Reference in New Issue
Block a user