update canton to 20231201.11605.0.vdef89654/2.8.0-snapshot.20231201.11605.0.vdef89654 (#17960)

CHANGELOG_BEGIN
CHANGELOG_END

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
This commit is contained in:
azure-pipelines[bot] 2023-12-04 10:53:00 +01:00 committed by GitHub
parent e22f8bebd9
commit d054187b5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
162 changed files with 3097 additions and 742 deletions

View File

@ -91,10 +91,10 @@ object DomainAdminCommands {
staticDomainParametersInternal <- StaticDomainParametersInternal.fromProtoV0(
parametersV0
)
sraticDomainParametersConfig <- StaticDomainParametersConfig(
staticDomainParametersConfig <- StaticDomainParametersConfig(
staticDomainParametersInternal
)
} yield sraticDomainParametersConfig).leftMap(_.toString)
} yield staticDomainParametersConfig).leftMap(_.toString)
case Parameters.ParametersV1(parametersV1) =>
(for {
@ -105,6 +105,16 @@ object DomainAdminCommands {
staticDomainParametersInternal
)
} yield staticDomainParametersConfig).leftMap(_.toString)
case Parameters.ParametersV2(parametersV2) =>
(for {
staticDomainParametersInternal <- StaticDomainParametersInternal.fromProtoV2(
parametersV2
)
staticDomainParametersConfig <- StaticDomainParametersConfig(
staticDomainParametersInternal
)
} yield staticDomainParametersConfig).leftMap(_.toString)
}
}
}

View File

@ -217,7 +217,9 @@ object EnterpriseSequencerAdminCommands {
case v0.Snapshot.Response.Value.Success(v0.Snapshot.Success(Some(result))) =>
SequencerSnapshot.fromProtoV0(result).leftMap(_.toString)
case v0.Snapshot.Response.Value.VersionedSuccess(v0.Snapshot.VersionedSuccess(snapshot)) =>
SequencerSnapshot.fromByteString(snapshot).leftMap(_.toString)
SequencerSnapshot
.fromByteStringUnsafe(snapshot)
.leftMap(_.toString)
case _ => Left("response is empty")
}

View File

@ -11,6 +11,7 @@ import com.digitalasset.canton.config.{NonNegativeFiniteDuration, PositiveDurati
import com.digitalasset.canton.protocol.DomainParameters.MaxRequestSize
import com.digitalasset.canton.protocol.DynamicDomainParameters.InvalidDynamicDomainParameters
import com.digitalasset.canton.protocol.{
CatchUpConfig,
DynamicDomainParameters as DynamicDomainParametersInternal,
StaticDomainParameters as StaticDomainParametersInternal,
v0 as protocolV0,
@ -48,6 +49,7 @@ sealed trait StaticDomainParameters {
maxRatePerParticipant: NonNegativeInt,
reconciliationInterval: PositiveDurationSeconds,
maxRequestSize: MaxRequestSize,
catchUpParameters: Option[CatchUpConfig],
): StaticDomainParametersInternal =
StaticDomainParametersInternal.create(
maxRatePerParticipant = maxRatePerParticipant,
@ -70,6 +72,7 @@ sealed trait StaticDomainParameters {
),
protocolVersion = protocolVersion,
reconciliationInterval = reconciliationInterval.toInternal,
catchUpParameters = catchUpParameters,
)
}
@ -86,7 +89,12 @@ sealed abstract case class StaticDomainParametersV0(
protocolVersion: ProtocolVersion,
) extends StaticDomainParameters {
override private[canton] def toInternal: StaticDomainParametersInternal =
toInternal(maxRatePerParticipant, reconciliationInterval, MaxRequestSize(maxInboundMessageSize))
toInternal(
maxRatePerParticipant,
reconciliationInterval,
MaxRequestSize(maxInboundMessageSize),
None,
)
}
sealed abstract case class StaticDomainParametersV1(
@ -102,6 +110,25 @@ sealed abstract case class StaticDomainParametersV1(
StaticDomainParametersInternal.defaultMaxRatePerParticipant,
StaticDomainParametersInternal.defaultReconciliationInterval.toConfig,
StaticDomainParametersInternal.defaultMaxRequestSize,
None,
)
}
sealed abstract case class StaticDomainParametersV2(
uniqueContractKeys: Boolean,
requiredSigningKeySchemes: Set[SigningKeyScheme],
requiredEncryptionKeySchemes: Set[EncryptionKeyScheme],
requiredSymmetricKeySchemes: Set[SymmetricKeyScheme],
requiredHashAlgorithms: Set[HashAlgorithm],
requiredCryptoKeyFormats: Set[CryptoKeyFormat],
protocolVersion: ProtocolVersion,
catchUpParameters: Option[CatchUpConfig],
) extends StaticDomainParameters {
override private[canton] def toInternal: StaticDomainParametersInternal = toInternal(
StaticDomainParametersInternal.defaultMaxRatePerParticipant,
StaticDomainParametersInternal.defaultReconciliationInterval.toConfig,
StaticDomainParametersInternal.defaultMaxRequestSize,
catchUpParameters,
)
}
@ -150,13 +177,31 @@ object StaticDomainParameters {
protocolVersion = domain.protocolVersion,
) {}
)
else if (protoVersion == 2)
Right(
new StaticDomainParametersV2(
uniqueContractKeys = domain.uniqueContractKeys,
requiredSigningKeySchemes =
domain.requiredSigningKeySchemes.forgetNE.map(_.transformInto[SigningKeyScheme]),
requiredEncryptionKeySchemes =
domain.requiredEncryptionKeySchemes.forgetNE.map(_.transformInto[EncryptionKeyScheme]),
requiredSymmetricKeySchemes =
domain.requiredSymmetricKeySchemes.forgetNE.map(_.transformInto[SymmetricKeyScheme]),
requiredHashAlgorithms =
domain.requiredHashAlgorithms.forgetNE.map(_.transformInto[HashAlgorithm]),
requiredCryptoKeyFormats =
domain.requiredCryptoKeyFormats.forgetNE.map(_.transformInto[CryptoKeyFormat]),
protocolVersion = domain.protocolVersion,
catchUpParameters = domain.catchUpParameters,
) {}
)
else
Left(ProtoDeserializationError.VersionError("StaticDomainParameters", protoVersion))
}
def tryReadFromFile(inputFile: String): StaticDomainParameters = {
val staticDomainParametersInternal = StaticDomainParametersInternal
.readFromFile(inputFile)
.readFromFileUnsafe(inputFile)
.valueOr(err =>
throw new IllegalArgumentException(
s"Reading static domain parameters from file $inputFile failed: $err"

View File

@ -176,7 +176,9 @@ object ListSignedLegalIdentityClaimResult {
context <- BaseResult.fromProtoV0(contextProto)
itemProto <- ProtoConverter.required("item", value.item)
item <- SignedLegalIdentityClaim.fromProtoV0(itemProto)
claim <- LegalIdentityClaim.fromByteString(item.claim)
claim <- LegalIdentityClaim.fromByteStringUnsafe(
item.claim
)
} yield ListSignedLegalIdentityClaimResult(context, claim)
}
}

View File

@ -60,6 +60,7 @@ import com.digitalasset.canton.platform.apiserver.SeedService.Seeding
import com.digitalasset.canton.platform.apiserver.configuration.RateLimitingConfig
import com.digitalasset.canton.platform.config.ActiveContractsServiceStreamsConfig
import com.digitalasset.canton.platform.indexer.PackageMetadataViewConfig
import com.digitalasset.canton.protocol.CatchUpConfig
import com.digitalasset.canton.protocol.DomainParameters.MaxRequestSize
import com.digitalasset.canton.pureconfigutils.HttpServerConfig
import com.digitalasset.canton.pureconfigutils.SharedConfigReaders.catchConvertError
@ -831,6 +832,9 @@ object CantonConfig {
deriveReader[AuthServiceConfig]
lazy implicit val rateLimitConfigReader: ConfigReader[RateLimitingConfig] =
deriveReader[RateLimitingConfig]
lazy implicit val enableEventsByContractKeyCacheReader
: ConfigReader[EnableEventsByContractKeyCache] =
deriveReader[EnableEventsByContractKeyCache]
lazy implicit val ledgerApiServerConfigReader: ConfigReader[LedgerApiServerConfig] =
deriveReader[LedgerApiServerConfig].applyDeprecations
@ -901,6 +905,8 @@ object CantonConfig {
deriveReader[CommunitySequencerConfig]
lazy implicit val domainParametersConfigReader: ConfigReader[DomainParametersConfig] =
deriveReader[DomainParametersConfig]
lazy implicit val catchUpParametersConfigReader: ConfigReader[CatchUpConfig] =
deriveReader[CatchUpConfig]
lazy implicit val domainNodeParametersConfigReader: ConfigReader[DomainNodeParametersConfig] =
deriveReader[DomainNodeParametersConfig]
lazy implicit val deadlockDetectionConfigReader: ConfigReader[DeadlockDetectionConfig] =
@ -1210,6 +1216,9 @@ object CantonConfig {
deriveWriter[AuthServiceConfig]
lazy implicit val rateLimitConfigWriter: ConfigWriter[RateLimitingConfig] =
deriveWriter[RateLimitingConfig]
lazy implicit val enableEventsByContractKeyCacheWriter
: ConfigWriter[EnableEventsByContractKeyCache] =
deriveWriter[EnableEventsByContractKeyCache]
lazy implicit val ledgerApiServerConfigWriter: ConfigWriter[LedgerApiServerConfig] =
deriveWriter[LedgerApiServerConfig]
@ -1274,6 +1283,8 @@ object CantonConfig {
deriveWriter[CommunitySequencerConfig]
lazy implicit val domainParametersConfigWriter: ConfigWriter[DomainParametersConfig] =
deriveWriter[DomainParametersConfig]
lazy implicit val catchUpParametersConfigWriter: ConfigWriter[CatchUpConfig] =
deriveWriter[CatchUpConfig]
lazy implicit val domainNodeParametersConfigWriter: ConfigWriter[DomainNodeParametersConfig] =
deriveWriter[DomainNodeParametersConfig]
lazy implicit val deadlockDetectionConfigWriter: ConfigWriter[DeadlockDetectionConfig] =

View File

@ -256,8 +256,6 @@ object CommunityConfigValidations
val pv = config.init.domainParameters.protocolVersion.unwrap
if (pv.isDeprecated && !config.init.domainParameters.dontWarnOnDeprecatedPV)
DeprecatedProtocolVersion.WarnDomain(name, pv).discard
logger.info(s"Domain $name is using protocol version $pv")
}
Validated.valid(())
}

View File

@ -17,6 +17,7 @@ import com.digitalasset.canton.admin.api.client.data.{
StaticDomainParameters,
StaticDomainParametersV0,
StaticDomainParametersV1,
StaticDomainParametersV2,
}
import com.digitalasset.canton.config.RequireTypes.NonNegativeInt
import com.digitalasset.canton.config.{
@ -186,6 +187,11 @@ trait DomainAdministration {
throw new IllegalStateException(
s"Error when trying to get $operation: versions of static and dynamic domains parameters should be consistent but got 1 and 0 respectively"
)
case _: StaticDomainParametersV2 =>
throw new IllegalStateException(
s"Error when trying to get $operation: versions of static and dynamic domains parameters should be consistent but got 2 and 0 respectively"
)
}
case dynamicV1: DynamicDomainParametersV1 => fromDynamicV1(dynamicV1)

View File

@ -117,7 +117,7 @@ abstract class CantonAppDriver[E <: Environment] extends App with NamedLogging w
}
}
}))
logger.info("Registered shutdown-hook.")
logger.debug("Registered shutdown-hook.")
val cantonConfig: E#Config = {
val mergedUserConfigsE = NonEmpty.from(cliOptions.configFiles) match {

View File

@ -8,6 +8,7 @@ package com.digitalasset.canton.domain.api.v0;
import "com/digitalasset/canton/domain/api/v0/service_agreement.proto";
import "com/digitalasset/canton/protocol/v0/sequencing.proto";
import "com/digitalasset/canton/protocol/v1/sequencing.proto";
import "com/digitalasset/canton/protocol/v2/domain_params.proto";
service SequencerConnectService {
rpc Handshake(com.digitalasset.canton.protocol.v0.Handshake.Request) returns (com.digitalasset.canton.protocol.v0.Handshake.Response);
@ -36,6 +37,7 @@ message SequencerConnect {
oneof parameters {
com.digitalasset.canton.protocol.v0.StaticDomainParameters parameters_v0 = 1;
com.digitalasset.canton.protocol.v1.StaticDomainParameters parameters_v1 = 2;
com.digitalasset.canton.protocol.v2.StaticDomainParameters parameters_v2 = 3;
}
}
}

View File

@ -10,6 +10,25 @@ import "com/digitalasset/canton/protocol/v0/traffic_control_parameters.proto";
import "google/protobuf/duration.proto";
import "scalapb/scalapb.proto";
// catch-up configuration parameters
message CatchUpConfig {
uint32 catchup_interval_skip = 1;
uint32 nr_intervals_to_trigger_catchup = 2;
}
message StaticDomainParameters {
option (scalapb.message).companion_extends = "com.digitalasset.canton.version.UnstableProtoVersion";
bool unique_contract_keys = 1;
repeated com.digitalasset.canton.crypto.v0.SigningKeyScheme required_signing_key_schemes = 2;
repeated com.digitalasset.canton.crypto.v0.EncryptionKeyScheme required_encryption_key_schemes = 3;
repeated com.digitalasset.canton.crypto.v0.SymmetricKeyScheme required_symmetric_key_schemes = 4;
repeated com.digitalasset.canton.crypto.v0.HashAlgorithm required_hash_algorithms = 5;
repeated com.digitalasset.canton.crypto.v0.CryptoKeyFormat required_crypto_key_formats = 6;
int32 protocol_version = 7;
CatchUpConfig catch_up_parameters = 8;
}
message StaticDomainParametersX {
repeated com.digitalasset.canton.crypto.v0.SigningKeyScheme required_signing_key_schemes = 1;
repeated com.digitalasset.canton.crypto.v0.EncryptionKeyScheme required_encryption_key_schemes = 2;
@ -17,6 +36,7 @@ message StaticDomainParametersX {
repeated com.digitalasset.canton.crypto.v0.HashAlgorithm required_hash_algorithms = 4;
repeated com.digitalasset.canton.crypto.v0.CryptoKeyFormat required_crypto_key_formats = 5;
int32 protocol_version = 7;
CatchUpConfig catch_up_parameters = 8;
}
// individual per participant limits

View File

@ -4,6 +4,7 @@
package com.digitalasset.canton.config
import cats.syntax.option.*
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.serialization.ProtoConverter
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.time.admin.v0
@ -26,20 +27,50 @@ import com.digitalasset.canton.time.admin.v0
* @param timeRequest configuration for how we ask for a time proof.
*/
final case class DomainTimeTrackerConfig(
observationLatency: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofMillis(250),
patienceDuration: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofMillis(500),
minObservationDuration: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofHours(24),
observationLatency: NonNegativeFiniteDuration =
DomainTimeTrackerConfig.defaultObservationLatency,
patienceDuration: NonNegativeFiniteDuration = DomainTimeTrackerConfig.defaultPatienceDuration,
minObservationDuration: NonNegativeFiniteDuration =
DomainTimeTrackerConfig.defaultMinObservationDuration,
timeRequest: TimeProofRequestConfig = TimeProofRequestConfig(),
) {
) extends PrettyPrinting {
def toProtoV0: v0.DomainTimeTrackerConfig = v0.DomainTimeTrackerConfig(
observationLatency.toProtoPrimitive.some,
patienceDuration.toProtoPrimitive.some,
minObservationDuration.toProtoPrimitive.some,
timeRequest.toProtoV0.some,
)
override def pretty: Pretty[DomainTimeTrackerConfig] = prettyOfClass(
paramIfNotDefault(
"observationLatency",
_.observationLatency,
DomainTimeTrackerConfig.defaultObservationLatency,
),
paramIfNotDefault(
"patienceDuration",
_.patienceDuration,
DomainTimeTrackerConfig.defaultPatienceDuration,
),
paramIfNotDefault(
"minObservationDuration",
_.minObservationDuration,
DomainTimeTrackerConfig.defaultMinObservationDuration,
),
paramIfNotDefault("timeRequest", _.timeRequest, TimeProofRequestConfig()),
)
}
object DomainTimeTrackerConfig {
private val defaultObservationLatency: NonNegativeFiniteDuration =
NonNegativeFiniteDuration.ofMillis(250)
private val defaultPatienceDuration: NonNegativeFiniteDuration =
NonNegativeFiniteDuration.ofMillis(500)
private val defaultMinObservationDuration: NonNegativeFiniteDuration =
NonNegativeFiniteDuration.ofHours(24)
def fromProto(
configP: v0.DomainTimeTrackerConfig
): ParsingResult[DomainTimeTrackerConfig] =

View File

@ -4,6 +4,7 @@
package com.digitalasset.canton.config
import cats.syntax.option.*
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.serialization.ProtoConverter
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.time.admin.v0
@ -14,18 +15,46 @@ import com.digitalasset.canton.time.admin.v0
* to observe it from the sequencer before starting a new request.
*/
final case class TimeProofRequestConfig(
initialRetryDelay: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofMillis(200),
maxRetryDelay: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofSeconds(5),
maxSequencingDelay: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofSeconds(10),
) {
initialRetryDelay: NonNegativeFiniteDuration = TimeProofRequestConfig.defaultInitialRetryDelay,
maxRetryDelay: NonNegativeFiniteDuration = TimeProofRequestConfig.defaultMaxRetryDelay,
maxSequencingDelay: NonNegativeFiniteDuration = TimeProofRequestConfig.defaultMaxSequencingDelay,
) extends PrettyPrinting {
private[config] def toProtoV0: v0.TimeProofRequestConfig = v0.TimeProofRequestConfig(
initialRetryDelay.toProtoPrimitive.some,
maxRetryDelay.toProtoPrimitive.some,
maxSequencingDelay.toProtoPrimitive.some,
)
override def pretty: Pretty[TimeProofRequestConfig] = prettyOfClass(
paramIfNotDefault(
"initialRetryDelay",
_.initialRetryDelay,
TimeProofRequestConfig.defaultInitialRetryDelay,
),
paramIfNotDefault(
"maxRetryDelay",
_.maxRetryDelay,
TimeProofRequestConfig.defaultMaxRetryDelay,
),
paramIfNotDefault(
"maxSequencingDelay",
_.maxSequencingDelay,
TimeProofRequestConfig.defaultMaxSequencingDelay,
),
)
}
object TimeProofRequestConfig {
private val defaultInitialRetryDelay: NonNegativeFiniteDuration =
NonNegativeFiniteDuration.ofMillis(200)
private val defaultMaxRetryDelay: NonNegativeFiniteDuration =
NonNegativeFiniteDuration.ofSeconds(5)
private val defaultMaxSequencingDelay: NonNegativeFiniteDuration =
NonNegativeFiniteDuration.ofSeconds(10)
private[config] def fromProtoV0(
configP: v0.TimeProofRequestConfig
): ParsingResult[TimeProofRequestConfig] =

View File

@ -301,6 +301,7 @@ object GenTransactionTree {
val rootViewsUnsafe: Lens[GenTransactionTree, MerkleSeq[TransactionView]] =
GenLens[GenTransactionTree](_.rootViews)
// TODO(#12626) try with context
def fromProtoV0(
hashOps: HashOps,
protoTransactionTree: v0.GenTransactionTree,
@ -309,12 +310,12 @@ object GenTransactionTree {
submitterMetadata <- MerkleTree
.fromProtoOptionV0(
protoTransactionTree.submitterMetadata,
SubmitterMetadata.fromByteString(hashOps),
SubmitterMetadata.fromByteStringUnsafe(hashOps),
)
commonMetadata <- MerkleTree
.fromProtoOptionV0(
protoTransactionTree.commonMetadata,
CommonMetadata.fromByteString(hashOps),
CommonMetadata.fromByteStringUnsafe(hashOps),
)
commonMetadataUnblinded <- commonMetadata.unwrap.leftMap(_ =>
InvariantViolation("GenTransactionTree.commonMetadata is blinded")
@ -322,14 +323,14 @@ object GenTransactionTree {
participantMetadata <- MerkleTree
.fromProtoOptionV0(
protoTransactionTree.participantMetadata,
ParticipantMetadata.fromByteString(hashOps),
ParticipantMetadata.fromByteStringUnsafe(hashOps),
)
rootViewsP <- ProtoConverter
.required("GenTransactionTree.rootViews", protoTransactionTree.rootViews)
rootViews <- MerkleSeq.fromProtoV0(
(
hashOps,
TransactionView.fromByteString(ProtoVersion(0))(
TransactionView.fromByteStringLegacy(ProtoVersion(0))(
(hashOps, commonMetadataUnblinded.confirmationPolicy)
),
),
@ -344,6 +345,7 @@ object GenTransactionTree {
)
} yield genTransactionTree
// TODO(#12626) try with context
def fromProtoV1(
hashOps: HashOps,
protoTransactionTree: v1.GenTransactionTree,
@ -352,12 +354,12 @@ object GenTransactionTree {
submitterMetadata <- MerkleTree
.fromProtoOptionV1(
protoTransactionTree.submitterMetadata,
SubmitterMetadata.fromByteString(hashOps),
SubmitterMetadata.fromByteStringUnsafe(hashOps),
)
commonMetadata <- MerkleTree
.fromProtoOptionV1(
protoTransactionTree.commonMetadata,
CommonMetadata.fromByteString(hashOps),
CommonMetadata.fromByteStringUnsafe(hashOps),
)
commonMetadataUnblinded <- commonMetadata.unwrap.leftMap(_ =>
InvariantViolation("GenTransactionTree.commonMetadata is blinded")
@ -365,14 +367,14 @@ object GenTransactionTree {
participantMetadata <- MerkleTree
.fromProtoOptionV1(
protoTransactionTree.participantMetadata,
ParticipantMetadata.fromByteString(hashOps),
ParticipantMetadata.fromByteStringUnsafe(hashOps),
)
rootViewsP <- ProtoConverter
.required("GenTransactionTree.rootViews", protoTransactionTree.rootViews)
rootViews <- MerkleSeq.fromProtoV1(
(
hashOps,
TransactionView.fromByteString(ProtoVersion(1))(
TransactionView.fromByteStringLegacy(ProtoVersion(1))(
(hashOps, commonMetadataUnblinded.confirmationPolicy)
),
),

View File

@ -214,7 +214,7 @@ object TransactionSubviews {
subviews <- subviewsP.traverse(subviewP =>
MerkleTree.fromProtoOptionV0(
Some(subviewP),
TransactionView.fromByteString(ProtoVersion(0))(context),
TransactionView.fromByteStringLegacy(ProtoVersion(0))(context),
)
)
} yield TransactionSubviewsV0(subviews)
@ -226,7 +226,7 @@ object TransactionSubviews {
val (hashOps, _) = context
for {
subviewsP <- ProtoConverter.required("ViewNode.subviews", subviewsPO)
tvParser = TransactionView.fromByteString(ProtoVersion(1))(context)
tvParser = TransactionView.fromByteStringLegacy(ProtoVersion(1))(context)
subviews <- MerkleSeq.fromProtoV1((hashOps, tvParser), subviewsP)
} yield TransactionSubviewsV1(subviews)
}

View File

@ -524,6 +524,7 @@ object TransactionView
val subviewsUnsafe: Lens[TransactionView, TransactionSubviews] =
GenLens[TransactionView](_.subviews)
// TODO(#12626) try with context
private def fromProtoV0(
context: (HashOps, ConfirmationPolicy),
protoView: v0.ViewNode,
@ -532,11 +533,11 @@ object TransactionView
for {
commonData <- MerkleTree.fromProtoOptionV0(
protoView.viewCommonData,
ViewCommonData.fromByteString(context),
ViewCommonData.fromByteStringUnsafe(context),
)
participantData <- MerkleTree.fromProtoOptionV0(
protoView.viewParticipantData,
ViewParticipantData.fromByteString(hashOps),
ViewParticipantData.fromByteStringUnsafe(hashOps),
)
subViews <- TransactionSubviews.fromProtoV0(context, protoView.subviews)
view <- createFromRepresentativePV(hashOps)(
@ -550,6 +551,7 @@ object TransactionView
} yield view
}
// TODO(#12626) try with context
private def fromProtoV1(
context: (HashOps, ConfirmationPolicy),
protoView: v1.ViewNode,
@ -558,11 +560,11 @@ object TransactionView
for {
commonData <- MerkleTree.fromProtoOptionV1(
protoView.viewCommonData,
ViewCommonData.fromByteString(context),
ViewCommonData.fromByteStringUnsafe(context),
)
participantData <- MerkleTree.fromProtoOptionV1(
protoView.viewParticipantData,
ViewParticipantData.fromByteString(hashOps),
ViewParticipantData.fromByteStringUnsafe(hashOps),
)
subViews <- TransactionSubviews.fromProtoV1(context, protoView.subviews)
view <- createFromRepresentativePV(hashOps)(

View File

@ -101,22 +101,24 @@ object TransferInViewTree
),
)
// TODO(#12626) try with context
def fromProtoV0(
hashOps: HashOps,
transferInViewTreeP: v0.TransferViewTree,
): ParsingResult[TransferInViewTree] =
GenTransferViewTree.fromProtoV0(
TransferInCommonData.fromByteString(hashOps),
TransferInView.fromByteString(hashOps),
TransferInCommonData.fromByteStringUnsafe(hashOps),
TransferInView.fromByteStringUnsafe(hashOps),
)((commonData, view) => new TransferInViewTree(commonData, view)(hashOps))(transferInViewTreeP)
// TODO(#12626) try with context
def fromProtoV1(
hashOps: HashOps,
transferInViewTreeP: v1.TransferViewTree,
): ParsingResult[TransferInViewTree] =
GenTransferViewTree.fromProtoV1(
TransferInCommonData.fromByteString(hashOps),
TransferInView.fromByteString(hashOps),
TransferInCommonData.fromByteStringUnsafe(hashOps),
TransferInView.fromByteStringUnsafe(hashOps),
)((commonData, view) => new TransferInViewTree(commonData, view)(hashOps))(transferInViewTreeP)
}

View File

@ -101,12 +101,13 @@ object TransferOutViewTree
hashOps,
)
// TODO(#12626) try with context
def fromProtoV0(hashOps: HashOps)(
transferOutViewTreeP: v0.TransferViewTree
): ParsingResult[TransferOutViewTree] =
GenTransferViewTree.fromProtoV0(
TransferOutCommonData.fromByteString(hashOps),
TransferOutView.fromByteString(hashOps),
TransferOutCommonData.fromByteStringUnsafe(hashOps),
TransferOutView.fromByteStringUnsafe(hashOps),
)((commonData, view) =>
TransferOutViewTree(commonData, view)(
protocolVersionRepresentativeFor(ProtoVersion(0)),
@ -114,12 +115,13 @@ object TransferOutViewTree
)
)(transferOutViewTreeP)
// TODO(#12626) try with context
def fromProtoV1(hashOps: HashOps)(
transferOutViewTreeP: v1.TransferViewTree
): ParsingResult[TransferOutViewTree] =
GenTransferViewTree.fromProtoV1(
TransferOutCommonData.fromByteString(hashOps),
TransferOutView.fromByteString(hashOps),
TransferOutCommonData.fromByteStringUnsafe(hashOps),
TransferOutView.fromByteStringUnsafe(hashOps),
)((commonData, view) =>
TransferOutViewTree(commonData, view)(
protocolVersionRepresentativeFor(ProtoVersion(1)),
@ -920,7 +922,7 @@ object FullTransferOutTree {
crypto: CryptoPureApi
)(bytes: ByteString): ParsingResult[FullTransferOutTree] =
for {
tree <- TransferOutViewTree.fromByteString(crypto)(bytes)
tree <- TransferOutViewTree.fromByteStringUnsafe(crypto)(bytes)
_ <- EitherUtil.condUnitE(
tree.isFullyUnblinded,
OtherError(s"Transfer-out request ${tree.rootHash} is not fully unblinded"),

View File

@ -38,6 +38,9 @@ object ErrorCodeUtils {
}
/** The main Canton error for everything that should be logged and notified
*
* PREFER [[CantonError]] OVER [[BaseCantonError]] IN ORDER TO LOG THE ERROR IMMEDIATELY UPON CREATION
* TO ENSURE WE DON'T LOSE THE ERROR MESSAGE.
*
* In many cases, we return errors that are communicated to clients as a Left. For such cases,
* we should use CantonError to report them.

View File

@ -23,7 +23,7 @@ trait OnShutdownRunner { this: AutoCloseable =>
protected def logger: TracedLogger
/** Check whether we're closing.
* Susceptible to race conditions; unless you're using using this as a flag to the retry lib or you really know
* Susceptible to race conditions; unless you're using this as a flag to the retry lib or you really know
* what you're doing, prefer `performUnlessClosing` and friends.
*/
def isClosing: Boolean = closingFlag.get()

View File

@ -132,7 +132,7 @@ trait PrettyInstances {
implicit val prettyUuid: Pretty[UUID] = prettyOfString(_.toString.readableHash.toString)
// There is deliberately no instance for `String` to force clients
// use ShowUtil.ShowStringSyntax instead.
// use ShowUtil.ShowStringSyntax instead (e.g. "string".singleQuoted).
def prettyString: Pretty[String] = prettyOfString(identity)
implicit val prettyByteString: Pretty[ByteString] =
@ -274,9 +274,9 @@ trait PrettyInstances {
implicit def prettyV2DeduplicationPeriod: Pretty[DeduplicationPeriod] =
prettyOfString {
case deduplicationDuration: DeduplicationPeriod.DeduplicationDuration =>
s"DeduplicationDuration(duration=${deduplicationDuration.duration}"
s"(duration=${deduplicationDuration.duration})"
case dedupOffset: DeduplicationPeriod.DeduplicationOffset =>
s"DeduplicationOffset(offset=${dedupOffset.offset}"
s"(offset=${dedupOffset.offset})"
}
implicit def prettyCompletion: Pretty[Completion] =

View File

@ -57,6 +57,14 @@ trait PrettyUtil {
): T => Option[Tree] =
conditionalParam[T, V](getValue, cond, value => mkNameValue(name, value.toTree))
/** A tree only written if not matching the default value */
def paramIfNotDefault[T, V: Pretty](
name: String,
getValue: T => V,
default: V,
): T => Option[Tree] =
param(name, getValue, getValue(_) != default)
private def conditionalParam[T, V](
getValue: T => V,
cond: T => Boolean,
@ -115,6 +123,10 @@ trait PrettyUtil {
): T => Option[Tree] =
conditionalParam(getValue, cond, treeOfString)
/** Use this to indicate that you've omitted fields from pretty printing */
def indicateOmittedFields[T]: T => Option[Tree] =
customParam(_ => "...")
/** Use this to give a class with a singleton parameter the same pretty representation as the parameter.
*/
def prettyOfParam[T, V: Pretty](getValue: T => V): Pretty[T] = inst => getValue(inst).toTree

View File

@ -8,7 +8,7 @@ import cats.syntax.either.*
import cats.syntax.traverse.*
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.ProtoDeserializationError.InvariantViolation
import com.digitalasset.canton.config.RequireTypes.NonNegativeInt
import com.digitalasset.canton.config.RequireTypes.{NonNegativeInt, PositiveInt}
import com.digitalasset.canton.crypto.*
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
@ -58,6 +58,14 @@ object DomainParameters {
}
}
/** @param catchUpParameters Optional parameters of type [[com.digitalasset.canton.protocol.CatchUpConfig]].
* Defined starting with protobuf version v2 and protocol version v6.
* If None, the catch-up mode is disabled: the participant does not trigger the
* catch-up mode when lagging behind.
If not None, it specifies the number of reconciliation intervals that the
participant skips in catch-up mode, and the number of catch-up intervals
a participant should lag behind in order to enter catch-up mode.
*/
@nowarn("msg=deprecated") // TODO(#15221) Remove deprecated parameters with next breaking version
final case class StaticDomainParameters private (
@deprecated(
@ -79,6 +87,7 @@ final case class StaticDomainParameters private (
requiredHashAlgorithms: NonEmpty[Set[HashAlgorithm]],
requiredCryptoKeyFormats: NonEmpty[Set[CryptoKeyFormat]],
protocolVersion: ProtocolVersion,
catchUpParameters: Option[CatchUpConfig],
) extends HasProtocolVersionedWrapper[StaticDomainParameters] {
override val representativeProtocolVersion: RepresentativeProtocolVersion[
@ -119,6 +128,18 @@ final case class StaticDomainParameters private (
requiredCryptoKeyFormats = requiredCryptoKeyFormats.toSeq.map(_.toProtoEnum),
protocolVersion = protocolVersion.toProtoPrimitive,
)
def toProtoV2: protoV2.StaticDomainParameters =
protoV2.StaticDomainParameters(
uniqueContractKeys = uniqueContractKeys,
requiredSigningKeySchemes = requiredSigningKeySchemes.toSeq.map(_.toProtoEnum),
requiredEncryptionKeySchemes = requiredEncryptionKeySchemes.toSeq.map(_.toProtoEnum),
requiredSymmetricKeySchemes = requiredSymmetricKeySchemes.toSeq.map(_.toProtoEnum),
requiredHashAlgorithms = requiredHashAlgorithms.toSeq.map(_.toProtoEnum),
requiredCryptoKeyFormats = requiredCryptoKeyFormats.toSeq.map(_.toProtoEnum),
protocolVersion = protocolVersion.toProtoPrimitive,
catchUpParameters = catchUpParameters.map(_.toProtoV2),
)
}
@nowarn("msg=deprecated")
object StaticDomainParameters
@ -137,6 +158,12 @@ object StaticDomainParameters
supportedProtoVersion(_)(fromProtoV1),
_.toProtoV1.toByteString,
),
ProtoVersion(2) -> VersionedProtoConverter(ProtocolVersion.v6)(
protoV2.StaticDomainParameters
)(
supportedProtoVersion(_)(fromProtoV2),
_.toProtoV2.toByteString,
),
)
override lazy val invariants = Seq(
@ -174,6 +201,13 @@ object StaticDomainParameters
defaultMaxRequestSize,
)
lazy val defaultCatchUpParameters = DefaultValueUntilExclusive(
_.catchUpParameters,
"catchUpParameters",
protocolVersionRepresentativeFor(ProtocolVersion.v6),
None,
)
override def name: String = "static domain parameters"
def create(
@ -188,6 +222,7 @@ object StaticDomainParameters
reconciliationInterval: PositiveSeconds =
StaticDomainParameters.defaultReconciliationInterval,
maxRatePerParticipant: NonNegativeInt = StaticDomainParameters.defaultMaxRatePerParticipant,
catchUpParameters: Option[CatchUpConfig],
): StaticDomainParameters = StaticDomainParameters(
reconciliationInterval = defaultReconciliationIntervalFrom
.orValue(reconciliationInterval, protocolVersion),
@ -200,6 +235,7 @@ object StaticDomainParameters
requiredSymmetricKeySchemes = requiredSymmetricKeySchemes,
requiredHashAlgorithms = requiredHashAlgorithms,
requiredCryptoKeyFormats = requiredCryptoKeyFormats,
catchUpParameters = catchUpParameters,
protocolVersion = protocolVersion,
)
@ -278,6 +314,7 @@ object StaticDomainParameters
requiredHashAlgorithms = requiredHashAlgorithms,
requiredCryptoKeyFormats = requiredCryptoKeyFormats,
protocolVersion = protocolVersion,
catchUpParameters = defaultCatchUpParameters.defaultValue,
)
}
@ -332,6 +369,64 @@ object StaticDomainParameters
requiredHashAlgorithms,
requiredCryptoKeyFormats,
protocolVersion,
catchUpParameters = defaultCatchUpParameters.defaultValue,
)
}
/** Deserializes [[StaticDomainParameters]] from its protobuf v2 representation.
  *
  * The v2 message no longer carries a reconciliation interval, max rate per participant or
  * max request size; the companion's default values are used for those constructor arguments.
  *
  * @param domainParametersP the proto message to parse
  * @return the parsed static domain parameters, or a parsing error if any of the enum fields,
  *         the protocol version, or the catch-up configuration fails to deserialize
  */
def fromProtoV2(
    domainParametersP: protoV2.StaticDomainParameters
): ParsingResult[StaticDomainParameters] = {
  // Positional destructuring: the binding order below must match the field order of the
  // generated protoV2.StaticDomainParameters message exactly.
  val protoV2.StaticDomainParameters(
    uniqueContractKeys,
    requiredSigningKeySchemesP,
    requiredEncryptionKeySchemesP,
    requiredSymmetricKeySchemesP,
    requiredHashAlgorithmsP,
    requiredCryptoKeyFormatsP,
    protocolVersionP,
    catchUpParametersP,
  ) = domainParametersP
  for {
    // Each required-scheme set is parsed element-wise via the corresponding enum parser;
    // the string argument names the field for error reporting.
    requiredSigningKeySchemes <- requiredKeySchemes(
      "requiredSigningKeySchemes",
      requiredSigningKeySchemesP,
      SigningKeyScheme.fromProtoEnum,
    )
    requiredEncryptionKeySchemes <- requiredKeySchemes(
      "requiredEncryptionKeySchemes",
      requiredEncryptionKeySchemesP,
      EncryptionKeyScheme.fromProtoEnum,
    )
    requiredSymmetricKeySchemes <- requiredKeySchemes(
      "requiredSymmetricKeySchemes",
      requiredSymmetricKeySchemesP,
      SymmetricKeyScheme.fromProtoEnum,
    )
    requiredHashAlgorithms <- requiredKeySchemes(
      "requiredHashAlgorithms",
      requiredHashAlgorithmsP,
      HashAlgorithm.fromProtoEnum,
    )
    requiredCryptoKeyFormats <- requiredKeySchemes(
      "requiredCryptoKeyFormats",
      requiredCryptoKeyFormatsP,
      CryptoKeyFormat.fromProtoEnum,
    )
    protocolVersion <- ProtocolVersion.fromProtoPrimitive(protocolVersionP)
    // The catch-up config is optional in the proto; traverse keeps None as None and
    // propagates parsing failures of a present value.
    catchUpParameters <- catchUpParametersP.traverse(CatchUpConfig.fromProtoV2)
  } yield StaticDomainParameters(
    // These three parameters are not part of the v2 proto message, so the companion's
    // defaults are substituted here.
    StaticDomainParameters.defaultReconciliationInterval,
    StaticDomainParameters.defaultMaxRatePerParticipant,
    StaticDomainParameters.defaultMaxRequestSize,
    uniqueContractKeys,
    requiredSigningKeySchemes,
    requiredEncryptionKeySchemes,
    requiredSymmetricKeySchemes,
    requiredHashAlgorithms,
    requiredCryptoKeyFormats,
    protocolVersion,
    catchUpParameters,
  )
}
}
@ -1168,3 +1263,51 @@ final case class DynamicDomainParametersWithValidity(
def transferExclusivityTimeout: NonNegativeFiniteDuration = parameters.transferExclusivityTimeout
def sequencerSigningTolerance: NonNegativeFiniteDuration = parameters.sequencerSigningTolerance
}
/** The class specifies the catch-up parameters governing the catch-up mode of a participant lagging behind with its
  * ACS commitments computation.
  *
  * @param catchUpIntervalSkip The number of reconciliation intervals that the participant skips in
  *                            catch-up mode.
  *                            A catch-up interval thus has a length of
  *                            reconciliation interval * `catchUpIntervalSkip`.
  *                            Note that, to ensure that all participants catch up to the same timestamp, the
  *                            interval count starts at the beginning of time, as opposed to starting at the
  *                            participant's current time when it triggers catch-up.
  *                            For example, with time beginning at 0, a reconciliation interval of 5 seconds,
  *                            and a catchUpIntervalSkip of 2 (intervals), a participant triggering catch-up at
  *                            time 15 seconds will catch-up to timestamp 20 seconds.
  * @param nrIntervalsToTriggerCatchUp The number of catch-up intervals a participant should lag behind in
  *                            order to enter catch-up mode. If a participant's current timestamp is behind
  *                            the timestamp of valid received commitments by reconciliation interval *
  *                            `catchUpIntervalSkip` * `nrIntervalsToTriggerCatchUp`, and
  *                            catch-up mode is enabled, then the participant triggers catch-up mode.
  */
final case class CatchUpConfig(
    catchUpIntervalSkip: PositiveInt,
    nrIntervalsToTriggerCatchUp: PositiveInt,
) extends PrettyPrinting {

  // Renders both counters, e.g. for logging.
  override def pretty: Pretty[CatchUpConfig] = prettyOfClass(
    param("catchUpIntervalSkip", _.catchUpIntervalSkip),
    param("nrIntervalsToTriggerCatchUp", _.nrIntervalsToTriggerCatchUp),
  )

  // Serializes to the protobuf v2 message; fields are passed positionally, matching the
  // destructuring in CatchUpConfig.fromProtoV2.
  def toProtoV2: protoV2.CatchUpConfig = protoV2.CatchUpConfig(
    catchUpIntervalSkip.value,
    nrIntervalsToTriggerCatchUp.value,
  )
}
object CatchUpConfig {

  /** Parses a [[CatchUpConfig]] from its protobuf v2 representation.
    * Fails with a parsing error if either counter is not a positive integer.
    */
  def fromProtoV2(
      value: v2.CatchUpConfig
  ): ParsingResult[CatchUpConfig] = {
    // Positional destructuring: binding order must match the proto field order.
    val v2.CatchUpConfig(intervalSkipP, triggerIntervalsP) = value
    ProtoConverter.parsePositiveInt(intervalSkipP).flatMap { intervalSkip =>
      ProtoConverter
        .parsePositiveInt(triggerIntervalsP)
        .map(CatchUpConfig(intervalSkip, _))
    }
  }
}

View File

@ -253,8 +253,11 @@ object DomainTopologyTransactionMessage
message: v0.DomainTopologyTransactionMessage
): ParsingResult[DomainTopologyTransactionMessage] = {
val v0.DomainTopologyTransactionMessage(signature, _domainId, transactions) = message
for {
succeededContent <- transactions.toList.traverse(SignedTopologyTransaction.fromByteString)
succeededContent <- transactions.toList.traverse(
SignedTopologyTransaction.fromByteStringUnsafe
) // TODO(#12626) - try with context
signature <- ProtoConverter.parseRequired(Signature.fromProtoV0, "signature", signature)
domainUid <- UniqueIdentifier.fromProtoPrimitive(message.domainId, "domainId")
} yield DomainTopologyTransactionMessage(
@ -271,7 +274,7 @@ object DomainTopologyTransactionMessage
val v1.DomainTopologyTransactionMessage(signature, domainId, timestamp, transactions) = message
for {
succeededContent <- transactions.toList.traverse(
SignedTopologyTransaction.fromByteString
SignedTopologyTransaction.fromByteStringUnsafe // TODO(#12626) try with context
)
signature <- ProtoConverter.parseRequired(Signature.fromProtoV0, "signature", signature)
domainUid <- UniqueIdentifier.fromProtoPrimitive(domainId, "domainId")

View File

@ -80,6 +80,10 @@ sealed trait EncryptedView[+VT <: ViewType] extends Product with Serializable {
// Unfortunately, there doesn't seem to be a way to convince Scala's type checker that the two types must be equal.
if (desiredViewType == viewType) Some(this.asInstanceOf[EncryptedView[desiredViewType.type]])
else None
/** Indicative size for pretty printing */
def sizeHint: Int
}
object EncryptedView {
def apply[VT <: ViewType](
@ -88,6 +92,7 @@ object EncryptedView {
new EncryptedView[VT] {
override val viewType: aViewType.type = aViewType
override val viewTree = aViewTree
override lazy val sizeHint: Int = aViewTree.ciphertext.size
}
def compressed[VT <: ViewType](
@ -183,6 +188,7 @@ sealed trait EncryptedViewMessage[+VT <: ViewType] extends UnsignedProtocolMessa
override def pretty: Pretty[EncryptedViewMessage.this.type] = prettyOfClass(
param("view hash", _.viewHash),
param("view type", _.viewType),
param("size", _.encryptedView.sizeHint),
)
def toByteString: ByteString

View File

@ -292,7 +292,7 @@ object EnvelopeContent extends HasProtocolVersionedWithContextCompanion[Envelope
bytes: Array[Byte]
)(implicit cast: ProtocolMessageContentCast[M]): ParsingResult[M] = {
for {
envelopeContent <- fromByteString(protocolVersion)(hashOps)(ByteString.copyFrom(bytes))
envelopeContent <- fromByteStringLegacy(protocolVersion)(hashOps)(ByteString.copyFrom(bytes))
message <- cast
.toKind(envelopeContent.message)
.toRight(

View File

@ -120,7 +120,7 @@ object RegisterTopologyTransactionRequest
requestedBy <- Member.fromProtoPrimitive(message.requestedBy, "requestedBy")
participantUid <- UniqueIdentifier.fromProtoPrimitive(message.participant, "participant")
transactions <- message.signedTopologyTransactions.toList.traverse(elem =>
SignedTopologyTransaction.fromByteString(elem)
SignedTopologyTransaction.fromByteStringUnsafe(elem) // TODO(#12626) try with context
)
domainUid <- UniqueIdentifier.fromProtoPrimitive(message.domainId, "domainId")
requestId <- String255.fromProtoPrimitive(message.requestId, "requestId")

View File

@ -280,7 +280,7 @@ object SignedProtocolMessage
throw new IllegalStateException(s"Failed to create signed protocol message: $err")
)
private[messages] def fromProtoV0(
private[messages] def fromProtoV0( // TODO(#12626) try with context
hashOps: HashOps,
signedMessageP: v0.SignedProtocolMessage,
): ParsingResult[SignedProtocolMessage[SignedProtocolMessageContent]] = {
@ -289,15 +289,19 @@ object SignedProtocolMessage
for {
message <- (messageBytes match {
case Sm.MediatorResponse(mediatorResponseBytes) =>
MediatorResponse.fromByteString(mediatorResponseBytes)
MediatorResponse.fromByteStringUnsafe(mediatorResponseBytes)
case Sm.TransactionResult(transactionResultMessageBytes) =>
TransactionResultMessage.fromByteString(hashOps)(transactionResultMessageBytes)
TransactionResultMessage.fromByteStringUnsafe(hashOps)(
transactionResultMessageBytes
)
case Sm.TransferResult(transferResultBytes) =>
TransferResult.fromByteString(transferResultBytes)
TransferResult.fromByteStringUnsafe(transferResultBytes)
case Sm.AcsCommitment(acsCommitmentBytes) =>
AcsCommitment.fromByteString(acsCommitmentBytes)
AcsCommitment.fromByteStringUnsafe(acsCommitmentBytes)
case Sm.MalformedMediatorRequestResult(malformedMediatorRequestResultBytes) =>
MalformedMediatorRequestResult.fromByteString(malformedMediatorRequestResultBytes)
MalformedMediatorRequestResult.fromByteStringUnsafe(
malformedMediatorRequestResultBytes
)
case Sm.Empty =>
Left(OtherError("Deserialization of a SignedMessage failed due to a missing message"))
}): ParsingResult[SignedProtocolMessageContent]
@ -316,7 +320,9 @@ object SignedProtocolMessage
): ParsingResult[SignedProtocolMessage[SignedProtocolMessageContent]] = {
val v1.SignedProtocolMessage(signaturesP, typedMessageBytes) = signedMessageP
for {
typedMessage <- TypedSignedProtocolMessageContent.fromByteString(hashOps)(typedMessageBytes)
typedMessage <- TypedSignedProtocolMessageContent.fromByteStringUnsafe(hashOps)(
typedMessageBytes
)
signatures <- ProtoConverter.parseRequiredNonEmpty(
Signature.fromProtoV0,
"signatures",

View File

@ -90,7 +90,10 @@ object TypedSignedProtocolMessageContent
): TypedSignedProtocolMessageContent[M] =
TypedSignedProtocolMessageContent(content)(protocolVersionRepresentativeFor(protoVersion), None)
private def fromProtoV0(hashOps: HashOps, proto: v0.TypedSignedProtocolMessageContent)(
private def fromProtoV0(
hashOps: HashOps,
proto: v0.TypedSignedProtocolMessageContent,
)( // TODO(#12626) try with context
bytes: ByteString
): ParsingResult[TypedSignedProtocolMessageContent[SignedProtocolMessageContent]] = {
import v0.TypedSignedProtocolMessageContent.SomeSignedProtocolMessage as Sm
@ -98,15 +101,19 @@ object TypedSignedProtocolMessageContent
for {
message <- (messageBytes match {
case Sm.MediatorResponse(mediatorResponseBytes) =>
MediatorResponse.fromByteString(mediatorResponseBytes)
MediatorResponse.fromByteStringUnsafe(mediatorResponseBytes)
case Sm.TransactionResult(transactionResultMessageBytes) =>
TransactionResultMessage.fromByteString(hashOps)(transactionResultMessageBytes)
TransactionResultMessage.fromByteStringUnsafe(hashOps)(
transactionResultMessageBytes
)
case Sm.TransferResult(transferResultBytes) =>
TransferResult.fromByteString(transferResultBytes)
TransferResult.fromByteStringUnsafe(transferResultBytes)
case Sm.AcsCommitment(acsCommitmentBytes) =>
AcsCommitment.fromByteString(acsCommitmentBytes)
AcsCommitment.fromByteStringUnsafe(acsCommitmentBytes)
case Sm.MalformedMediatorRequestResult(malformedMediatorRequestResultBytes) =>
MalformedMediatorRequestResult.fromByteString(malformedMediatorRequestResultBytes)
MalformedMediatorRequestResult.fromByteStringUnsafe(
malformedMediatorRequestResultBytes
)
case Sm.Empty =>
Left(OtherError("Deserialization of a SignedMessage failed due to a missing message"))
}): ParsingResult[SignedProtocolMessageContent]

View File

@ -105,7 +105,7 @@ final case class GrpcSequencerConnection(
prettyOfClass(
param("endpoints", _.endpoints.map(_.toURI(transportSecurity)).toList),
param("transportSecurity", _.transportSecurity),
param("customTrustCertificates", _.customTrustCertificates),
paramIfTrue("customTrustCertificates", _.customTrustCertificates.nonEmpty),
)
override def addEndpoints(

View File

@ -378,7 +378,7 @@ object SequencedEventValidator extends HasLoggerName {
if (signingTimestamp <= approximateSnapshotTime) {
EitherT(snapshotF.flatMap(validateWithSnapshot))
} else {
loggingContext.info(
loggingContext.debug(
s"Validating event at $sequencingTimestamp optimistically with snapshot taken at $approximateSnapshotTime"
)
EitherT(validateWithSnapshot(approximateSnapshot))

View File

@ -532,7 +532,7 @@ class SequencerClientImpl(
override def generateMaxSequencingTime: CantonTimestamp =
clock.now.add(config.defaultMaxSequencingTimeOffset.asJava)
override protected def generateMessageId: MessageId = MessageId.randomMessageId()
override def generateMessageId: MessageId = MessageId.randomMessageId()
/** Create a subscription for sequenced events for this member,
* starting after the prehead in the `sequencerCounterTrackerStore`.

View File

@ -58,5 +58,5 @@ trait SequencerClientSend {
/** Generates a message id.
* The message id is only for correlation within this client and does not need to be globally unique.
*/
protected def generateMessageId: MessageId = MessageId.randomMessageId()
def generateMessageId: MessageId = MessageId.randomMessageId()
}

View File

@ -386,7 +386,7 @@ class GrpcSequencerClientTransport(
metrics,
timeouts,
loggerFactory,
)
)(protocolVersion)
context.run(() =>
TraceContextGrpc.withGrpcContext(traceContext) {
@ -412,7 +412,7 @@ class GrpcSequencerClientTransport(
metrics,
timeouts,
loggerFactory,
)
)(protocolVersion)
context.run(() =>
TraceContextGrpc.withGrpcContext(traceContext) {

View File

@ -138,12 +138,12 @@ class GrpcSequencerClientTransportPekko(
if (requiresAuthentication) sequencerServiceClient.subscribeVersioned _
else sequencerServiceClient.subscribeUnauthenticatedVersioned _
mkSubscription(subscriber)(SubscriptionResponse.fromVersionedProtoV0(_)(_))
mkSubscription(subscriber)(SubscriptionResponse.fromVersionedProtoV0(protocolVersion)(_)(_))
} else {
val subscriber =
if (requiresAuthentication) sequencerServiceClient.subscribe _
else sequencerServiceClient.subscribeUnauthenticated _
mkSubscription(subscriber)(SubscriptionResponse.fromProtoV0(_)(_))
mkSubscription(subscriber)(SubscriptionResponse.fromProtoV0(protocolVersion)(_)(_))
}
}

View File

@ -22,6 +22,7 @@ import com.digitalasset.canton.store.SequencedEventStore.OrdinarySequencedEvent
import com.digitalasset.canton.tracing.TraceContext.withTraceContext
import com.digitalasset.canton.tracing.{NoTracing, SerializableTraceContext, TraceContext, Traced}
import com.digitalasset.canton.util.FutureUtil
import com.digitalasset.canton.version.ProtocolVersion
import com.google.common.annotations.VisibleForTesting
import io.grpc.Context.CancellableContext
import io.grpc.Status.Code.CANCELLED
@ -262,14 +263,15 @@ object GrpcSequencerSubscription {
metrics: SequencerClientMetrics,
timeouts: ProcessingTimeout,
loggerFactory: NamedLoggerFactory,
)(implicit
)(protocolVersion: ProtocolVersion)(implicit
executionContext: ExecutionContext
): GrpcSequencerSubscription[E, v0.SubscriptionResponse] =
new GrpcSequencerSubscription(
context,
deserializingSubscriptionHandler(
handler,
(value, traceContext) => SubscriptionResponse.fromProtoV0(value)(traceContext),
(value, traceContext) =>
SubscriptionResponse.fromProtoV0(protocolVersion)(value)(traceContext),
),
metrics,
timeouts,
@ -282,14 +284,15 @@ object GrpcSequencerSubscription {
metrics: SequencerClientMetrics,
timeouts: ProcessingTimeout,
loggerFactory: NamedLoggerFactory,
)(implicit
)(protocolVersion: ProtocolVersion)(implicit
executionContext: ExecutionContext
): GrpcSequencerSubscription[E, v0.VersionedSubscriptionResponse] =
new GrpcSequencerSubscription(
context,
deserializingSubscriptionHandler(
handler,
(value, traceContext) => SubscriptionResponse.fromVersionedProtoV0(value)(traceContext),
(value, traceContext) =>
SubscriptionResponse.fromVersionedProtoV0(protocolVersion)(value)(traceContext),
),
metrics,
timeouts,

View File

@ -70,16 +70,19 @@ final case class ClosedEnvelope private (
): ParsingResult[DefaultOpenEnvelope] = {
NonEmpty.from(signatures) match {
case None =>
EnvelopeContent.fromByteString(protocolVersion)(hashOps)(bytes).map { envelopeContent =>
OpenEnvelope(envelopeContent.message, recipients)(protocolVersion)
EnvelopeContent.fromByteStringLegacy(protocolVersion)(hashOps)(bytes).map {
envelopeContent =>
OpenEnvelope(envelopeContent.message, recipients)(protocolVersion)
}
case Some(signaturesNE) =>
TypedSignedProtocolMessageContent.fromByteString(hashOps)(bytes).map { typedMessage =>
OpenEnvelope(
SignedProtocolMessage.tryCreate(typedMessage, signaturesNE, protocolVersion),
recipients,
)(protocolVersion)
}
TypedSignedProtocolMessageContent
.fromByteString(protocolVersion)(hashOps)(bytes)
.map { typedMessage =>
OpenEnvelope(
SignedProtocolMessage.tryCreate(typedMessage, signaturesNE, protocolVersion),
recipients,
)(protocolVersion)
}
}
}

View File

@ -198,7 +198,9 @@ object SequencedEvent
def fromByteStringOpen(hashOps: HashOps, protocolVersion: ProtocolVersion)(
bytes: ByteString
): ParsingResult[SequencedEvent[DefaultOpenEnvelope]] =
fromByteString(bytes).flatMap(_.traverse(_.openEnvelope(hashOps, protocolVersion)))
fromByteStringUnsafe(bytes).flatMap(
_.traverse(_.openEnvelope(hashOps, protocolVersion))
)
implicit val sequencedEventEnvelopeBox: EnvelopeBox[SequencedEvent] =
new EnvelopeBox[SequencedEvent] {

View File

@ -7,6 +7,7 @@ import cats.syntax.traverse.*
import com.digitalasset.canton.domain.api.v0
import com.digitalasset.canton.serialization.ProtoConverter.{ParsingResult, required}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.version.ProtocolVersion
final case class SubscriptionResponse(
signedSequencedEvent: SignedContent[SequencedEvent[ClosedEnvelope]],
@ -20,7 +21,7 @@ object SubscriptionResponse {
* [[com.digitalasset.canton.domain.api.v0.SubscriptionResponse.traceContext]] field and instead use the supplied value.
* This is because we deserialize the traceContext separately from this request immediately on receiving the structure.
*/
def fromProtoV0(responseP: v0.SubscriptionResponse)(implicit
def fromProtoV0(protocolVersion: ProtocolVersion)(responseP: v0.SubscriptionResponse)(implicit
traceContext: TraceContext
): ParsingResult[SubscriptionResponse] = {
val v0.SubscriptionResponse(
@ -33,11 +34,15 @@ object SubscriptionResponse {
maybeSignedSequencedEventP,
)
signedContent <- SignedContent.fromProtoV0(signedSequencedEventP)
signedSequencedEvent <- signedContent.deserializeContent(SequencedEvent.fromByteString)
signedSequencedEvent <- signedContent.deserializeContent(
SequencedEvent.fromByteString(protocolVersion)
)
} yield SubscriptionResponse(signedSequencedEvent, traceContext, None)
}
def fromVersionedProtoV0(responseP: v0.VersionedSubscriptionResponse)(implicit
def fromVersionedProtoV0(protocolVersion: ProtocolVersion)(
responseP: v0.VersionedSubscriptionResponse
)(implicit
traceContext: TraceContext
): ParsingResult[SubscriptionResponse] = {
val v0.VersionedSubscriptionResponse(
@ -46,8 +51,12 @@ object SubscriptionResponse {
trafficStateP,
) = responseP
for {
signedContent <- SignedContent.fromByteString(signedSequencedEvent)
signedSequencedEvent <- signedContent.deserializeContent(SequencedEvent.fromByteString)
signedContent <- SignedContent.fromByteString(protocolVersion)(
signedSequencedEvent
)
signedSequencedEvent <- signedContent.deserializeContent(
SequencedEvent.fromByteString(protocolVersion)
)
trafficState <- trafficStateP.traverse(SequencedEventTrafficState.fromProtoV0)
} yield SubscriptionResponse(signedSequencedEvent, traceContext, trafficState)

View File

@ -21,12 +21,11 @@ import com.digitalasset.canton.sequencing.protocol.{
SignedContent,
}
import com.digitalasset.canton.sequencing.{OrdinarySerializedEvent, PossiblyIgnoredSerializedEvent}
import com.digitalasset.canton.serialization.ProtoConverter
import com.digitalasset.canton.store.*
import com.digitalasset.canton.store.db.DbSequencedEventStore.*
import com.digitalasset.canton.tracing.{SerializableTraceContext, TraceContext}
import com.digitalasset.canton.util.{EitherTUtil, Thereafter}
import com.digitalasset.canton.version.{ProtocolVersion, UntypedVersionedMessage, VersionedMessage}
import com.digitalasset.canton.version.ProtocolVersion
import slick.jdbc.{GetResult, SetParameter}
import java.util.concurrent.Semaphore
@ -101,11 +100,11 @@ class DbSequencedEventStore(
traceContext
)
case _ =>
val signedEvent = ProtoConverter
.protoParserArray(UntypedVersionedMessage.parseFrom)(eventBytes)
.map(VersionedMessage.apply)
.flatMap(SignedContent.fromProtoVersioned(_))
.flatMap(_.deserializeContent(SequencedEvent.fromByteString))
val signedEvent = SignedContent
.fromByteArrayUnsafe(eventBytes)
.flatMap(
_.deserializeContent(SequencedEvent.fromByteString(protocolVersion))
)
.valueOr(err =>
throw new DbDeserializationException(s"Failed to deserialize sequenced event: $err")
)

View File

@ -649,7 +649,9 @@ class StoreBasedTopologySnapshot(
case TopologyStateUpdateElement(_id, SignedLegalIdentityClaim(_, claimBytes, _signature)) =>
val result = for {
claim <- LegalIdentityClaim
.fromByteString(claimBytes)
.fromByteStringUnsafe(
claimBytes
) // TODO(#12626) try with context
.leftMap(err => s"Failed to parse legal identity claim proto: $err")
certOpt = claim.evidence match {

View File

@ -165,7 +165,9 @@ object StoredTopologyTransactions
item.validFrom,
)
validUntil <- item.validFrom.traverse(EffectiveTime.fromProtoPrimitive)
transaction <- SignedTopologyTransaction.fromByteString(item.transaction)
transaction <- SignedTopologyTransaction.fromByteStringUnsafe(
item.transaction
) // TODO(#12626) try with context use an optional protocol version for cases when the protocol version is known
} yield StoredTopologyTransaction(
sequenced,
validFrom,

View File

@ -157,7 +157,9 @@ object StoredTopologyTransactionsX
item.validFrom,
)
validUntil <- item.validUntil.traverse(EffectiveTime.fromProtoPrimitive)
transaction <- SignedTopologyTransactionX.fromByteString(item.transaction)
transaction <- SignedTopologyTransactionX.fromByteStringUnsafe(
item.transaction
) // TODO(#12626) try with context use an optional protocol version for cases when the protocol version is known
} yield StoredTopologyTransactionX(
sequenced,
validFrom,

View File

@ -166,9 +166,11 @@ object SignedTopologyTransaction
private def fromProtoV0(transactionP: v0.SignedTopologyTransaction)(
bytes: ByteString
): ParsingResult[SignedTopologyTransaction[TopologyChangeOp]] =
): ParsingResult[SignedTopologyTransaction[TopologyChangeOp]] = {
for {
transaction <- TopologyTransaction.fromByteString(transactionP.transaction)
transaction <- TopologyTransaction.fromByteStringUnsafe(
transactionP.transaction
) // TODO(#12626) try with context
publicKey <- ProtoConverter.parseRequired(
SigningPublicKey.fromProtoV0,
"key",
@ -179,16 +181,19 @@ object SignedTopologyTransaction
"signature",
transactionP.signature,
)
protocolVersion = supportedProtoVersions.protocolVersionRepresentativeFor(ProtoVersion(0))
} yield SignedTopologyTransaction(transaction, publicKey, signature)(
protocolVersion,
supportedProtoVersions.protocolVersionRepresentativeFor(ProtoVersion(0)),
Some(bytes),
)
}
def createGetResultDomainTopologyTransaction
: GetResult[SignedTopologyTransaction[TopologyChangeOp]] =
GetResult { r =>
fromByteString(r.<<[ByteString])
fromByteStringUnsafe(
r.<<[ByteString]
)
.valueOr(err =>
throw new DbSerializationException(s"Failed to deserialize TopologyTransaction: $err")
)

View File

@ -204,24 +204,23 @@ object SignedTopologyTransactionX
): ParsingResult[GenericSignedTopologyTransactionX] = {
val v2.SignedTopologyTransactionX(txBytes, signaturesP, isProposal) = transactionP
for {
transaction <- TopologyTransactionX.fromByteString(txBytes)
transaction <- TopologyTransactionX.fromByteStringUnsafe(
txBytes
) // TODO(#12626) - try with context
signatures <- ProtoConverter.parseRequiredNonEmpty(
Signature.fromProtoV0,
"SignedTopologyTransactionX.signatures",
signaturesP,
)
protocolVersion = supportedProtoVersions.protocolVersionRepresentativeFor(ProtoVersion(2))
} yield SignedTopologyTransactionX(transaction, signatures.toSet, isProposal)(
protocolVersion
supportedProtoVersions.protocolVersionRepresentativeFor(ProtoVersion(2))
)
}
def createGetResultDomainTopologyTransaction: GetResult[GenericSignedTopologyTransactionX] =
GetResult { r =>
fromByteString(r.<<[ByteString])
fromByteStringUnsafe(r.<<[ByteString])
.valueOr(err =>
throw new DbSerializationException(
s"Failed to deserialize SignedTopologyTransactionX: $err"

View File

@ -245,7 +245,9 @@ object SignedLegalIdentityClaim {
): ParsingResult[SignedLegalIdentityClaim] =
for {
signature <- ProtoConverter.parseRequired(Signature.fromProtoV0, "signature", value.signature)
claim <- LegalIdentityClaim.fromByteString(value.claim)
claim <- LegalIdentityClaim.fromByteStringUnsafe(
value.claim
) // TODO(#12626) - try with context
} yield SignedLegalIdentityClaim(claim.uid, value.claim, signature)
}

View File

@ -3,7 +3,6 @@
package com.digitalasset.canton.topology.transaction
import com.digitalasset.canton.ProtoDeserializationError
import com.digitalasset.canton.ProtoDeserializationError.*
import com.digitalasset.canton.config.CantonRequireTypes.{
LengthLimitedStringWrapper,
@ -384,21 +383,6 @@ object TopologyStateUpdate {
TopologyTransaction.protocolVersionRepresentativeFor(protocolVersion)
)
def fromByteString(bytes: ByteString): ParsingResult[TopologyStateUpdate[AddRemoveChangeOp]] =
for {
converted <- TopologyTransaction.fromByteString(bytes)
result <- converted match {
case topologyStateUpdate: TopologyStateUpdate[_] =>
Right(topologyStateUpdate)
case _: DomainGovernanceTransaction =>
Left(
ProtoDeserializationError.TransactionDeserialization(
"Expecting TopologyStateUpdate, found DomainGovernanceTransaction"
)
)
}
} yield result
def fromProtoV0(
protoTopologyTransaction: v0.TopologyStateUpdate,
bytes: ByteString,
@ -619,19 +603,4 @@ object DomainGovernanceTransaction {
)
}
def fromByteString(bytes: ByteString): ParsingResult[DomainGovernanceTransaction] =
for {
converted <- TopologyTransaction.fromByteString(bytes)
result <- converted match {
case _: TopologyStateUpdate[_] =>
Left(
ProtoDeserializationError.TransactionDeserialization(
"Expecting DomainGovernanceTransaction, found TopologyStateUpdate"
)
)
case domainGovernanceTransaction: DomainGovernanceTransaction =>
Right(domainGovernanceTransaction)
}
} yield result
}

View File

@ -76,15 +76,6 @@ class SimpleExecutionQueue(
): FutureUnlessShutdown[A] =
genExecute(runIfFailed = false, execution, description, runWhenUnderFailures)
def executeUnderFailures[A](execution: => Future[A], description: String)(implicit
loggingContext: ErrorLoggingContext
): FutureUnlessShutdown[A] =
genExecute(
runIfFailed = true,
FutureUnlessShutdown.outcomeF(execution)(directExecutionContext),
description,
)
def executeUnderFailuresUS[A](execution: => FutureUnlessShutdown[A], description: String)(implicit
loggingContext: ErrorLoggingContext
): FutureUnlessShutdown[A] =

View File

@ -157,9 +157,15 @@ object RetryUtil {
// Retry on operator invention errors, otherwise Canton components crash in an uncontrolled manner when
// the exception bubbles up (don't retry on `query_canceled` and `database_dropped`)
TransientErrorKind
} else if (
error == "53000" || error == "53100" || error == "53200" || error == "53300" || error == "53400"
) {
// Retry insufficient db resource errors
TransientErrorKind
} else {
// Don't retry on other exceptions. These other exceptions should be those for which retrying typically won't
// help, for example a unique constraint violation.
logger.info(s"Fatal sql exception has error code: $error")
FatalErrorKind
}
@ -174,6 +180,8 @@ object RetryUtil {
_: SQLNonTransientConnectionException =>
TransientErrorKind
// Handle SQLException and all classes that derive from it (e.g. java.sql.BatchUpdateException)
// Note that if the exception is not known but has a cause, we'll base the retry on the cause
case ex: SQLException =>
val code = ex.getErrorCode
if (ex.getErrorCode == 1) {

View File

@ -7,6 +7,7 @@ import cats.syntax.either.*
import cats.syntax.foldable.*
import cats.syntax.functor.*
import com.daml.nonempty.{NonEmpty, NonEmptyUtil}
import com.digitalasset.canton.ProtoDeserializationError.OtherError
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.serialization.ProtoConverter
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
@ -368,7 +369,7 @@ trait HasSupportedProtoVersions[ValueClass] {
def protoVersionFor(protocolVersion: ProtocolVersion): ProtoVersion =
supportedProtoVersions.protoVersionFor(protocolVersionRepresentativeFor(protocolVersion))
/** Base class for (de)serializating from/to protobuf of ValueClass from a specific PV
/** Base class for (de)serializing from/to protobuf of ValueClass from a specific PV
*/
sealed trait ProtoCodec {
def fromInclusive: RepresentativeProtocolVersion[HasSupportedProtoVersions.this.type]
@ -633,7 +634,7 @@ trait HasSupportedProtoVersions[ValueClass] {
trait HasProtocolVersionedWrapperCompanion[
ValueClass <: HasRepresentativeProtocolVersion,
DeserializedValueClass,
DeserializedValueClass <: HasRepresentativeProtocolVersion,
] extends HasSupportedProtoVersions[ValueClass]
with Serializable {
@ -666,13 +667,77 @@ trait HasProtocolVersionedWrapperCompanion[
Left(unsupported.deserializationError)
}
}
/** Checks whether the representative protocol version originating from a deserialized proto
  * message field is compatible with the protocol version of the domain.
  *
  * Passing `None` as the expected domain protocol version skips the validation entirely.
  *
  * @param domainProtocolVersion the protocol version the domain is running on, if known
  * @param deserializedRepresentativeProtocolVersion the representative protocol version which
  *                                                  originates from a proto message field
  * @return `Right(())` when the validation succeeds (or is skipped), a parsing error otherwise
  */
private[version] def validateDeserialization(
    domainProtocolVersion: Option[ProtocolVersion],
    deserializedRepresentativeProtocolVersion: ProtocolVersion,
): ParsingResult[Unit] =
  domainProtocolVersion.fold[ParsingResult[Unit]](Right(())) { pv =>
    val expected = protocolVersionRepresentativeFor(pv).representative
    Either.cond(
      expected == deserializedRepresentativeProtocolVersion,
      (),
      unexpectedProtoVersionError(expected, deserializedRepresentativeProtocolVersion),
    )
  }
/** Builds the parsing error reported when the representative protocol version embedded in a
  * serialized message does not match the expected one (see `validateDeserialization`).
  */
private[version] def unexpectedProtoVersionError(
    expected: ProtocolVersion,
    found: ProtocolVersion,
) = {
  val message =
    s"Error while deserializing a $name; expected representative protocol version $expected but found $found"
  OtherError(message)
}
}
/** Companion trait for versioned wrappers whose deserializers need no extra context.
  *
  * Note: the type-parameter list previously contained both an unbounded and a bounded
  * declaration of `DeserializedValueClass`; only the bounded one is kept, since
  * `fromByteString` reads `representativeProtocolVersion` off the deserialized value.
  */
trait HasProtocolVersionedWrapperWithoutContextCompanion[
    ValueClass <: HasRepresentativeProtocolVersion,
    DeserializedValueClass <: HasRepresentativeProtocolVersion,
] extends HasProtocolVersionedWrapperCompanion[ValueClass, DeserializedValueClass] {

  /** Deserializes the given bytes and checks that the therein embedded proto version matches the
    * domain's protocol version.
    *
    * Use this method whenever the origin of the given bytes cannot be trusted, and for example the
    * proto version message field may be set maliciously. This should be your default choice for
    * deserialization.
    *
    * @param domainProtocolVersion the protocol version on which the domain is running on
    * @param bytes an untrusted byte string with an embedded proto version
    */
  def fromByteString(
      domainProtocolVersion: ProtocolVersion
  )(
      bytes: OriginalByteString
  ): ParsingResult[DeserializedValueClass]

  /** Deserializes the given bytes without checking that the embedded proto version actually
    * matches the protocol version which the domain is running on.
    *
    * Do NOT use this method unless you can justify that the given bytes originate from a trusted
    * source.
    *
    * @param bytes a trusted byte string with an embedded proto version
    */
  def fromByteStringUnsafe(
      bytes: OriginalByteString
  ): ParsingResult[DeserializedValueClass]
}
/** Trait for companion objects of serializable classes with memoization.
@ -680,11 +745,11 @@ trait HasProtocolVersionedWrapperWithoutContextCompanion[
* For example, if a container can serialize its elements, but the container's deserializer
* does not deserialize the elements and instead leaves them as Bytestring.
*
* Use [[HasMemoizedProtocolVersionedWrapperCompanion]] if the type distinction between serialization and deseserialization is not needed.
* Use [[HasMemoizedProtocolVersionedWrapperCompanion]] if the type distinction between serialization and deserialization is not needed.
*/
trait HasMemoizedProtocolVersionedWrapperCompanion2[
ValueClass <: HasRepresentativeProtocolVersion,
DeserializedValueClass,
DeserializedValueClass <: HasRepresentativeProtocolVersion,
] extends HasProtocolVersionedWrapperWithoutContextCompanion[ValueClass, DeserializedValueClass] {
// Deserializer: (Proto => DeserializedValueClass)
override type Deserializer =
@ -698,11 +763,22 @@ trait HasMemoizedProtocolVersionedWrapperCompanion2[
(original: OriginalByteString, data: DataByteString) =>
ProtoConverter.protoParser(p.parseFrom)(data).flatMap(fromProto(_)(original))
def fromByteArray(bytes: Array[Byte]): ParsingResult[DeserializedValueClass] = fromByteString(
ByteString.copyFrom(bytes)
)
override def fromByteString(
domainProtocolVersion: ProtocolVersion
)(
bytes: OriginalByteString
): ParsingResult[DeserializedValueClass] =
for {
valueClass <- fromByteStringUnsafe(bytes)
_ <- validateDeserialization(
Some(domainProtocolVersion),
valueClass.representativeProtocolVersion.representative,
)
} yield valueClass
override def fromByteString(bytes: OriginalByteString): ParsingResult[DeserializedValueClass] =
override def fromByteStringUnsafe(
bytes: OriginalByteString
): ParsingResult[DeserializedValueClass] =
for {
proto <- ProtoConverter.protoParser(UntypedVersionedMessage.parseFrom)(bytes)
data <- proto.wrapper.data.toRight(ProtoDeserializationError.FieldNotSet(s"$name: data"))
@ -715,13 +791,14 @@ trait HasMemoizedProtocolVersionedWrapperCompanion2[
* @param protoVersion Proto version of the bytes to be deserialized
* @param bytes data
*/
def fromByteString(
// TODO(#15250) - Remove this method when protocol versions 3 and 4 are removed
def fromByteStringLegacy(
protoVersion: ProtoVersion
)(bytes: OriginalByteString): ParsingResult[DeserializedValueClass] = {
deserializeForVersion(
protocolVersionRepresentativeFor(protoVersion),
_(bytes, bytes),
fromByteString(bytes),
fromByteStringUnsafe(bytes),
)
}
@ -736,11 +813,11 @@ trait HasMemoizedProtocolVersionedWrapperCompanion2[
* For example, if a container can serialize its elements, but the container's deserializer
* does not deserialize the elements and instead leaves them as Bytestring.
*
* Use [[HasMemoizedProtocolVersionedWithContextCompanion]] if the type distinction between serialization and deseserialization is not needed.
* Use [[HasMemoizedProtocolVersionedWithContextCompanion]] if the type distinction between serialization and deserialization is not needed.
*/
trait HasMemoizedProtocolVersionedWithContextCompanion2[
ValueClass <: HasRepresentativeProtocolVersion,
DeserializedValueClass,
DeserializedValueClass <: HasRepresentativeProtocolVersion,
Context,
] extends HasProtocolVersionedWrapperCompanion[ValueClass, DeserializedValueClass] {
override type Deserializer =
@ -754,18 +831,51 @@ trait HasMemoizedProtocolVersionedWithContextCompanion2[
(ctx: Context, original: OriginalByteString, data: DataByteString) =>
ProtoConverter.protoParser(p.parseFrom)(data).flatMap(fromProto(ctx, _)(original))
/** Deserializes the given bytes and checks that the proto version embedded therein matches the
* domain's protocol version.
*
* Use this method whenever the origin of the given bytes cannot be trusted, and for example the
* proto version message field may be set maliciously. This should be your default choice for
* deserialization.
*
* @param domainProtocolVersion the protocol version on which the domain is running on
* @param context additional information which is required for the deserialization
* @param bytes an untrusted byte string with an embedded proto version
*/
def fromByteString(
domainProtocolVersion: ProtocolVersion
)(
context: Context
)(bytes: OriginalByteString): ParsingResult[DeserializedValueClass] = for {
)(
bytes: OriginalByteString
): ParsingResult[DeserializedValueClass] = for {
valueClass <- fromByteStringUnsafe(context)(bytes)
_ <- validateDeserialization(
Some(domainProtocolVersion),
valueClass.representativeProtocolVersion.representative,
)
} yield valueClass
/** Deserializes the given bytes without checking that the embedded proto version actually
* matches the protocol version which the domain is running on.
*
* Do NOT use this method unless you can justify that the given bytes originate from a trusted
* source.
*
* @param context additional information which is required for the deserialization
* @param bytes a trusted byte string with an embedded proto version
*/
def fromByteStringUnsafe(
context: Context
)(
bytes: OriginalByteString
): ParsingResult[DeserializedValueClass] = for {
proto <- ProtoConverter.protoParser(UntypedVersionedMessage.parseFrom)(bytes)
data <- proto.wrapper.data.toRight(ProtoDeserializationError.FieldNotSet(s"$name: data"))
valueClass <- supportedProtoVersions
.deserializerFor(ProtoVersion(proto.version))(context, bytes, data)
} yield valueClass
def fromByteArray(context: Context)(bytes: Array[Byte]): ParsingResult[DeserializedValueClass] =
fromByteString(context)(ByteString.copyFrom(bytes))
override protected def deserializationErrorK(
error: ProtoDeserializationError
): (Context, OriginalByteString, DataByteString) => ParsingResult[DeserializedValueClass] =
@ -777,11 +887,11 @@ trait HasMemoizedProtocolVersionedWithContextCompanion2[
* For example, if a container can serialize its elements, but the container's deserializer
* does not deserialize the elements and instead leaves them as Bytestring.
*
* Use [[HasProtocolVersionedCompanion]] if the type distinction between serialization and deseserialization is not needed.
* Use [[HasProtocolVersionedCompanion]] if the type distinction between serialization and deserialization is not needed.
*/
trait HasProtocolVersionedCompanion2[
ValueClass <: HasRepresentativeProtocolVersion,
DeserializedValueClass,
DeserializedValueClass <: HasRepresentativeProtocolVersion,
] extends HasProtocolVersionedWrapperWithoutContextCompanion[ValueClass, DeserializedValueClass] {
override type Deserializer = DataByteString => ParsingResult[DeserializedValueClass]
@ -792,19 +902,44 @@ trait HasProtocolVersionedCompanion2[
): Deserializer =
(data: DataByteString) => ProtoConverter.protoParser(p.parseFrom)(data).flatMap(fromProto)
def fromByteArray(bytes: Array[Byte]): ParsingResult[DeserializedValueClass] = for {
proto <- ProtoConverter.protoParserArray(UntypedVersionedMessage.parseFrom)(bytes)
valueClass <- fromProtoVersioned(VersionedMessage(proto))
} yield valueClass
/** Deserializes the given bytes without checking that the embedded proto version actually
* matches the protocol version which the domain is running on.
*
* Do NOT use this method unless you can justify that the given bytes originate from a trusted
* source. For example, this should be the case for deserialization of data that originates from a
* database.
*
* @param bytes trusted bytes with an embedded proto version
*/
def fromByteArrayUnsafe(bytes: Array[Byte]): ParsingResult[DeserializedValueClass] =
for {
proto <- ProtoConverter.protoParserArray(UntypedVersionedMessage.parseFrom)(bytes)
valueClass <- fromProtoVersioned(VersionedMessage(proto))
} yield valueClass
def fromProtoVersioned(
private def fromProtoVersioned(
proto: VersionedMessage[DeserializedValueClass]
): ParsingResult[DeserializedValueClass] =
proto.wrapper.data.toRight(ProtoDeserializationError.FieldNotSet(s"$name: data")).flatMap {
supportedProtoVersions.deserializerFor(ProtoVersion(proto.version))
}
override def fromByteString(bytes: OriginalByteString): ParsingResult[DeserializedValueClass] =
override def fromByteString(
domainProtocolVersion: ProtocolVersion
)(
bytes: OriginalByteString
): ParsingResult[DeserializedValueClass] =
for {
valueClass <- fromByteStringUnsafe(bytes)
_ <- validateDeserialization(
Some(domainProtocolVersion),
valueClass.representativeProtocolVersion.representative,
)
} yield valueClass
override def fromByteStringUnsafe(
bytes: OriginalByteString
): ParsingResult[DeserializedValueClass] =
for {
proto <- ProtoConverter.protoParser(UntypedVersionedMessage.parseFrom)(bytes)
valueClass <- fromProtoVersioned(VersionedMessage(proto))
@ -843,34 +978,47 @@ trait HasProtocolVersionedCompanion2[
* @param protocolVersion protocol version of the bytes to be deserialized
* @param bytes data
*/
def fromByteString(
// TODO(#15250) - Remove this method when protocol versions 3 and 4 are removed
def fromByteStringLegacy(
protocolVersion: ProtocolVersion
)(bytes: OriginalByteString): ParsingResult[DeserializedValueClass] = {
deserializeForVersion(
protocolVersionRepresentativeFor(protocolVersion),
_(bytes),
fromByteString(bytes),
fromByteStringUnsafe(bytes),
)
}
def readFromFile(
/** Deserializes the data from the given file without checking that the embedded proto version actually
* matches the protocol version which the domain is running on.
*
* Do NOT use this method unless you can justify that the data originates from a trusted
* source.
*/
def readFromFileUnsafe(
inputFile: String
): Either[String, DeserializedValueClass] = {
for {
bs <- BinaryFileUtil.readByteStringFromFile(inputFile)
value <- fromByteString(bs).leftMap(_.toString)
value <- fromByteStringUnsafe(bs).leftMap(_.toString)
} yield value
}
def tryReadFromFile(inputFile: String): DeserializedValueClass =
readFromFile(inputFile).valueOr(err =>
/** Deserializes the data from the given file without checking that the embedded proto version actually
* matches the protocol version which the domain is running on.
*
* Do NOT use this method unless you can justify that the data originates from a trusted
* source.
*/
def tryReadFromFileUnsafe(inputFile: String): DeserializedValueClass =
readFromFileUnsafe(inputFile).valueOr(err =>
throw new IllegalArgumentException(s"Reading $name from file $inputFile failed: $err")
)
implicit def hasVersionedWrapperGetResult(implicit
getResultByteArray: GetResult[Array[Byte]]
): GetResult[DeserializedValueClass] = GetResult { r =>
fromByteArray(r.<<[Array[Byte]]).valueOr(err =>
fromByteArrayUnsafe(r.<<[Array[Byte]]).valueOr(err =>
throw new DbDeserializationException(s"Failed to deserialize $name: $err")
)
}
@ -880,7 +1028,7 @@ trait HasProtocolVersionedCompanion2[
): GetResult[Option[DeserializedValueClass]] = GetResult { r =>
r.<<[Option[Array[Byte]]]
.map(
fromByteArray(_).valueOr(err =>
fromByteArrayUnsafe(_).valueOr(err =>
throw new DbDeserializationException(s"Failed to deserialize $name: $err")
)
)
@ -905,14 +1053,16 @@ trait HasProtocolVersionedWithContextCompanion[
(ctx: Context, data: DataByteString) =>
ProtoConverter.protoParser(p.parseFrom)(data).flatMap(fromProto(ctx, _))
def fromProtoVersioned(
private def fromProtoVersioned(
context: Context
)(proto: VersionedMessage[ValueClass]): ParsingResult[ValueClass] =
proto.wrapper.data.toRight(ProtoDeserializationError.FieldNotSet(s"$name: data")).flatMap {
supportedProtoVersions.deserializerFor(ProtoVersion(proto.version))(context, _)
}
def fromByteString(context: Context)(bytes: OriginalByteString): ParsingResult[ValueClass] = for {
def fromByteStringUnsafe(
context: Context
)(bytes: OriginalByteString): ParsingResult[ValueClass] = for {
proto <- ProtoConverter.protoParser(UntypedVersionedMessage.parseFrom)(bytes)
valueClass <- fromProtoVersioned(context)(VersionedMessage(proto))
} yield valueClass
@ -922,13 +1072,14 @@ trait HasProtocolVersionedWithContextCompanion[
* @param protoVersion Proto version of the bytes to be deserialized
* @param bytes data
*/
def fromByteString(
// TODO(#15250) - Remove this method when protocol versions 3 and 4 are removed
def fromByteStringLegacy(
protoVersion: ProtoVersion
)(context: Context)(bytes: OriginalByteString): ParsingResult[ValueClass] = {
deserializeForVersion(
protocolVersionRepresentativeFor(protoVersion),
_(context, bytes),
fromByteString(context)(bytes),
fromByteStringUnsafe(context)(bytes),
)
}
@ -937,13 +1088,14 @@ trait HasProtocolVersionedWithContextCompanion[
* @param protocolVersion protocol version of the bytes to be deserialized
* @param bytes data
*/
def fromByteString(
// TODO(#15250) - Remove this method when protocol versions 3 and 4 are removed
def fromByteStringLegacy(
protocolVersion: ProtocolVersion
)(context: Context)(bytes: OriginalByteString): ParsingResult[ValueClass] = {
deserializeForVersion(
protocolVersionRepresentativeFor(protocolVersion),
_(context, bytes),
fromByteString(context)(bytes),
fromByteStringUnsafe(context)(bytes),
)
}

View File

@ -171,7 +171,10 @@ object ProtocolVersion {
}
}
private[version] def unsupportedErrorMessage(pv: ProtocolVersion, includeDeleted: Boolean) = {
private[version] def unsupportedErrorMessage(
pv: ProtocolVersion,
includeDeleted: Boolean = false,
) = {
val supportedStablePVs = stableAndSupported.map(_.toString)
val supportedPVs: NonEmpty[List[String]] = if (includeDeleted) {
@ -229,7 +232,7 @@ object ProtocolVersion {
*/
def fromProtoPrimitive(rawVersion: Int): ParsingResult[ProtocolVersion] = {
val pv = ProtocolVersion(rawVersion)
Either.cond(pv.isSupported, pv, OtherError(unsupportedErrorMessage(pv, includeDeleted = false)))
Either.cond(pv.isSupported, pv, OtherError(unsupportedErrorMessage(pv)))
}
/** Like [[create]] ensures a supported protocol version; tailored to (de-)serialization purposes.

View File

@ -207,5 +207,6 @@ object GrpcSequencerConnectClient {
Left(ProtoDeserializationError.FieldNotSet("GetDomainParameters.parameters"))
case Parameters.ParametersV0(parametersV0) => StaticDomainParameters.fromProtoV0(parametersV0)
case Parameters.ParametersV1(parametersV1) => StaticDomainParameters.fromProtoV1(parametersV1)
case Parameters.ParametersV2(parametersV2) => StaticDomainParameters.fromProtoV2(parametersV2)
}
}

View File

@ -166,7 +166,11 @@ final class GrpcTopologyManagerWriteService[T <: CantonError](
for {
parsed <- mapErrNew(
EitherT
.fromEither[Future](SignedTopologyTransaction.fromByteString(request.serialized))
.fromEither[Future](
SignedTopologyTransaction.fromByteString(protocolVersion)(
request.serialized
)
)
.leftMap(ProtoDeserializationFailure.Wrap(_))
)
_ <- mapErrNewEUS(

View File

@ -115,12 +115,14 @@ class GenTransactionTreeTest
"be serialized and deserialized" in {
val fullInformeeTree = example.fullInformeeTree
FullInformeeTree.fromByteString(factory.cryptoOps)(
FullInformeeTree.fromByteStringUnsafe(factory.cryptoOps)(
fullInformeeTree.toByteString
) shouldEqual Right(fullInformeeTree)
val (_, informeeTree) = example.informeeTreeBlindedFor
InformeeTree.fromByteString(factory.cryptoOps)(informeeTree.toByteString) shouldEqual Right(
InformeeTree.fromByteStringUnsafe(factory.cryptoOps)(
informeeTree.toByteString
) shouldEqual Right(
informeeTree
)

View File

@ -124,10 +124,10 @@ class MerkleSeqTest extends AnyWordSpec with BaseTest {
val merkleSeqP = merkleSeq.toByteString
val merkleSeqDeserialized =
MerkleSeq
.fromByteString(
.fromByteStringUnsafe(
(
hashOps,
AbstractLeaf.fromByteString(testedProtocolVersion)(_),
AbstractLeaf.fromByteStringLegacy(testedProtocolVersion)(_),
)
)(merkleSeqP)
.value

View File

@ -263,7 +263,9 @@ class TransactionViewTest extends AnyWordSpec with BaseTest with HasExecutionCon
).value
ViewParticipantData
.fromByteString(hashOps)(vpd.getCryptographicEvidence)
.fromByteString(testedProtocolVersion)(hashOps)(
vpd.getCryptographicEvidence
)
.map(_.unwrap) shouldBe Right(Right(vpd))
}
}

View File

@ -55,6 +55,11 @@ object GeneratorsProtocol {
StaticDomainParameters.defaultMaxRequestSizeFrom,
)
catchUpParameters <- defaultValueGen(
protocolVersion,
StaticDomainParameters.defaultCatchUpParameters,
)
parameters = StaticDomainParameters.create(
maxRequestSize,
uniqueContractKeys,
@ -66,6 +71,7 @@ object GeneratorsProtocol {
protocolVersion,
reconciliationInterval,
maxRatePerParticipant,
catchUpParameters,
)
} yield parameters)

View File

@ -1,112 +0,0 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.protocol
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.error.MediatorError
import com.digitalasset.canton.protocol.messages.Verdict.{
Approve,
MediatorRejectV0,
MediatorRejectV1,
MediatorRejectV2,
ParticipantReject,
}
import com.digitalasset.canton.protocol.messages.{LocalReject, LocalVerdict, Verdict}
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{BaseTest, LfPartyId}
import org.scalatest.wordspec.AnyWordSpec
@SuppressWarnings(
  Array(
    "org.wartremover.warts.Product",
    "org.wartremover.warts.Serializable",
  )
)
// Round-trip serialization test for Verdict: converts representative verdict values
// to their versioned proto form and back, asserting the result equals the original.
class VerdictTest extends AnyWordSpec with BaseTest {
  // Convenience constructor for test party identifiers; throws on invalid input.
  private def party(name: String): LfPartyId = LfPartyId.assertFromString(name)
  // Representative protocol version used to instantiate LocalVerdict rejections below.
  private lazy val representativeProtocolVersion =
    LocalVerdict.protocolVersionRepresentativeFor(testedProtocolVersion)
  "Verdict" can {
    "converting to and from proto" should {
      "result in an equal object" in {
        // One row per verdict variant: approval, participant rejection (with two
        // distinct rejection reasons across party sets), and a mediator timeout.
        val exampleResults = Table(
          ("type", "value"),
          ("approve", Approve(testedProtocolVersion)),
          (
            "reject",
            ParticipantReject(
              NonEmpty(
                List,
                (
                  Set(party("p1"), party("p2")),
                  LocalReject.MalformedRejects.Payloads
                    .Reject("some error")(representativeProtocolVersion),
                ),
                (
                  Set(party("p3")),
                  LocalReject.ConsistencyRejections.LockedContracts
                    .Reject(Seq())(representativeProtocolVersion),
                ),
              ),
              testedProtocolVersion,
            ),
          ),
          ("timeout", VerdictTest.timeoutVerdict(testedProtocolVersion)),
        )
        forAll(exampleResults) { (resultType: String, original: Verdict) =>
          // Serialize then deserialize; a Left from the parser fails the test with
          // the deserialization error, otherwise we compare against the original.
          val cycled =
            Verdict.fromProtoVersioned(
              original.toProtoVersioned
            ) match {
              case Left(err) => fail(err.toString)
              case Right(verdict) => verdict
            }
          // resultType is included as the clue so failures name the offending row.
          assertResult(original, resultType)(cycled)
        }
      }
    }
  }
}
object VerdictTest {

  /** Test fixture: a mediator-reject verdict labelled as a timeout, built with whichever
    * MediatorReject proto version applies to the given protocol version.
    */
  def timeoutVerdict(protocolVersion: ProtocolVersion): Verdict.MediatorReject =
    mediatorReject(protocolVersion)

  /** Test fixture: a mediator-reject verdict labelled as malformed, built with whichever
    * MediatorReject proto version applies to the given protocol version.
    */
  def malformedVerdict(protocolVersion: ProtocolVersion): Verdict.MediatorReject =
    mediatorReject(protocolVersion)

  // The two public fixtures above had byte-identical bodies in the original code, so the
  // shared construction is factored out here (behavior unchanged for both callers).
  // NOTE(review): on the V0 wire format this uses MediatorRejection.Code.Timeout even when
  // reached via malformedVerdict — that mirrors the original code exactly, but looks like a
  // copy-paste artifact; confirm whether a malformed-message code was intended for V0.
  private def mediatorReject(protocolVersion: ProtocolVersion): Verdict.MediatorReject =
    if (protocolVersion >= Verdict.MediatorRejectV2.firstApplicableProtocolVersion)
      // V2 carries a full rpc status describing the malformed message.
      MediatorRejectV2.tryCreate(
        MediatorError.MalformedMessage.Reject("").rpcStatusWithoutLoggingContext(),
        protocolVersion,
      )
    else if (protocolVersion >= Verdict.MediatorRejectV1.firstApplicableProtocolVersion)
      // V1 carries the error id and category as separate fields, with an empty cause.
      MediatorRejectV1.tryCreate(
        "",
        MediatorError.MalformedMessage.id,
        MediatorError.MalformedMessage.category.asInt,
        protocolVersion,
      )
    else
      // V0 only supports a fixed enum of rejection codes.
      MediatorRejectV0.tryCreate(
        com.digitalasset.canton.protocol.v0.MediatorRejection.Code.Timeout,
        "",
      )
}

View File

@ -59,7 +59,7 @@ class AcsCommitmentTest extends AnyWordSpec with BaseTest with HasCryptographicE
)
def fromByteString(bytes: ByteString): AcsCommitment = {
AcsCommitment.fromByteString(bytes) match {
AcsCommitment.fromByteString(testedProtocolVersion)(bytes) match {
case Left(x) => fail(x.toString)
case Right(x) => x
}

View File

@ -41,7 +41,9 @@ class MediatorResponseTest extends AnyWordSpec with BaseTest with HasCryptograph
)
def fromByteString(bytes: ByteString): MediatorResponse = {
MediatorResponse.fromByteString(bytes).valueOr(err => fail(err.toString))
MediatorResponse
.fromByteString(testedProtocolVersion)(bytes)
.valueOr(err => fail(err.toString))
}
"ConfirmationResponse" should {

View File

@ -34,7 +34,7 @@ class TopologyTransactionTest extends AnyWordSpec with BaseTest with HasCryptogr
private val deserialize: ByteString => TopologyTransaction[TopologyChangeOp] =
bytes =>
TopologyTransaction.fromByteString(bytes) match {
TopologyTransaction.fromByteString(testedProtocolVersion)(bytes) match {
case Left(err) => throw new TestFailedException(err.toString, 0)
case Right(msg) => msg
}

View File

@ -3,7 +3,6 @@
package com.digitalasset.canton.topology
import com.digitalasset.canton.BaseTest
import com.digitalasset.canton.crypto.Fingerprint
import com.digitalasset.canton.crypto.provider.symbolic.SymbolicCrypto
import com.digitalasset.canton.protocol.TestDomainParameters
@ -11,6 +10,7 @@ import com.digitalasset.canton.serialization.HasCryptographicEvidenceTest
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.topology.transaction.*
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{BaseTest, ProtoDeserializationError}
import com.google.protobuf.ByteString
import org.scalatest.wordspec.AnyWordSpec
@ -52,13 +52,19 @@ class TopologyTransactionTest extends AnyWordSpec with BaseTest with HasCryptogr
"namespace delegation" should {
val ns1 = NamespaceDelegation(Namespace(fingerprint), pubKey, true)
val ns2 = NamespaceDelegation(Namespace(fingerprint2), pubKey2, true)
testConversion(TopologyStateUpdate.createAdd, TopologyTransaction.fromByteString)(
testConversion(
TopologyStateUpdate.createAdd,
TopologyTransaction.fromByteString(testedProtocolVersion),
)(
ns1,
Some(ns2),
)
}
"identifier delegation" should {
testConversion(TopologyStateUpdate.createAdd, TopologyTransaction.fromByteString)(
testConversion(
TopologyStateUpdate.createAdd,
TopologyTransaction.fromByteString(testedProtocolVersion),
)(
IdentifierDelegation(uid, pubKey),
Some(IdentifierDelegation(uid2, pubKey2)),
)
@ -71,7 +77,10 @@ class TopologyTransactionTest extends AnyWordSpec with BaseTest with HasCryptogr
DomainTopologyManagerId(uid),
)
owners.foreach(owner =>
testConversion(TopologyStateUpdate.createAdd, TopologyTransaction.fromByteString)(
testConversion(
TopologyStateUpdate.createAdd,
TopologyTransaction.fromByteString(testedProtocolVersion),
)(
OwnerToKeyMapping(owner, pubKey),
Some(OwnerToKeyMapping(owner, pubKey2)),
hint = " for " + owner.toString,
@ -85,7 +94,10 @@ class TopologyTransactionTest extends AnyWordSpec with BaseTest with HasCryptogr
(RequestSide.To, ParticipantPermission.Observation),
)
sides.foreach { case (side, permission) =>
testConversion(TopologyStateUpdate.createAdd, TopologyTransaction.fromByteString)(
testConversion(
TopologyStateUpdate.createAdd,
TopologyTransaction.fromByteString(testedProtocolVersion),
)(
PartyToParticipant(side, PartyId(uid), ParticipantId(uid2), permission),
Some(PartyToParticipant(side, PartyId(uid2), ParticipantId(uid), permission)),
hint = " for " + side.toString + " and " + permission.toString,
@ -94,12 +106,31 @@ class TopologyTransactionTest extends AnyWordSpec with BaseTest with HasCryptogr
}
"domain parameters change" should {
def fromByteString(bytes: ByteString): ParsingResult[DomainGovernanceTransaction] =
for {
converted <- TopologyTransaction.fromByteStringUnsafe(
bytes
)
result <- converted match {
case _: TopologyStateUpdate[_] =>
Left(
ProtoDeserializationError.TransactionDeserialization(
"Expecting DomainGovernanceTransaction, found TopologyStateUpdate"
)
)
case domainGovernanceTransaction: DomainGovernanceTransaction =>
Right(domainGovernanceTransaction)
}
} yield result
val builder = (
mapping: DomainGovernanceMapping,
protocolVersion: ProtocolVersion,
) => DomainGovernanceTransaction(mapping, protocolVersion)
testConversion(builder, DomainGovernanceTransaction.fromByteString)(
testConversion(builder, fromByteString)(
DomainParametersChange(DomainId(uid), defaultDynamicDomainParameters),
Some(DomainParametersChange(DomainId(uid), defaultDynamicDomainParameters)),
)

View File

@ -24,6 +24,7 @@ class HasProtocolVersionedWrapperTest extends AnyWordSpec with BaseTest {
"HasVersionedWrapperV2" should {
"use correct proto version depending on the protocol version for serialization" in {
def message(i: Int): Message = Message("Hey", 1, 2.0)(protocolVersionRepresentative(i), None)
message(3).toProtoVersioned.version shouldBe 0
message(4).toProtoVersioned.version shouldBe 0
message(5).toProtoVersioned.version shouldBe 1
@ -31,24 +32,27 @@ class HasProtocolVersionedWrapperTest extends AnyWordSpec with BaseTest {
message(7).toProtoVersioned.version shouldBe 2
}
"set correct protocol version depending on the proto version" in {
def fromByteString(bytes: ByteString, protoVersion: Int): Message = Message
.fromByteString(
VersionedMessage[Message](bytes, protoVersion).toByteString
)
.value
def fromByteString(
bytes: ByteString,
protoVersion: Int,
domainProtocolVersion: ProtocolVersion,
): ParsingResult[Message] = Message
.fromByteString(domainProtocolVersion)(
VersionedMessage[Message](bytes, protoVersion).toByteString
)
"set correct protocol version depending on the proto version" in {
val messageV0 = VersionedMessageV0("Hey").toByteString
val expectedV0Deserialization = Message("Hey", 0, 0)(protocolVersionRepresentative(3), None)
Message
.fromByteString(ProtoVersion(0))(
.fromByteStringLegacy(ProtoVersion(0))(
messageV0
)
.value shouldBe expectedV0Deserialization
// Round trip serialization
Message
.fromByteString(ProtoVersion(0))(
.fromByteStringLegacy(ProtoVersion(0))(
expectedV0Deserialization.toByteString
)
.value shouldBe expectedV0Deserialization
@ -56,11 +60,11 @@ class HasProtocolVersionedWrapperTest extends AnyWordSpec with BaseTest {
val messageV1 = VersionedMessageV1("Hey", 42).toByteString
val expectedV1Deserialization =
Message("Hey", 42, 1.0)(protocolVersionRepresentative(5), None)
fromByteString(messageV1, 1) shouldBe expectedV1Deserialization
fromByteString(messageV1, 1, ProtocolVersion(5)).value shouldBe expectedV1Deserialization
// Round trip serialization
Message
.fromByteString(
.fromByteString(ProtocolVersion(5))(
expectedV1Deserialization.toByteString
)
.value shouldBe expectedV1Deserialization
@ -68,11 +72,11 @@ class HasProtocolVersionedWrapperTest extends AnyWordSpec with BaseTest {
val messageV2 = VersionedMessageV2("Hey", 42, 43.0).toByteString
val expectedV2Deserialization =
Message("Hey", 42, 43.0)(protocolVersionRepresentative(6), None)
fromByteString(messageV2, 2) shouldBe expectedV2Deserialization
fromByteString(messageV2, 2, ProtocolVersion(6)).value shouldBe expectedV2Deserialization
// Round trip serialization
Message
.fromByteString(
.fromByteString(ProtocolVersion(6))(
expectedV2Deserialization.toByteString
)
.value shouldBe expectedV2Deserialization
@ -87,8 +91,37 @@ class HasProtocolVersionedWrapperTest extends AnyWordSpec with BaseTest {
protocolVersionRepresentative(8).representative shouldBe ProtocolVersion(6)
}
"return the highest inclusive protocol representative for an unknown proto version" in {
protocolVersionRepresentative(-1).representative shouldBe ProtocolVersion(6)
}
"fail deserialization when the representative protocol version from the proto version does not match the expected (representative) protocol version" in {
val message = VersionedMessageV1("Hey", 42).toByteString
fromByteString(message, 1, ProtocolVersion(6)).left.value should have message
Message.unexpectedProtoVersionError(ProtocolVersion(6), ProtocolVersion(5)).message
}
"validate proto version against expected (representative) protocol version" in {
Message
.validateDeserialization(Some(ProtocolVersion(5)), ProtocolVersion(5))
.value shouldBe ()
Message
.validateDeserialization(Some(ProtocolVersion(6)), ProtocolVersion(5))
.left
.value should have message Message
.unexpectedProtoVersionError(ProtocolVersion(6), ProtocolVersion(5))
.message
Message
.validateDeserialization(
None, // skips expected protocol version check
ProtocolVersion(3),
)
.value shouldBe ()
}
"status consistency between protobuf messages and protocol versions" in {
new HasMemoizedProtocolVersionedWrapperCompanion[Message] {
import com.digitalasset.canton.version.HasProtocolVersionedWrapperTest.Message.*
val stablePV = ProtocolVersion.stable(10)

View File

@ -63,7 +63,16 @@ class ProtocolVersionTest extends AnyWordSpec with BaseTest {
"fail parsing version string with create" in {
ProtocolVersion.create(invalidProtocolVersionNumber.toString).left.value should be(
unsupportedErrorMessage(invalidProtocolVersion, includeDeleted = false)
unsupportedErrorMessage(invalidProtocolVersion)
)
}
"fail parsing version string considering also deleted protocol versions with create" in {
ProtocolVersion
.create(invalidProtocolVersionNumber.toString, allowDeleted = true)
.left
.value should be(
unsupportedErrorMessage(invalidProtocolVersion, includeDeleted = true)
)
}
@ -76,7 +85,7 @@ class ProtocolVersionTest extends AnyWordSpec with BaseTest {
"fail parsing version string with tryCreate" in {
the[RuntimeException] thrownBy {
ProtocolVersion.tryCreate(invalidProtocolVersionNumber.toString)
} should have message unsupportedErrorMessage(invalidProtocolVersion, includeDeleted = false)
} should have message unsupportedErrorMessage(invalidProtocolVersion)
}
"parse version string with fromProtoPrimitiveS" in {
@ -90,10 +99,7 @@ class ProtocolVersionTest extends AnyWordSpec with BaseTest {
"fail parsing version string with fromProtoPrimitiveS" in {
val result = ProtocolVersion.fromProtoPrimitiveS(invalidProtocolVersionNumber.toString)
result shouldBe a[ParsingResult[?]]
result.left.value should have message unsupportedErrorMessage(
invalidProtocolVersion,
includeDeleted = false,
)
result.left.value should have message unsupportedErrorMessage(invalidProtocolVersion)
}
"parse version string with fromProtoPrimitive" in {
@ -107,10 +113,7 @@ class ProtocolVersionTest extends AnyWordSpec with BaseTest {
"fail parsing version string fromProtoPrimitive" in {
val result = ProtocolVersion.fromProtoPrimitive(invalidProtocolVersionNumber)
result shouldBe a[ParsingResult[?]]
result.left.value should have message unsupportedErrorMessage(
invalidProtocolVersion,
includeDeleted = false,
)
result.left.value should have message unsupportedErrorMessage(invalidProtocolVersion)
}
}

View File

@ -8,6 +8,7 @@ package com.digitalasset.canton.domain.admin.v0;
import "com/digitalasset/canton/crypto/v0/crypto.proto";
import "com/digitalasset/canton/protocol/v0/sequencing.proto";
import "com/digitalasset/canton/protocol/v1/sequencing.proto";
import "com/digitalasset/canton/protocol/v2/domain_params.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
@ -24,6 +25,7 @@ message GetDomainParameters {
oneof parameters {
com.digitalasset.canton.protocol.v0.StaticDomainParameters parameters_v0 = 1;
com.digitalasset.canton.protocol.v1.StaticDomainParameters parameters_v1 = 2;
com.digitalasset.canton.protocol.v2.StaticDomainParameters parameters_v2 = 3;
}
}
}

View File

@ -24,7 +24,6 @@ import com.digitalasset.canton.domain.config.store.{
DomainNodeSettingsStore,
StoredDomainNodeSettings,
}
import com.digitalasset.canton.domain.governance.ParticipantAuditor
import com.digitalasset.canton.domain.initialization.*
import com.digitalasset.canton.domain.mediator.{
CommunityMediatorRuntimeFactory,
@ -471,19 +470,6 @@ class DomainNodeBootstrap(
loggerFactory,
)
auditLogger = ParticipantAuditor.factory(loggerFactory, config.auditLogging)
// add audit logging to the domain manager
_ = if (config.auditLogging) {
manager.addObserver(new DomainIdentityStateObserver {
override def willChangeTheParticipantState(
participant: ParticipantId,
attributes: ParticipantAttributes,
): Unit = {
auditLogger.info(s"Updating participant $participant to $attributes")
}
})
}
syncCrypto: DomainSyncCryptoClient = {
ips.add(topologyClient).discard
new SyncCryptoApiProvider(
@ -536,7 +522,6 @@ class DomainNodeBootstrap(
staticDomainParameters,
arguments.testingConfig,
parameters.processingTimeouts,
auditLogger,
agreementManager,
memberAuthFactory,
parameters,

View File

@ -43,6 +43,7 @@ class GrpcDomainService(
val response = staticDomainParameters.protoVersion.v match {
case 0 => Future.successful(Parameters.ParametersV0(staticDomainParameters.toProtoV0))
case 1 => Future.successful(Parameters.ParametersV1(staticDomainParameters.toProtoV1))
case 2 => Future.successful(Parameters.ParametersV2(staticDomainParameters.toProtoV2))
case unsupported =>
Future.failed(
new IllegalStateException(

View File

@ -16,7 +16,7 @@ import com.digitalasset.canton.crypto.CryptoFactory.{
import com.digitalasset.canton.crypto.*
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.protocol.DomainParameters.MaxRequestSize
import com.digitalasset.canton.protocol.StaticDomainParameters
import com.digitalasset.canton.protocol.{CatchUpConfig, StaticDomainParameters}
import com.digitalasset.canton.version.{DomainProtocolVersion, ProtocolVersion}
/** Configuration of domain parameters that all members connecting to a domain must adhere to.
@ -40,6 +40,7 @@ import com.digitalasset.canton.version.{DomainProtocolVersion, ProtocolVersion}
* @param protocolVersion The protocol version spoken on the domain. All participants and domain nodes attempting to connect to the sequencer need to support this protocol version to connect.
* @param dontWarnOnDeprecatedPV If true, then this domain will not emit a warning when configured to use a deprecated protocol version (such as 2.0.0).
* @param resetStoredStaticConfig DANGEROUS: If true, then the stored static configuration parameters will be reset to the ones in the configuration file
* @param catchUpParameters The optional catch up parameters of type [[com.digitalasset.canton.protocol.CatchUpConfig]]. If None is specified, then the catch-up mode is disabled.
*/
final case class DomainParametersConfig(
reconciliationInterval: PositiveDurationSeconds =
@ -56,6 +57,8 @@ final case class DomainParametersConfig(
protocolVersion: DomainProtocolVersion = DomainProtocolVersion(
ProtocolVersion.latest
),
catchUpParameters: Option[CatchUpConfig] =
StaticDomainParameters.defaultCatchUpParameters.defaultValue,
override val devVersionSupport: Boolean = false,
override val dontWarnOnDeprecatedPV: Boolean = false,
resetStoredStaticConfig: Boolean = false,
@ -76,6 +79,7 @@ final case class DomainParametersConfig(
param("devVersionSupport", _.devVersionSupport),
param("dontWarnOnDeprecatedPV", _.dontWarnOnDeprecatedPV),
param("resetStoredStaticConfig", _.resetStoredStaticConfig),
param("catchUpParameters", _.catchUpParameters),
)
override def initialProtocolVersion: ProtocolVersion = protocolVersion.version
@ -138,6 +142,7 @@ final case class DomainParametersConfig(
requiredHashAlgorithms = newRequiredHashAlgorithms,
requiredCryptoKeyFormats = newCryptoKeyFormats,
protocolVersion = protocolVersion.unwrap,
catchUpParameters = catchUpParameters,
)
}
}

View File

@ -1,23 +0,0 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.governance
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging, TracedLogger}
import org.slf4j.helpers.NOPLogger.NOP_LOGGER
class ParticipantAuditor(val loggerFactory: NamedLoggerFactory) extends NamedLogging {}
object ParticipantAuditor {
val noop: TracedLogger = TracedLogger(NOP_LOGGER)
def factory(loggerFactory: NamedLoggerFactory, enable: Boolean): TracedLogger =
if (enable) {
val ret = new ParticipantAuditor(loggerFactory).logger
ret
} else {
NamedLogging.noopLogger
}
}

View File

@ -16,6 +16,7 @@ import com.digitalasset.canton.data.{CantonTimestamp, ConfirmingParty, ViewType}
import com.digitalasset.canton.domain.mediator.store.MediatorState
import com.digitalasset.canton.error.MediatorError
import com.digitalasset.canton.lifecycle.{FlagCloseable, FutureUnlessShutdown, HasCloseContext}
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.protocol.messages.*
import com.digitalasset.canton.protocol.{v0, *}
@ -38,6 +39,34 @@ import org.slf4j.event.Level
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
/** Small helper class to extract appropriate data for logging.
  *
  * NOTE(review): once the mediator event stage handling is rewritten, this
  * helper should be cleaned up again.
  *
  * @param sender   the participant that sent the responses being summarised
  * @param ts       the timestamp of the grouped responses
  * @param approved number of approving local verdicts folded in so far
  * @param rejected the rejecting local verdicts folded in so far
  */
// TODO(#15627) remove me
private[mediator] final case class MediatorResultLog(
    sender: ParticipantId,
    ts: CantonTimestamp,
    approved: Int = 0,
    rejected: Seq[LocalReject] = Seq.empty,
)(val traceContext: TraceContext)
    extends PrettyPrinting {

  // Render only the informative fields: `approved` when positive, `rejected` when non-empty.
  override def pretty: Pretty[MediatorResultLog] = prettyNode(
    "ParticipantResponse",
    param("sender", _.sender),
    param("ts", _.ts),
    param("approved", _.approved, _.approved > 0),
    paramIfNonEmpty("rejected", _.rejected),
  )

  // Fold one more local verdict into the summary, preserving the original trace context.
  def extend(result: LocalVerdict): MediatorResultLog = result match {
    case _: LocalApprove => copy(approved = approved + 1)(traceContext)
    case reject: LocalReject => copy(rejected = rejected :+ reject)(traceContext)
  }
}
/** Scalable service to check the received Stakeholder Trees and Confirmation Responses, derive a verdict and post
* result messages to stakeholders.
*/
@ -57,6 +86,22 @@ private[mediator] class ConfirmationResponseProcessor(
with FlagCloseable
with HasCloseContext {
/** Builds one [[MediatorResultLog]] per (sender, timestamp, trace context) group of
  * response events, folding each group's local verdicts into the summary via
  * [[MediatorResultLog.extend]]. Non-response events are ignored.
  * Used purely for informational logging.
  */
private def extractEventsForLogging(
    events: Seq[Traced[MediatorEvent]]
): Seq[MediatorResultLog] =
  events
    // keep only participant responses, extracting exactly the fields we log
    .collect { case tr @ Traced(MediatorEvent.Response(_, timestamp, response, _)) =>
      (response.message.sender, timestamp, tr.traceContext, response.message.localVerdict)
    }
    // one summary per sender/timestamp/trace-context combination
    .groupBy { case (sender, ts, traceContext, _) => (sender, ts, traceContext) }
    .map { case ((sender, ts, traceContext), results) =>
      results.foldLeft(MediatorResultLog(sender, ts)(traceContext)) {
        case (acc, (_, _, _, result)) =>
          acc.extend(result)
      }
    }
    .toSeq
/** Handle events for a single request-id.
* Callers should ensure all events are for the same request and ordered by sequencer time.
*/
@ -67,6 +112,14 @@ private[mediator] class ConfirmationResponseProcessor(
): HandlerResult = {
val requestTs = requestId.unwrap
// TODO(#15627) clean me up after removing the MediatorStageEvent stuff
if (logger.underlying.isInfoEnabled()) {
extractEventsForLogging(events).foreach { result =>
logger.info(
show"Phase 5: Received responses for request=${requestId}: $result"
)(result.traceContext)
}
}
val future = for {
// FIXME(i12903): do not block if requestId is far in the future
@ -135,7 +188,9 @@ private[mediator] class ConfirmationResponseProcessor(
implicit val traceContext: TraceContext = responseAggregation.requestTraceContext
logger
.debug(s"Request ${requestId}: Timeout in state ${responseAggregation.state} at $timestamp")
.info(
s"Phase 6: Request ${requestId}: Timeout in state ${responseAggregation.state} at $timestamp"
)
val timeout = responseAggregation.timeout(version = timestamp)
mediatorState
@ -192,9 +247,8 @@ private[mediator] class ConfirmationResponseProcessor(
_ <- mediatorState.add(aggregation)
} yield {
timeTracker.requestTick(participantResponseDeadline)
logger.debug(
show"$requestId: registered mediator request. Initial state: ${aggregation.showMergedState}"
logger.info(
show"Phase 2: Registered request=${requestId.unwrap} with ${request.informeesAndThresholdByViewHash.size} view(s). Initial state: ${aggregation.showMergedState}"
)
}
@ -621,7 +675,6 @@ private[mediator] class ConfirmationResponseProcessor(
OptionT.none[Future, Unit]
}
}
nextResponseAggregation <- OptionT(
responseAggregation.validateAndProgress(ts, response, snapshot.ipsSnapshot)
)
@ -658,6 +711,9 @@ private[mediator] class ConfirmationResponseProcessor(
)(implicit traceContext: TraceContext): Future[Unit] =
responseAggregation.asFinalized(protocolVersion) match {
case Some(finalizedResponse) =>
logger.info(
s"Phase 6: Finalized request=${finalizedResponse.requestId} with verdict ${finalizedResponse.verdict}"
)
verdictSender.sendResult(
finalizedResponse.requestId,
finalizedResponse.request,

View File

@ -31,6 +31,8 @@ import scala.concurrent.{ExecutionContext, Future}
*
* All mediator request related events that can be processed concurrently grouped by requestId.
*/
// TODO(#15627) we can simplify this quite a bit as we just need to push the events into the
// ConfirmationResponseProcessor pipelines instead of doing the complicated book keeping here.
private[mediator] final case class MediatorEventStage(
requests: NonEmpty[Map[RequestId, NonEmpty[Seq[Traced[MediatorEvent]]]]]
) {
@ -107,7 +109,6 @@ private[mediator] class MediatorEventsProcessor(
val identityF = identityClientEventHandler(Traced(events))
val envelopesByEvent = envelopesGroupedByEvent(events)
for {
deduplicatorResult <- FutureUnlessShutdown.outcomeF(
deduplicator.rejectDuplicates(envelopesByEvent)
@ -221,6 +222,7 @@ private[mediator] class MediatorEventsProcessor(
logger.error(s"Received messages with wrong domain ids: $wrongDomainIds")
},
)
(event, domainEnvelopes)
case DeliverError(_, _, _, _, SequencerErrors.TrafficCredit(_)) =>

View File

@ -78,7 +78,7 @@ final case class ResponseAggregation[VKEY](
ec: ExecutionContext,
): Future[Option[ResponseAggregation[VKEY]]] = {
val MediatorResponse(
_requestId,
requestId,
sender,
_viewHashO,
_viewPositionO,
@ -104,7 +104,7 @@ final case class ResponseAggregation[VKEY](
// - more exhaustive security alerts
// - avoid race conditions in security tests
statesOfViews <- OptionT.fromOption[Future](state.leftMap { s => // move down
loggingContext.debug(
loggingContext.info(
s"Request ${requestId.unwrap} has already been finalized with verdict $s before response $responseTimestamp from $sender with $localVerdict for view $viewKeyO arrives"
)
}.toOption)
@ -243,9 +243,6 @@ final case class ResponseAggregation[VKEY](
def withVersion(version: CantonTimestamp): ResponseAggregation[VKEY] =
copy(version = version)
def withRequestId(requestId: RequestId): ResponseAggregation[VKEY] =
copy(requestId = requestId)
def timeout(
version: CantonTimestamp
)(implicit loggingContext: NamedLoggingContext): ResponseAggregation[VKEY] = state match {
@ -304,11 +301,9 @@ object ResponseAggregation {
override def pretty: Pretty[ConsortiumVotingState] = {
prettyOfClass(
paramIfTrue("consortium party", _.threshold.value > 1),
paramIfTrue("non-consortium party", _.threshold.value == 1),
param("threshold", _.threshold, _.threshold.value > 1),
param("approved by participants", _.approvals),
param("rejected by participants", _.rejections),
param("consortium-threshold", _.threshold, _.threshold.value > 1),
paramIfNonEmpty("approved by participants", _.approvals),
paramIfNonEmpty("rejected by participants", _.rejections),
)
}
}

View File

@ -37,7 +37,7 @@ import com.digitalasset.canton.domain.service.grpc.GrpcDomainService
import com.digitalasset.canton.health.HealthListener
import com.digitalasset.canton.health.admin.data.{SequencerHealthStatus, TopologyQueueStatus}
import com.digitalasset.canton.lifecycle.{FlagCloseable, HasCloseContext, Lifecycle}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging, TracedLogger}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.protocol.DomainParametersLookup.SequencerDomainParameters
import com.digitalasset.canton.protocol.{DomainParametersLookup, StaticDomainParameters}
import com.digitalasset.canton.resource.Storage
@ -86,7 +86,6 @@ class SequencerRuntime(
topologyManagerStatusO: Option[TopologyManagerStatus],
storage: Storage,
clock: Clock,
auditLogger: TracedLogger,
authenticationConfig: SequencerAuthenticationConfig,
additionalAdminServiceFactory: Sequencer => Option[ServerServiceDefinition],
staticMembersToRegister: Seq[Member],
@ -169,7 +168,6 @@ class SequencerRuntime(
private val sequencerService = GrpcSequencerService(
sequencer,
metrics,
auditLogger,
authenticationConfig.check,
clock,
sequencerDomainParamsLookup,
@ -222,7 +220,6 @@ class SequencerRuntime(
// can still re-subscribe with the token just before we removed it
Traced.lift(sequencerService.disconnectMember(_)(_)),
isTopologyInitializedPromise.future,
auditLogger,
)
val sequencerAuthenticationService =

View File

@ -59,7 +59,6 @@ trait SequencerRuntimeFactory {
staticDomainParameters: StaticDomainParameters,
testingConfig: TestingConfigInternal,
processingTimeout: ProcessingTimeout,
auditLogger: TracedLogger,
agreementManager: Option[ServiceAgreementManager],
memberAuthenticationServiceFactory: MemberAuthenticationServiceFactory,
localParameters: CantonNodeWithSequencerParameters,
@ -96,7 +95,6 @@ object SequencerRuntimeFactory {
staticDomainParameters: StaticDomainParameters,
testingConfig: TestingConfigInternal,
processingTimeout: ProcessingTimeout,
auditLogger: TracedLogger,
agreementManager: Option[ServiceAgreementManager],
memberAuthenticationServiceFactory: MemberAuthenticationServiceFactory,
localParameters: CantonNodeWithSequencerParameters,
@ -166,7 +164,6 @@ object SequencerRuntimeFactory {
topologyManagerStatusO,
storage,
clock,
auditLogger,
SequencerAuthenticationConfig(
agreementManager,
domainConfig.publicApi.nonceExpirationTime,

View File

@ -150,7 +150,11 @@ object InitializeSequencerRequest
ProtoDeserializationError.FieldNotSet("topology_snapshot")
)
snapshotO <- Option
.when(!request.snapshot.isEmpty)(SequencerSnapshot.fromByteString(request.snapshot))
.when(!request.snapshot.isEmpty)(
SequencerSnapshot.fromByteString(domainParameters.protocolVersion)(
request.snapshot
)
)
.sequence
} yield InitializeSequencerRequest(
domainId,
@ -192,7 +196,9 @@ object InitializeSequencerRequestX {
)
snapshotO <- Option
.when(!request.snapshot.isEmpty)(
SequencerSnapshot.fromByteString(request.snapshot)
SequencerSnapshot.fromByteString(domainParameters.protocolVersion)(
request.snapshot
)
)
.sequence
} yield InitializeSequencerRequestX(

View File

@ -16,7 +16,7 @@ import com.digitalasset.canton.crypto.*
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.service.ServiceAgreementManager
import com.digitalasset.canton.lifecycle.{FlagCloseable, FutureUnlessShutdown, Lifecycle}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging, TracedLogger}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.resource.DbStorage.PassiveInstanceException
import com.digitalasset.canton.sequencing.authentication.MemberAuthentication.*
import com.digitalasset.canton.sequencing.authentication.grpc.AuthenticationTokenWithExpiry
@ -61,7 +61,6 @@ class MemberAuthenticationService(
isTopologyInitialized: Future[Unit],
override val timeouts: ProcessingTimeout,
val loggerFactory: NamedLoggerFactory,
auditLogger: TracedLogger,
)(implicit ec: ExecutionContext)
extends NamedLogging
with FlagCloseable {
@ -149,8 +148,8 @@ class MemberAuthenticationService(
storedToken = StoredAuthenticationToken(member, tokenExpiry, token)
_ <- handlePassiveInstanceException(tokenCache.saveToken(storedToken))
} yield {
auditLogger.info(
s"$member authenticated new token based on agreement $agreementId on $domain"
logger.info(
s"$member authenticated new token with expiry $tokenExpiry"
)
AuthenticationTokenWithExpiry(token, tokenExpiry)
}
@ -258,7 +257,6 @@ class MemberAuthenticationServiceOld(
isTopologyInitialized: Future[Unit],
timeouts: ProcessingTimeout,
loggerFactory: NamedLoggerFactory,
auditLogger: TracedLogger,
)(implicit ec: ExecutionContext)
extends MemberAuthenticationService(
domain,
@ -272,7 +270,6 @@ class MemberAuthenticationServiceOld(
isTopologyInitialized,
timeouts,
loggerFactory,
auditLogger,
)
with TopologyTransactionProcessingSubscriber {
@ -321,7 +318,6 @@ class MemberAuthenticationServiceX(
isTopologyInitialized: Future[Unit],
timeouts: ProcessingTimeout,
loggerFactory: NamedLoggerFactory,
auditLogger: TracedLogger,
)(implicit ec: ExecutionContext)
extends MemberAuthenticationService(
domain,
@ -335,7 +331,6 @@ class MemberAuthenticationServiceX(
isTopologyInitialized,
timeouts,
loggerFactory,
auditLogger,
)
with TopologyTransactionProcessingSubscriberX {
@ -384,7 +379,6 @@ trait MemberAuthenticationServiceFactory {
agreementManager: Option[ServiceAgreementManager],
invalidateMemberCallback: Traced[Member] => Unit,
isTopologyInitialized: Future[Unit],
auditLogger: TracedLogger,
)(implicit ec: ExecutionContext): MemberAuthenticationService
}
@ -405,7 +399,6 @@ object MemberAuthenticationServiceFactory {
agreementManager: Option[ServiceAgreementManager],
invalidateMemberCallback: Traced[Member] => Unit,
isTopologyInitialized: Future[Unit],
auditLogger: TracedLogger,
)(implicit ec: ExecutionContext): MemberAuthenticationService = {
val service = new MemberAuthenticationServiceOld(
domain,
@ -419,7 +412,6 @@ object MemberAuthenticationServiceFactory {
isTopologyInitialized,
timeouts,
loggerFactory,
auditLogger,
)
topologyTransactionProcessor.subscribe(service)
service
@ -442,7 +434,6 @@ object MemberAuthenticationServiceFactory {
agreementManager: Option[ServiceAgreementManager],
invalidateMemberCallback: Traced[Member] => Unit,
isTopologyInitialized: Future[Unit],
auditLogger: TracedLogger,
)(implicit ec: ExecutionContext): MemberAuthenticationService = {
val service = new MemberAuthenticationServiceX(
domain,
@ -456,7 +447,6 @@ object MemberAuthenticationServiceFactory {
isTopologyInitialized,
timeouts,
loggerFactory,
auditLogger,
)
topologyTransactionProcessorX.subscribe(service)
service

View File

@ -494,7 +494,9 @@ class SequencerReader(
val messageIdO =
Option(messageId).filter(_ => memberId == sender) // message id only goes to sender
val batch: Batch[ClosedEnvelope] = Batch
.fromByteString(payload.content)
.fromByteString(protocolVersion)(
payload.content
)
.fold(err => throw new DbDeserializationException(err.toString), identity)
val filteredBatch = Batch.filterClosedEnvelopesFor(batch, member, Set.empty)
Deliver.create[ClosedEnvelope](

View File

@ -13,8 +13,18 @@ import com.daml.nonempty.NonEmpty
import com.daml.nonempty.catsinstances.*
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError.{
ExceededMaxSequencingTime,
PayloadToEventTimeBoundExceeded,
}
import com.digitalasset.canton.domain.sequencing.sequencer.store.*
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging, TracedLogger}
import com.digitalasset.canton.error.BaseCantonError
import com.digitalasset.canton.logging.{
ErrorLoggingContext,
NamedLoggerFactory,
NamedLogging,
TracedLogger,
}
import com.digitalasset.canton.sequencing.protocol.{
SendAsyncError,
SequencerErrors,
@ -375,14 +385,14 @@ object SequenceWritesFlow {
): Option[Sequenced[PayloadId]] = {
def checkMaxSequencingTime(
event: Presequenced[StoreEvent[PayloadId]]
): Either[String, Presequenced[StoreEvent[PayloadId]]] =
): Either[BaseCantonError, Presequenced[StoreEvent[PayloadId]]] =
event.maxSequencingTimeO
.toLeft(event)
.leftFlatMap { maxSequencingTime =>
Either.cond(
timestamp <= maxSequencingTime,
event,
s"The sequencer time [$timestamp] has exceeded the max-sequencing-time of the send request [$maxSequencingTime]: ${event.event.description}",
ExceededMaxSequencingTime.Error(timestamp, maxSequencingTime, event.event.description),
)
}
@ -420,19 +430,24 @@ object SequenceWritesFlow {
def checkPayloadToEventMargin(
presequencedEvent: Presequenced[StoreEvent[PayloadId]]
): Either[String, Presequenced[StoreEvent[PayloadId]]] =
): Either[BaseCantonError, Presequenced[StoreEvent[PayloadId]]] =
presequencedEvent match {
// we only need to check deliver events for payloads
// the only reason why
case presequencedDeliver @ Presequenced(deliver: DeliverStoreEvent[PayloadId], _) =>
val payloadTs = deliver.payload.unwrap
val bound = writerConfig.payloadToEventMargin.asJava
val maxAllowableEventTime = payloadTs.add(bound)
val bound = writerConfig.payloadToEventMargin
val maxAllowableEventTime = payloadTs.add(bound.asJava)
Either
.cond(
timestamp <= maxAllowableEventTime,
presequencedDeliver,
s"The payload to event time bound [$bound] has been been exceeded by payload time [$payloadTs] and sequenced event time [$timestamp]",
PayloadToEventTimeBoundExceeded.Error(
bound.duration,
payloadTs,
sequencedTs = timestamp,
messageId = deliver.messageId,
),
)
case other =>
Right(other)
@ -445,7 +460,10 @@ object SequenceWritesFlow {
resultE match {
case Left(error) =>
logger.warn(error)(presequencedEvent.traceContext)
// log here as we don't have the trace context in the error itself
implicit val errorLoggingContext =
ErrorLoggingContext(logger, loggerFactory.properties, presequencedEvent.traceContext)
error.log()
None
case Right(event) =>
val checkedEvent = checkSigningTimestamp(event)

View File

@ -3,19 +3,23 @@
package com.digitalasset.canton.domain.sequencing.sequencer.errors
import com.daml.error.{ContextualizedErrorLogger, Explanation, Resolution}
import com.daml.error.{ContextualizedErrorLogger, ErrorCategory, ErrorCode, Explanation, Resolution}
import com.digitalasset.canton.ProtoDeserializationError
import com.digitalasset.canton.crypto.SignatureCheckError
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.error.CantonErrorGroups.SequencerErrorGroup
import com.digitalasset.canton.error.{Alarm, AlarmErrorCode, LogOnCreation}
import com.digitalasset.canton.error.{Alarm, AlarmErrorCode, BaseCantonError, LogOnCreation}
import com.digitalasset.canton.protocol.DomainParameters.MaxRequestSize
import com.digitalasset.canton.sequencing.protocol.{
AcknowledgeRequest,
MessageId,
SignedContent,
SubmissionRequest,
}
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.util.LoggerUtil
import scala.concurrent.duration.Duration
object SequencerError extends SequencerErrorGroup {
@ -173,4 +177,63 @@ object SequencerError extends SequencerErrorGroup {
)
with LogOnCreation
}
import scala.jdk.DurationConverters.*
// TODO(#15603) modify resolution once fixed
@Explanation("""
|This error indicates that a request was not sequenced because the sequencing time would exceed the
|max-sequencing-time of the request. This error usually happens if either a participant or mediator node is too
|slowly responding to requests, or if it is catching up after some downtime. In rare cases, it can happen
|if the sequencer nodes are massively overloaded.
|
|If it happens repeatedly, this information might indicate that there is a problem with the respective participant
|or mediator node.
|""")
@Resolution(
"""Inspect the time difference between sequenced and max-sequencing-time. If the time difference is large,
|then some remote node is catching up but sending messages during catch-up. If the difference is not too large,
|then the submitting node or this sequencer node might be overloaded."""
)
object ExceededMaxSequencingTime
    extends ErrorCode(
      "MAX_SEQUENCING_TIME_EXCEEDED",
      ErrorCategory.ContentionOnSharedResources,
    ) {

  // Sequencer-internal signal only; this error code is never surfaced over the API.
  override def exposedViaApi: Boolean = false

  /** @param ts                the actual sequencing timestamp that violated the bound
    * @param maxSequencingTime the request's max-sequencing-time that was exceeded
    * @param message           description of the offending event, included in the log line
    */
  final case class Error(
      ts: CantonTimestamp,
      maxSequencingTime: CantonTimestamp,
      message: String,
  ) extends BaseCantonError.Impl(
        cause =
          s"The sequencer time [$ts] has exceeded by ${LoggerUtil.roundDurationForHumans((ts - maxSequencingTime).toScala)} the max-sequencing-time of the send request [$maxSequencingTime]: $message"
      )
}
@Explanation("""This warning indicates that the time difference between storing the payload and writing the
|event exceeded the configured time bound, which resulted in the message to be discarded. This can happen
|during some failure event on the database which causes unexpected delay between these two database operations.
|(The two events need to be sufficiently close together to support pruning of payloads by timestamp).
|""")
@Resolution(
"""The submitting node will usually retry the command, but you should check the health of the
|sequencer node, in particular with respect to database processing."""
)
object PayloadToEventTimeBoundExceeded
    extends ErrorCode(
      "PAYLOAD_TO_EVENT_TIME_BOUND_EXCEEDED",
      ErrorCategory.BackgroundProcessDegradationWarning,
    ) {

  /** Raised when the sequenced event time drifts further ahead of the payload's
    * storage time than the configured margin allows.
    *
    * @param bound       the configured payload-to-event margin that was exceeded
    * @param payloadTs   the timestamp at which the payload was stored
    * @param sequencedTs the sequencing timestamp of the event
    * @param messageId   the message id of the discarded deliver event
    */
  final case class Error(
      bound: Duration,
      payloadTs: CantonTimestamp,
      sequencedTs: CantonTimestamp,
      messageId: MessageId,
  ) extends BaseCantonError.Impl(
        // fixed duplicated word: "has been been exceeded" -> "has been exceeded"
        cause =
          s"The payload to event time bound [$bound] has been exceeded by payload time [$payloadTs] and sequenced event time [$sequencedTs]: $messageId"
      )
}
}

View File

@ -262,7 +262,7 @@ object DeliverErrorStoreEvent {
Left(ProtoDeserializationError.FieldNotSet("error"))
)(serializedError =>
VersionedStatus
.fromByteString(serializedError)
.fromByteString(protocolVersion)(serializedError)
.map(_.status)
)

View File

@ -52,6 +52,7 @@ class GrpcSequencerConnectService(
val response = staticDomainParameters.protoVersion.v match {
case 0 => Future.successful(Parameters.ParametersV0(staticDomainParameters.toProtoV0))
case 1 => Future.successful(Parameters.ParametersV1(staticDomainParameters.toProtoV1))
case 2 => Future.successful(Parameters.ParametersV2(staticDomainParameters.toProtoV2))
case unsupported =>
Future.failed(
new IllegalStateException(

View File

@ -21,7 +21,7 @@ import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError
import com.digitalasset.canton.domain.sequencing.sequencer.{Sequencer, SequencerValidations}
import com.digitalasset.canton.domain.sequencing.service.GrpcSequencerService.*
import com.digitalasset.canton.lifecycle.FlagCloseable
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging, TracedLogger}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.protocol.DomainParameters.MaxRequestSize
import com.digitalasset.canton.protocol.DomainParametersLookup.SequencerDomainParameters
import com.digitalasset.canton.protocol.{DomainParametersLookup, v0 as protocolV0}
@ -104,7 +104,6 @@ object GrpcSequencerService {
def apply(
sequencer: Sequencer,
metrics: SequencerMetrics,
auditLogger: TracedLogger,
authenticationCheck: AuthenticationCheck,
clock: Clock,
domainParamsLookup: DomainParametersLookup[SequencerDomainParameters],
@ -118,7 +117,6 @@ object GrpcSequencerService {
sequencer,
metrics,
loggerFactory,
auditLogger,
authenticationCheck,
new SubscriptionPool[GrpcManagedSubscription[_]](
clock,
@ -147,7 +145,11 @@ object GrpcSequencerService {
type ValueClass
/** Tries to parse the proto class to the value class, erroring if the request exceeds the given limit. */
def parse(requestP: ProtoClass, maxRequestSize: MaxRequestSize): ParsingResult[ValueClass]
def parse(
requestP: ProtoClass,
maxRequestSize: MaxRequestSize,
protocolVersion: ProtocolVersion,
): ParsingResult[ValueClass]
/** Extract the [[SubmissionRequest]] from the value class */
def unwrap(request: ValueClass): SubmissionRequest
@ -165,6 +167,7 @@ object GrpcSequencerService {
override def parse(
requestP: protocolV0.SubmissionRequest,
maxRequestSize: MaxRequestSize,
protocolVersion: ProtocolVersion, // unused; because this parse implementation uses proto version 0 always
): ParsingResult[SubmissionRequest] =
SubmissionRequest.fromProtoV0(
requestP,
@ -186,13 +189,16 @@ object GrpcSequencerService {
override def parse(
requestP: protocolV0.SignedContent,
maxRequestSize: MaxRequestSize,
protocolVersion: ProtocolVersion,
): ParsingResult[SignedContent[SubmissionRequest]] =
SignedContent
.fromProtoV0(requestP)
.flatMap(
_.deserializeContent(
SubmissionRequest
.fromByteString(MaxRequestSizeToDeserialize.Limit(maxRequestSize.value))
.fromByteString(protocolVersion)(
MaxRequestSizeToDeserialize.Limit(maxRequestSize.value)
)
)
)
@ -211,12 +217,17 @@ object GrpcSequencerService {
override def parse(
requestP: v0.SendAsyncVersionedRequest,
maxRequestSize: MaxRequestSize,
protocolVersion: ProtocolVersion,
): ParsingResult[SignedContent[SubmissionRequest]] = {
for {
signedContent <- SignedContent.fromByteString(requestP.signedSubmissionRequest)
signedContent <- SignedContent.fromByteString(protocolVersion)(
requestP.signedSubmissionRequest
)
signedSubmissionRequest <- signedContent.deserializeContent(
SubmissionRequest
.fromByteString(MaxRequestSizeToDeserialize.Limit(maxRequestSize.value))
.fromByteString(protocolVersion)(
MaxRequestSizeToDeserialize.Limit(maxRequestSize.value)
)
)
} yield signedSubmissionRequest
}
@ -236,8 +247,11 @@ object GrpcSequencerService {
override def parse(
requestP: v0.SendAsyncUnauthenticatedVersionedRequest,
maxRequestSize: MaxRequestSize,
protocolVersion: ProtocolVersion,
): ParsingResult[SubmissionRequest] =
SubmissionRequest.fromByteString(MaxRequestSizeToDeserialize.Limit(maxRequestSize.value))(
SubmissionRequest.fromByteString(protocolVersion)(
MaxRequestSizeToDeserialize.Limit(maxRequestSize.value)
)(
requestP.submissionRequest
)
@ -271,7 +285,6 @@ class GrpcSequencerService(
sequencer: Sequencer,
metrics: SequencerMetrics,
protected val loggerFactory: NamedLoggerFactory,
auditLogger: TracedLogger,
authenticationCheck: AuthenticationCheck,
subscriptionPool: SubscriptionPool[GrpcManagedSubscription[_]],
directSequencerSubscriptionFactory: DirectSequencerSubscriptionFactory,
@ -400,7 +413,7 @@ class GrpcSequencerService(
maxRequestSize: MaxRequestSize
): Either[SendAsyncError, processing.ValueClass] = for {
request <- processing
.parse(proto, maxRequestSize)
.parse(proto, maxRequestSize, protocolVersion)
.leftMap(requestDeserializationError(_, maxRequestSize))
_ <- validateSubmissionRequest(
proto.serializedSize,
@ -419,8 +432,14 @@ class GrpcSequencerService(
_ <- processing.send(request, sequencer)
} yield ()
performUnlessClosingF(functionFullName)(sendET.value.map(toSendAsyncResponse))
performUnlessClosingF(functionFullName)(sendET.value.map { res =>
res.left.foreach { err =>
logger.info(s"Rejecting submission request by $senderFromMetadata with ${err}")
}
toSendAsyncResponse(res)
})
.onShutdown(SendAsyncResponse(error = Some(SendAsyncError.ShuttingDown())))
}
private def toSendAsyncResponse(result: Either[SendAsyncError, Unit]): SendAsyncResponse =
@ -495,7 +514,7 @@ class GrpcSequencerService(
_ = {
val envelopesCount = request.batch.envelopesCount
auditLogger.info(
logger.info(
s"'$sender' sends request with id '$messageId' of size $requestSize bytes with $envelopesCount envelopes."
)
}
@ -875,7 +894,9 @@ class GrpcSequencerService(
} else {
val acknowledgeRequestE = SignedContent
.fromProtoV0(request)
.flatMap(_.deserializeContent(AcknowledgeRequest.fromByteString))
.flatMap(
_.deserializeContent(AcknowledgeRequest.fromByteString(protocolVersion))
)
performAcknowledge(acknowledgeRequestE.map(SignedAcknowledgeRequest))
}
}
@ -920,11 +941,8 @@ class GrpcSequencerService(
observer: ServerCallStreamObserver[T],
toSubscriptionResponse: OrdinarySerializedEvent => T,
)(implicit traceContext: TraceContext): GrpcManagedSubscription[T] = {
member match {
case ParticipantId(uid) =>
auditLogger.info(s"$uid creates subscription from $counter")
case _ => ()
}
logger.info(s"$member subscribes from counter=$counter")
new GrpcManagedSubscription(
handler => directSequencerSubscriptionFactory.create(counter, "direct", member, handler),
observer,

View File

@ -7,7 +7,6 @@ import cats.implicits.*
import com.digitalasset.canton.BaseTest
import com.digitalasset.canton.config.DefaultProcessingTimeouts
import com.digitalasset.canton.crypto.{Nonce, Signature}
import com.digitalasset.canton.domain.governance.ParticipantAuditor
import com.digitalasset.canton.sequencing.authentication.MemberAuthentication.{
MissingToken,
NonMatchingDomainId,
@ -53,7 +52,6 @@ class MemberAuthenticationServiceTest extends AsyncWordSpec with BaseTest {
Future.unit,
DefaultProcessingTimeouts.testing,
loggerFactory,
ParticipantAuditor.noop,
) {
override def isParticipantActive(participant: ParticipantId)(implicit
traceContext: TraceContext

View File

@ -10,7 +10,6 @@ import com.digitalasset.canton.config.RequireTypes.Port
import com.digitalasset.canton.crypto.provider.symbolic.SymbolicPureCrypto
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.api.v0.{Hello, HelloServiceGrpc}
import com.digitalasset.canton.domain.governance.ParticipantAuditor
import com.digitalasset.canton.domain.sequencing.authentication.*
import com.digitalasset.canton.networking.Endpoint
import com.digitalasset.canton.sequencing.authentication.grpc.{
@ -76,7 +75,6 @@ class SequencerAuthenticationServerInterceptorTest
Future.unit,
DefaultProcessingTimeouts.testing,
loggerFactory,
ParticipantAuditor.noop,
) {
override protected def isParticipantActive(participant: ParticipantId)(implicit
traceContext: TraceContext

View File

@ -9,10 +9,11 @@ import com.digitalasset.canton.*
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.crypto.{DomainSyncCryptoClient, HashPurpose, Signature}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError.ExceededMaxSequencingTime
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer as CantonSequencer
import com.digitalasset.canton.lifecycle.Lifecycle
import com.digitalasset.canton.logging.NamedLogging
import com.digitalasset.canton.logging.pretty.Pretty
import com.digitalasset.canton.logging.{NamedLogging, SuppressionRule}
import com.digitalasset.canton.sequencing.OrdinarySerializedEvent
import com.digitalasset.canton.sequencing.protocol.SendAsyncError.RequestInvalid
import com.digitalasset.canton.sequencing.protocol.*
@ -28,6 +29,7 @@ import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.Sink
import org.scalatest.wordspec.FixtureAsyncWordSpec
import org.scalatest.{Assertion, FutureOutcome}
import org.slf4j.event.Level
import java.time.Duration
import java.util.UUID
@ -138,7 +140,7 @@ abstract class SequencerApiTest
"Register topology client"
)
_ <- valueOrFail(sequencer.registerMember(sender))("Register mediator")
messages <- loggerFactory.assertLogs(
messages <- loggerFactory.assertLogsSeq(SuppressionRule.LevelAndAbove(Level.INFO))(
valueOrFail(sequencer.sendAsync(request))("Sent async")
.flatMap(_ =>
readForMembers(
@ -147,9 +149,13 @@ abstract class SequencerApiTest
timeout = 5.seconds, // We don't need the full timeout here
)
),
entry => {
entry.warningMessage should include("has exceeded the max-sequencing-time")
entry.warningMessage should include(suppressedMessageContent)
forAll(_) { entry =>
entry.message should include(
suppressedMessageContent
) // block update generator will log every send
entry.message should (include(ExceededMaxSequencingTime.id) or include(
"Observed Send"
))
},
)
} yield {
@ -182,12 +188,16 @@ abstract class SequencerApiTest
)
_ <- valueOrFail(sequencer.registerMember(sender))("Register mediator")
_ <- valueOrFail(sequencer.sendAsync(request1))("Sent async #1")
messages <- loggerFactory.assertLogs(
messages <- loggerFactory.assertLogsSeq(SuppressionRule.LevelAndAbove(Level.INFO))(
valueOrFail(sequencer.sendAsync(request2))("Sent async #2")
.flatMap(_ => readForMembers(List(sender), sequencer)),
entry => {
entry.warningMessage should include("has exceeded the max-sequencing-time")
entry.warningMessage should include(suppressedMessageContent)
forAll(_) { entry =>
// block update generator will log every send
entry.message should ((include(ExceededMaxSequencingTime.id) or include(
"Observed Send"
) and include(
suppressedMessageContent
)) or (include("Observed Send") and include(normalMessageContent)))
},
)
} yield {

View File

@ -9,6 +9,7 @@ import com.daml.nonempty.{NonEmpty, NonEmptyUtil}
import com.digitalasset.canton.config.CantonRequireTypes.String256M
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError.PayloadToEventTimeBoundExceeded
import com.digitalasset.canton.domain.sequencing.sequencer.store.*
import com.digitalasset.canton.lifecycle.{
AsyncCloseable,
@ -197,7 +198,7 @@ class SequencerWriterSourceTest extends AsyncWordSpec with BaseTest with HasExec
offerDeliverOrFail(Presequenced.alwaysValid(deliver1))
completeFlow()
},
_.warningMessage shouldBe "The payload to event time bound [PT1M] has been been exceeded by payload time [1970-01-01T00:00:00.000001Z] and sequenced event time [1970-01-01T00:01:11Z]",
_.shouldBeCantonErrorCode(PayloadToEventTimeBoundExceeded),
)
events <- store.readEvents(aliceId)
} yield events.payloads should have size (0)
@ -229,14 +230,12 @@ class SequencerWriterSourceTest extends AsyncWordSpec with BaseTest with HasExec
payload2,
None,
)
_ <- loggerFactory.assertLogs(
{
offerDeliverOrFail(Presequenced.withMaxSequencingTime(deliver1, beforeNow))
offerDeliverOrFail(Presequenced.withMaxSequencingTime(deliver2, longAfterNow))
completeFlow()
},
_.warningMessage should include regex "The sequencer time \\[.*\\] has exceeded the max-sequencing-time of the send request \\[1970-01-01T00:00:05Z\\]: deliver\\[message-id:1\\]",
)
_ <- {
offerDeliverOrFail(Presequenced.withMaxSequencingTime(deliver1, beforeNow))
offerDeliverOrFail(Presequenced.withMaxSequencingTime(deliver2, longAfterNow))
completeFlow()
}
events <- store.readEvents(aliceId)
} yield {
events.payloads should have size (1)

View File

@ -20,7 +20,6 @@ import com.digitalasset.canton.crypto.{HashPurpose, Nonce}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.api.v0
import com.digitalasset.canton.domain.api.v0.SequencerAuthenticationServiceGrpc.SequencerAuthenticationService
import com.digitalasset.canton.domain.governance.ParticipantAuditor
import com.digitalasset.canton.domain.metrics.DomainTestMetrics
import com.digitalasset.canton.domain.sequencing.SequencerParameters
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer
@ -164,7 +163,6 @@ final case class Env(loggerFactory: NamedLoggerFactory)(implicit
sequencer,
DomainTestMetrics.sequencer,
loggerFactory,
ParticipantAuditor.noop,
authenticationCheck,
new SubscriptionPool[GrpcManagedSubscription[_]](
clock,

View File

@ -14,7 +14,6 @@ import com.digitalasset.canton.crypto.provider.symbolic.SymbolicPureCrypto
import com.digitalasset.canton.crypto.{DomainSyncCryptoClient, Signature}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.api.v0
import com.digitalasset.canton.domain.governance.ParticipantAuditor
import com.digitalasset.canton.domain.metrics.DomainTestMetrics
import com.digitalasset.canton.domain.sequencing.SequencerParameters
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer
@ -197,7 +196,6 @@ class GrpcSequencerServiceTest
sequencer,
DomainTestMetrics.sequencer,
loggerFactory,
ParticipantAuditor.noop,
new AuthenticationCheck.MatchesAuthenticatedMember {
override def lookupCurrentMember(): Option[Member] = member.some
},

View File

@ -6,15 +6,18 @@ package com.digitalasset.canton.console
import scala.collection.mutable
import scala.sys.process.ProcessLogger
@SuppressWarnings(Array("com.digitalasset.canton.RequireBlocking"))
/** A [[scala.sys.process.ProcessLogger]] that collects all stdout/stderr lines
  * into an in-memory buffer for later retrieval via [[output]].
  */
class BufferedProcessLogger extends ProcessLogger {
  private val buffer = mutable.Buffer[String]()

  // `out`/`err` may be invoked concurrently from the process output pump
  // threads, so all access to the shared buffer is synchronized.
  override def out(s: => String): Unit = synchronized(buffer.append(s))
  override def err(s: => String): Unit = synchronized(buffer.append(s))
  override def buffer[T](f: => T): T = f

  /** Output the buffered content to a String applying an optional line prefix.
    */
  def output(linePrefix: String = ""): String = synchronized(
    buffer.map(l => s"$linePrefix$l").mkString(System.lineSeparator)
  )
}

View File

@ -13,6 +13,7 @@ import com.daml.logging.entries.LoggingValue.OfString
import com.daml.logging.entries.{LoggingValue, ToLoggingValue}
import com.digitalasset.canton.ledger.api.DeduplicationPeriod
import com.digitalasset.canton.ledger.configuration.Configuration
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.topology.DomainId
import scalaz.syntax.tag.*
import scalaz.{@@, Tag}
@ -87,7 +88,26 @@ final case class Commands(
commands: LfCommands,
disclosedContracts: ImmArray[DisclosedContract],
domainId: Option[DomainId] = None,
)
) extends PrettyPrinting {
// Pretty-printer used when command submissions are logged. Only identifying
// metadata is rendered; the command payloads themselves are not printed and
// `indicateOmittedFields` marks that fields were left out.
override def pretty: Pretty[Commands] = {
import com.digitalasset.canton.logging.pretty.PrettyInstances.*
prettyOfClass(
param("commandId", _.commandId.unwrap),
paramIfDefined("submissionId", _.submissionId.map(_.unwrap)),
param("applicationId", _.applicationId),
param("actAs", _.actAs),
paramIfNonEmpty("readAs", _.readAs),
param("submittedAt", _.submittedAt),
param("ledgerEffectiveTime", _.commands.ledgerEffectiveTime),
param("deduplicationPeriod", _.deduplicationPeriod),
// workflowId is only shown when it differs from the commandId, to avoid noise
paramIfDefined("workflowId", _.workflowId.filter(_ != commandId).map(_.unwrap)),
paramIfDefined("domainId", _.domainId),
indicateOmittedFields,
)
}
}
sealed trait DisclosedContract extends Product with Serializable {
def templateId: Ref.TypeConName

View File

@ -5,11 +5,46 @@ package com.digitalasset.canton.ledger.api.messages.event
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.lf.ledger.EventId
import com.daml.lf.value.Value
/** Opaque continuation token for paging through events by contract key.
  * Wire format: "" (no token), "s:&lt;seqId&gt;" (sequential id), or "e:&lt;eventId&gt;".
  */
sealed trait KeyContinuationToken extends Product with Serializable

object KeyContinuationToken {
  final case object NoToken extends KeyContinuationToken
  final case class EndExclusiveSeqIdToken(seqId: Long) extends KeyContinuationToken
  final case class EndExclusiveEventIdToken(eventId: EventId) extends KeyContinuationToken

  private val sequenceTokenPrefix = "s:"
  private val eventTokenPrefix = "e:"
  private val prefixLength = 2

  /** Renders a token in its opaque wire representation (empty string for [[NoToken]]). */
  def toTokenString(token: KeyContinuationToken): String = token match {
    case NoToken => ""
    case EndExclusiveSeqIdToken(id) => sequenceTokenPrefix + id.toString
    case EndExclusiveEventIdToken(id) => eventTokenPrefix + id.toLedgerString
  }

  /** Parses the wire representation produced by [[toTokenString]].
    * Returns a human-readable error message on malformed input.
    */
  def fromTokenString(s: String): Either[String, KeyContinuationToken] = {
    val (prefix, payload) = s.splitAt(prefixLength)
    if (prefix.isEmpty && payload.isEmpty) Right(NoToken)
    else if (prefix == sequenceTokenPrefix)
      payload.toLongOption match {
        case Some(seqId) => Right(EndExclusiveSeqIdToken(seqId))
        case None => Left(s"Unable to parse $s into a sequence token")
      }
    else if (prefix == eventTokenPrefix)
      EventId.fromString(payload) match {
        case Right(eventId) => Right(EndExclusiveEventIdToken(eventId))
        case Left(errStr) => Left(s"Failed to parse $payload into an event-id: $errStr")
      }
    else Left(s"Unable to parse '$s' into token string")
  }
}
/** Validated form of the GetEventsByContractKey ledger API request.
  *
  * NOTE(review): the diff residue retained both the superseded
  * `endExclusiveSeqId: Option[Long]` field and its replacement; the validator
  * constructs this request with `keyContinuationToken` only, so only that
  * field is kept here.
  */
final case class GetEventsByContractKeyRequest(
    contractKey: Value,
    templateId: Ref.Identifier,
    requestingParties: Set[Party],
    keyContinuationToken: KeyContinuationToken,
)

View File

@ -9,6 +9,8 @@ import com.daml.ledger.api.v1.event_query_service.{
GetEventsByContractKeyRequest,
}
import com.digitalasset.canton.ledger.api.messages.event
import com.digitalasset.canton.ledger.api.messages.event.KeyContinuationToken
import com.digitalasset.canton.ledger.api.validation.ValidationErrors.invalidField
import io.grpc.StatusRuntimeException
object EventQueryServiceRequestValidator {
@ -50,18 +52,16 @@ class EventQueryServiceRequestValidator(partyNameChecker: PartyNameChecker) {
templateId <- FieldValidator.validateIdentifier(apiTemplateId)
_ <- requireNonEmpty(req.requestingParties, "requesting_parties")
requestingParties <- partyValidator.requireKnownParties(req.requestingParties)
endExclusiveSeqId <- optionalEventSequentialId(
req.continuationToken,
"continuation_token",
"Invalid token", // Don't mention event sequential id as opaque
)
token <- KeyContinuationToken.fromTokenString(req.continuationToken).left.map { err =>
invalidField("continuation_token", err)
}
} yield {
event.GetEventsByContractKeyRequest(
contractKey = contractKey,
templateId = templateId,
requestingParties = requestingParties,
endExclusiveSeqId = endExclusiveSeqId,
keyContinuationToken = token,
)
}

View File

@ -8,6 +8,7 @@ import com.daml.ledger.api.v2.event_query_service.GetEventsByContractIdResponse
import com.daml.lf.data.Ref
import com.daml.lf.value.Value
import com.daml.lf.value.Value.ContractId
import com.digitalasset.canton.ledger.api.messages.event.KeyContinuationToken
import com.digitalasset.canton.logging.LoggingContextWithTrace
import scala.concurrent.Future
@ -26,7 +27,7 @@ trait IndexEventQueryService extends LedgerEndService {
contractKey: Value,
templateId: Ref.Identifier,
requestingParties: Set[Ref.Party],
endExclusiveSeqId: Option[Long],
keyContinuationToken: KeyContinuationToken,
)(implicit loggingContext: LoggingContextWithTrace): Future[GetEventsByContractKeyResponse]
}

View File

@ -56,6 +56,7 @@ final case class CompletionInfo(
param("applicationId", _.applicationId),
paramIfDefined("deduplication period", _.optDeduplicationPeriod),
param("submissionId", _.submissionId),
indicateOmittedFields,
)
}

View File

@ -51,6 +51,7 @@ final case class TransactionMeta(
param("ledgerEffectiveTime", _.ledgerEffectiveTime),
paramIfDefined("workflowId", _.workflowId),
param("submissionTime", _.submissionTime),
customParam(_ => "..."),
paramIfDefined("domainId", _.optDomainId),
indicateOmittedFields,
)
}

View File

@ -12,7 +12,10 @@ import com.daml.lf.value.Value
import com.daml.logging.entries.{LoggingEntry, LoggingValue, ToLoggingValue}
import com.digitalasset.canton.ledger.api.DeduplicationPeriod
import com.digitalasset.canton.ledger.configuration.Configuration
import com.digitalasset.canton.logging.pretty.PrettyInstances.*
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.topology.DomainId
import com.digitalasset.canton.util.ShowUtil.*
import com.google.rpc.status.Status as RpcStatus
import java.time.Duration
@ -25,10 +28,7 @@ import java.time.Duration
* We describe the possible updates in the comments of
* each of the case classes implementing [[Update]].
*/
sealed trait Update extends Product with Serializable {
/** Short human-readable one-line description summarizing the state updates content. */
def description: String
sealed trait Update extends Product with Serializable with PrettyPrinting {
/** The record time at which the state change was committed. */
def recordTime: Timestamp
@ -43,8 +43,14 @@ object Update {
participantId: Ref.ParticipantId,
newConfiguration: Configuration,
) extends Update {
override def description: String =
s"Configuration change '$submissionId' from participant '$participantId' accepted with configuration: $newConfiguration"
override def pretty: Pretty[ConfigurationChanged] =
prettyOfClass(
param("recordTime", _.recordTime),
param("configuration", _.newConfiguration),
indicateOmittedFields,
)
}
object ConfigurationChanged {
@ -68,9 +74,14 @@ object Update {
proposedConfiguration: Configuration,
rejectionReason: String,
) extends Update {
override def description: String = {
s"Configuration change '$submissionId' from participant '$participantId' was rejected: $rejectionReason"
}
override def pretty: Pretty[ConfigurationChangeRejected] =
prettyOfClass(
param("recordTime", _.recordTime),
param("configuration", _.proposedConfiguration),
param("rejectionReason", _.rejectionReason.singleQuoted),
indicateOmittedFields,
)
}
object ConfigurationChangeRejected {
@ -114,8 +125,14 @@ object Update {
recordTime: Timestamp,
submissionId: Option[Ref.SubmissionId],
) extends Update {
override def description: String =
s"Add party '$party' to participant"
override def pretty: Pretty[PartyAddedToParticipant] =
prettyOfClass(
param("recordTime", _.recordTime),
param("party", _.party),
param("displayName", _.displayName.singleQuoted),
param("participantId", _.participantId),
indicateOmittedFields,
)
}
object PartyAddedToParticipant {
@ -146,8 +163,12 @@ object Update {
recordTime: Timestamp,
rejectionReason: String,
) extends Update {
override val description: String =
s"Request to add party to participant with submissionId '$submissionId' failed"
override def pretty: Pretty[PartyAllocationRejected] =
prettyOfClass(
param("recordTime", _.recordTime),
param("participantId", _.participantId),
param("rejectionReason", _.rejectionReason.singleQuoted),
)
}
object PartyAllocationRejected {
@ -178,8 +199,13 @@ object Update {
recordTime: Timestamp,
submissionId: Option[Ref.SubmissionId],
) extends Update {
override def description: String =
s"Public package upload: ${archives.map(_.getHash).mkString(", ")}"
override def pretty: Pretty[PublicPackageUpload] =
prettyOfClass(
param("recordTime", _.recordTime),
param("archives", _.archives.map(_.getHash.readableHash)),
paramIfDefined("sourceDescription", _.sourceDescription.map(_.singleQuoted)),
)
}
object PublicPackageUpload {
@ -205,8 +231,11 @@ object Update {
recordTime: Timestamp,
rejectionReason: String,
) extends Update {
override def description: String =
s"Public package upload rejected, correlationId=$submissionId reason='$rejectionReason'"
override def pretty: Pretty[PublicPackageUploadRejected] =
prettyOfClass(
param("recordTime", _.recordTime),
param("rejectionReason", _.rejectionReason.singleQuoted),
)
}
object PublicPackageUploadRejected {
@ -265,7 +294,17 @@ object Update {
hostedWitnesses: List[Ref.Party],
contractMetadata: Map[Value.ContractId, Bytes],
) extends Update {
override def description: String = s"Accept transaction $transactionId"
override def pretty: Pretty[TransactionAccepted] =
prettyOfClass(
param("recordTime", _.recordTime),
param("transactionId", _.transactionId),
param("transactionMeta", _.transactionMeta),
paramIfDefined("completion", _.completionInfoO),
param("nodes", _.transaction.nodes.size),
param("roots", _.transaction.roots.length),
indicateOmittedFields,
)
}
object TransactionAccepted {
@ -312,7 +351,16 @@ object Update {
reassignmentInfo: ReassignmentInfo,
reassignment: Reassignment,
) extends Update {
override def description: String = s"Accept reassignment $updateId"
override def pretty: Pretty[ReassignmentAccepted] =
prettyOfClass(
param("recordTime", _.recordTime),
param("updateId", _.updateId),
paramIfDefined("completion", _.optCompletionInfo),
param("source", _.reassignmentInfo.sourceDomain),
param("target", _.reassignmentInfo.targetDomain),
indicateOmittedFields,
)
}
object ReassignmentAccepted {
@ -349,9 +397,14 @@ object Update {
DomainId
], // TODO(#13173) None for backwards compatibility, expected to be set for X nodes
) extends Update {
override def description: String =
s"Reject command ${completionInfo.commandId}${if (definiteAnswer)
" (definite answer)"}: ${reasonTemplate.message}"
override def pretty: Pretty[CommandRejected] =
prettyOfClass(
param("recordTime", _.recordTime),
param("completion", _.completionInfo),
paramIfTrue("definiteAnswer", _.definiteAnswer),
param("reason", _.reasonTemplate.message.singleQuoted),
paramIfDefined("domainId", _.domainId),
)
/** If true, the [[ReadService]]'s deduplication guarantees apply to this rejection.
* The participant state implementations should strive to set this flag to true as often as

View File

@ -68,18 +68,19 @@ class DispatcherState(
})
def stopDispatcher(): Future[Unit] = blocking(synchronized {
logger.info(s"Stopping active $ServiceName.")
dispatcherStateRef match {
case DispatcherNotRunning | DispatcherStateShutdown =>
logger.info(s"$ServiceName already stopped or shutdown.")
logger.debug(s"$ServiceName already stopped, shutdown or never started.")
Future.unit
case DispatcherRunning(dispatcher) =>
logger.info(s"Stopping active $ServiceName.")
dispatcherStateRef = DispatcherNotRunning
dispatcher
.cancel(() => dispatcherNotRunning)
.transform {
case Success(_) =>
logger.info(s"Active $ServiceName stopped.")
logger.debug(s"Active $ServiceName stopped.")
Success(())
case f @ Failure(failure) =>
logger.warn(s"Failed stopping active $ServiceName", failure)

View File

@ -10,6 +10,7 @@ import com.digitalasset.canton.platform.apiserver.services.tracking.SubmissionTr
import com.digitalasset.canton.platform.store.backend.ParameterStorageBackend.LedgerEnd
import com.digitalasset.canton.platform.store.cache.{
ContractStateCaches,
EventsByContractKeyCache,
InMemoryFanoutBuffer,
MutableLedgerEndCache,
}
@ -29,6 +30,7 @@ import scala.util.chaining.*
private[platform] class InMemoryState(
val ledgerEndCache: MutableLedgerEndCache,
val contractStateCaches: ContractStateCaches,
val eventsByContractKeyCache: Option[EventsByContractKeyCache],
val inMemoryFanoutBuffer: InMemoryFanoutBuffer,
val stringInterningView: StringInterningView,
val dispatcherState: DispatcherState,
@ -63,6 +65,7 @@ private[platform] class InMemoryState(
// Reset the Ledger API caches to the latest ledger end
_ <- Future {
contractStateCaches.reset(ledgerEnd.lastOffset)
eventsByContractKeyCache.foreach(_.flush())
inMemoryFanoutBuffer.flush()
ledgerEndCache.set(ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId)
submissionTracker.close()
@ -79,6 +82,7 @@ object InMemoryState {
bufferedStreamsPageSize: Int,
maxContractStateCacheSize: Long,
maxContractKeyStateCacheSize: Long,
maxEventsByContractKeyCacheSize: Option[Int],
maxTransactionsInMemoryFanOutBufferSize: Int,
maxCommandsInFlight: Int,
metrics: Metrics,
@ -96,6 +100,8 @@ object InMemoryState {
tracer,
loggerFactory,
)
_ = if (maxEventsByContractKeyCacheSize.isDefined)
loggerFactory.getLogger(getClass).debug("Using EventsByContractKey cache")
} yield new InMemoryState(
ledgerEndCache = MutableLedgerEndCache()
.tap(
@ -109,6 +115,9 @@ object InMemoryState {
metrics,
loggerFactory,
)(executionContext),
eventsByContractKeyCache = maxEventsByContractKeyCacheSize.map(cacheSize =>
EventsByContractKeyCache.build(metrics, cacheSize.toLong)
),
inMemoryFanoutBuffer = new InMemoryFanoutBuffer(
maxBufferSize = maxTransactionsInMemoryFanOutBufferSize,
metrics = metrics,

View File

@ -22,6 +22,7 @@ object LedgerApiServer {
tracer: Tracer,
loggerFactory: NamedLoggerFactory,
multiDomainEnabled: Boolean,
maxEventsByContractKeyCacheSize: Option[Int],
)(implicit
traceContext: TraceContext
): ResourceOwner[(InMemoryState, InMemoryStateUpdater.UpdaterFlow)] = {
@ -31,6 +32,7 @@ object LedgerApiServer {
bufferedStreamsPageSize = indexServiceConfig.bufferedStreamsPageSize,
maxContractStateCacheSize = indexServiceConfig.maxContractStateCacheSize,
maxContractKeyStateCacheSize = indexServiceConfig.maxContractKeyStateCacheSize,
maxEventsByContractKeyCacheSize = maxEventsByContractKeyCacheSize,
maxTransactionsInMemoryFanOutBufferSize =
indexServiceConfig.maxTransactionsInMemoryFanOutBufferSize,
executionContext = executionContext,

View File

@ -30,6 +30,7 @@ import com.digitalasset.canton.ledger.api.domain.{
TransactionId,
}
import com.digitalasset.canton.ledger.api.health.HealthStatus
import com.digitalasset.canton.ledger.api.messages.event.KeyContinuationToken
import com.digitalasset.canton.ledger.configuration.Configuration
import com.digitalasset.canton.ledger.offset.Offset
import com.digitalasset.canton.ledger.participant.state.index.v2
@ -261,7 +262,7 @@ final class TimedIndexService(delegate: IndexService, metrics: Metrics) extends
contractKey: Value,
templateId: Ref.Identifier,
requestingParties: Set[Ref.Party],
endExclusiveSeqId: Option[Long],
keyContinuationToken: KeyContinuationToken,
)(implicit loggingContext: LoggingContextWithTrace): Future[GetEventsByContractKeyResponse] =
Timed.future(
metrics.daml.services.index.getEventsByContractKey,
@ -269,7 +270,7 @@ final class TimedIndexService(delegate: IndexService, metrics: Metrics) extends
contractKey,
templateId,
requestingParties,
endExclusiveSeqId,
keyContinuationToken,
),
)

View File

@ -5,6 +5,7 @@ package com.digitalasset.canton.platform.apiserver.services.command
import com.daml.error.ContextualizedErrorLogger
import com.daml.error.ErrorCode.LoggedApiException
import com.daml.lf.command.ApiCommand
import com.daml.lf.crypto
import com.daml.scalautil.future.FutureConversion.CompletionStageConversionOps
import com.daml.timer.Delayed
@ -45,6 +46,7 @@ import com.digitalasset.canton.platform.apiserver.services.{
}
import com.digitalasset.canton.platform.services.time.TimeProviderType
import com.digitalasset.canton.tracing.{Spanning, TraceContext}
import com.digitalasset.canton.util.ShowUtil.*
import io.opentelemetry.api.trace.Tracer
import java.time.{Duration, Instant}
@ -118,9 +120,22 @@ private[apiserver] final class CommandSubmissionServiceImpl private[services] (
): Future[Unit] =
withEnrichedLoggingContext(logging.commands(request.commands)) { implicit loggingContext =>
logger.info(
s"Submitting commands for interpretation, ${loggingContext.serializeFiltered("commands")}."
show"Phase 1 started: Submitting commands for interpretation: ${request.commands}."
)
logger.trace(s"Commands: ${request.commands.commands.commands}.")
val cmds = request.commands.commands.commands
logger.debug(show"Submitted commands are: ${if (cmds.length > 1) "\n " else ""}${cmds
.map {
case ApiCommand.Create(templateId, _) => s"create ${templateId.qualifiedName}"
case ApiCommand.Exercise(templateId, _, choiceId, _) =>
s"exercise @${templateId.qualifiedName} ${choiceId}"
case ApiCommand.ExerciseByKey(templateId, _, choiceId, _) =>
s"exerciseByKey @${templateId.qualifiedName} $choiceId"
case ApiCommand.CreateAndExercise(templateId, _, choiceId, _) =>
s"createAndExercise ${templateId.qualifiedName} ... $choiceId ..."
}
.map(_.singleQuoted)
.toSeq
.mkString("\n ")}")
implicit val errorLoggingContext: ContextualizedErrorLogger =
ErrorLoggingContext.fromOption(
@ -150,7 +165,7 @@ private[apiserver] final class CommandSubmissionServiceImpl private[services] (
import state.SubmissionResult.*
result match {
case Success(Acknowledged) =>
logger.debug("Success")
logger.debug("Submission acknowledged by sync-service.")
Success(())
case Success(result: SynchronousError) =>
@ -233,7 +248,7 @@ private[apiserver] final class CommandSubmissionServiceImpl private[services] (
loggingContext: LoggingContextWithTrace
): Future[state.SubmissionResult] = {
metrics.daml.commands.validSubmissions.mark()
logger.debug("Submitting transaction to ledger.")
logger.trace("Submitting transaction to ledger.")
writeService
.submitTransaction(
result.submitterInfo,

Some files were not shown because too many files have changed in this diff Show More