update canton to 20240320.12940.v147a56d2 (#18803)

tell-slack: canton

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
This commit is contained in:
azure-pipelines[bot] 2024-03-21 10:08:38 +01:00 committed by GitHub
parent 99c6b363f4
commit c1f0a3c42e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
90 changed files with 1090 additions and 414 deletions

View File

@ -0,0 +1,97 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.admin.api.client.commands
import com.digitalasset.canton.domain.sequencing.sequencer.block.bftordering.admin.EnterpriseSequencerBftAdminData.{
PeerNetworkStatus,
endpointToProto,
}
import com.digitalasset.canton.networking.Endpoint
import com.digitalasset.canton.sequencer.admin.v30
import com.digitalasset.canton.sequencer.admin.v30.*
import io.grpc.ManagedChannel
import scala.concurrent.Future
object EnterpriseSequencerBftAdminCommands {
abstract class BaseSequencerBftAdministrationCommand[Req, Rep, Res]
extends GrpcAdminCommand[Req, Rep, Res] {
override type Svc =
v30.SequencerBftAdministrationServiceGrpc.SequencerBftAdministrationServiceStub
override def createService(
channel: ManagedChannel
): v30.SequencerBftAdministrationServiceGrpc.SequencerBftAdministrationServiceStub =
v30.SequencerBftAdministrationServiceGrpc.stub(channel)
}
final case class AddPeerEndpoint(endpoint: Endpoint)
extends BaseSequencerBftAdministrationCommand[
AddPeerEndpointRequest,
AddPeerEndpointResponse,
Unit,
] {
override def createRequest(): Either[String, AddPeerEndpointRequest] = Right(
AddPeerEndpointRequest.of(Some(endpointToProto(endpoint)))
)
override def submitRequest(
service: v30.SequencerBftAdministrationServiceGrpc.SequencerBftAdministrationServiceStub,
request: AddPeerEndpointRequest,
): Future[AddPeerEndpointResponse] =
service.addPeerEndpoint(request)
override def handleResponse(
response: AddPeerEndpointResponse
): Either[String, Unit] =
Right(())
}
final case class RemovePeerEndpoint(endpoint: Endpoint)
extends BaseSequencerBftAdministrationCommand[
RemovePeerEndpointRequest,
RemovePeerEndpointResponse,
Unit,
] {
override def createRequest(): Either[String, RemovePeerEndpointRequest] = Right(
RemovePeerEndpointRequest.of(Some(endpointToProto(endpoint)))
)
override def submitRequest(
service: v30.SequencerBftAdministrationServiceGrpc.SequencerBftAdministrationServiceStub,
request: RemovePeerEndpointRequest,
): Future[RemovePeerEndpointResponse] =
service.removePeerEndpoint(request)
override def handleResponse(
response: RemovePeerEndpointResponse
): Either[String, Unit] =
Right(())
}
final case class GetPeerNetworkStatus(endpoints: Option[Iterable[Endpoint]])
extends BaseSequencerBftAdministrationCommand[
GetPeerNetworkStatusRequest,
GetPeerNetworkStatusResponse,
PeerNetworkStatus,
] {
override def createRequest(): Either[String, GetPeerNetworkStatusRequest] = Right(
GetPeerNetworkStatusRequest.of(endpoints.getOrElse(Iterable.empty).map(endpointToProto).toSeq)
)
override def submitRequest(
service: v30.SequencerBftAdministrationServiceGrpc.SequencerBftAdministrationServiceStub,
request: GetPeerNetworkStatusRequest,
): Future[GetPeerNetworkStatusResponse] =
service.getPeerNetworkStatus(request)
override def handleResponse(
response: GetPeerNetworkStatusResponse
): Either[String, PeerNetworkStatus] =
PeerNetworkStatus.fromProto(response)
}
}

View File

@ -491,20 +491,21 @@ private[canton] object CantonNodeParameterConverter {
def general(parent: CantonConfig, node: LocalNodeConfig): CantonNodeParameters.General = {
CantonNodeParameters.General.Impl(
parent.monitoring.tracing,
parent.monitoring.delayLoggingThreshold.toInternal,
parent.monitoring.logQueryCost,
parent.monitoring.logging,
parent.parameters.enableAdditionalConsistencyChecks,
parent.features.enablePreviewCommands,
parent.parameters.timeouts.processing,
node.sequencerClient,
node.parameters.caching,
node.parameters.batching,
parent.parameters.nonStandardConfig,
node.storage.parameters.migrateAndStart,
node.parameters.useNewTrafficControl,
parent.parameters.exitOnFatalFailures,
tracing = parent.monitoring.tracing,
delayLoggingThreshold = parent.monitoring.delayLoggingThreshold.toInternal,
logQueryCost = parent.monitoring.logQueryCost,
loggingConfig = parent.monitoring.logging,
enableAdditionalConsistencyChecks = parent.parameters.enableAdditionalConsistencyChecks,
enablePreviewFeatures = parent.features.enablePreviewCommands,
processingTimeouts = parent.parameters.timeouts.processing,
sequencerClient = node.sequencerClient,
cachingConfigs = node.parameters.caching,
batchingConfig = node.parameters.batching,
nonStandardConfig = parent.parameters.nonStandardConfig,
dbMigrateAndStart = node.storage.parameters.migrateAndStart,
useNewTrafficControl = node.parameters.useNewTrafficControl,
exitOnFatalFailures = parent.parameters.exitOnFatalFailures,
useUnifiedSequencer = node.parameters.useUnifiedSequencer,
)
}

View File

@ -7,6 +7,7 @@ import com.digitalasset.canton.*
import com.digitalasset.canton.admin.api.client.commands.EnterpriseSequencerAdminCommands.LocatePruningTimestampCommand
import com.digitalasset.canton.admin.api.client.commands.{
EnterpriseSequencerAdminCommands,
EnterpriseSequencerBftAdminCommands,
GrpcAdminCommand,
PruningSchedulerCommands,
SequencerAdminCommands,
@ -29,6 +30,7 @@ import com.digitalasset.canton.domain.sequencing.config.{
RemoteSequencerConfig,
SequencerNodeConfigCommon,
}
import com.digitalasset.canton.domain.sequencing.sequencer.block.bftordering.admin.EnterpriseSequencerBftAdminData.PeerNetworkStatus
import com.digitalasset.canton.domain.sequencing.sequencer.{
SequencerClients,
SequencerPruningStatus,
@ -39,6 +41,7 @@ import com.digitalasset.canton.health.admin.data.*
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging, TracedLogger}
import com.digitalasset.canton.metrics.MetricValue
import com.digitalasset.canton.networking.Endpoint
import com.digitalasset.canton.participant.config.{
BaseParticipantConfig,
LocalParticipantConfig,
@ -1150,6 +1153,26 @@ abstract class SequencerNodeReference(
runner.adminCommand(EnterpriseSequencerAdminCommands.DisableMember(member))
}
}
@Help.Summary("Methods used to manage BFT sequencers; they'll fail on non-BFT sequencers")
object bft {
@Help.Summary("Add a new peer endpoint")
def add_peer_endpoint(endpoint: Endpoint): Unit = consoleEnvironment.run {
runner.adminCommand(EnterpriseSequencerBftAdminCommands.AddPeerEndpoint(endpoint))
}
@Help.Summary("Remove a peer endpoint")
def remove_peer_endpoint(endpoint: Endpoint): Unit = consoleEnvironment.run {
runner.adminCommand(EnterpriseSequencerBftAdminCommands.RemovePeerEndpoint(endpoint))
}
@Help.Summary("Get peer network status")
def get_peer_network_status(endpoints: Option[Iterable[Endpoint]]): PeerNetworkStatus =
consoleEnvironment.run {
runner.adminCommand(EnterpriseSequencerBftAdminCommands.GetPeerNetworkStatus(endpoints))
}
}
}
class LocalSequencerNodeReference(

View File

@ -70,6 +70,7 @@ class NodesXTest extends FixtureAnyWordSpec with BaseTest with HasExecutionConte
override def batching: BatchingConfig = BatchingConfig()
override def caching: CachingConfigs = CachingConfigs()
override def useNewTrafficControl: Boolean = false
override def useUnifiedSequencer: Boolean = false
}
}
@ -92,6 +93,7 @@ class NodesXTest extends FixtureAnyWordSpec with BaseTest with HasExecutionConte
initialProtocolVersion: ProtocolVersion = testedProtocolVersion,
useNewTrafficControl: Boolean = false,
exitOnFatalFailures: Boolean = true,
useUnifiedSequencer: Boolean = false,
) extends CantonNodeParameters
private val metricsFactory: CantonLabeledMetricsFactory = new InMemoryMetricsFactory

View File

@ -190,6 +190,7 @@ message ActionDescription {
bool failed = 8;
optional string interface_id = 9;
optional string template_id = 10;
repeated string package_preference = 11;
}
message FetchActionDescription {

View File

@ -41,7 +41,7 @@ import com.digitalasset.canton.version.{
ProtocolVersion,
RepresentativeProtocolVersion,
}
import com.digitalasset.canton.{LfChoiceName, LfInterfaceId, LfPartyId, LfVersioned}
import com.digitalasset.canton.{LfChoiceName, LfInterfaceId, LfPackageId, LfPartyId, LfVersioned}
import com.google.protobuf.ByteString
/** Summarizes the information that is needed in addition to the other fields of [[ViewParticipantData]] for
@ -92,9 +92,12 @@ object ActionDescription extends HasProtocolVersionedCompanion[ActionDescription
def tryFromLfActionNode(
actionNode: LfActionNode,
seedO: Option[LfHash],
packagePreference: Set[LfPackageId],
protocolVersion: ProtocolVersion,
): ActionDescription =
fromLfActionNode(actionNode, seedO, protocolVersion).valueOr(err => throw err)
fromLfActionNode(actionNode, seedO, packagePreference, protocolVersion).valueOr(err =>
throw err
)
/** Extracts the action description from an LF node and the optional seed.
* @param seedO Must be set iff `node` is a [[com.digitalasset.canton.protocol.LfNodeCreate]] or [[com.digitalasset.canton.protocol.LfNodeExercises]].
@ -102,6 +105,7 @@ object ActionDescription extends HasProtocolVersionedCompanion[ActionDescription
def fromLfActionNode(
actionNode: LfActionNode,
seedO: Option[LfHash],
packagePreference: Set[LfPackageId],
protocolVersion: ProtocolVersion,
): Either[InvalidActionDescription, ActionDescription] =
actionNode match {
@ -148,6 +152,7 @@ object ActionDescription extends HasProtocolVersionedCompanion[ActionDescription
Some(templateId),
choice,
interfaceId,
packagePreference,
chosenValue,
actors,
byKey,
@ -231,10 +236,12 @@ object ActionDescription extends HasProtocolVersionedCompanion[ActionDescription
failed,
interfaceIdP,
templateIdP,
packagePreferenceP,
) = e
for {
inputContractId <- ProtoConverter.parseLfContractId(inputContractIdP)
templateId <- templateIdP.traverse(RefIdentifierSyntax.fromProtoPrimitive)
packagePreference <- packagePreferenceP.traverse(ProtoConverter.parsePackageId).map(_.toSet)
choice <- choiceFromProto(choiceP)
interfaceId <- interfaceIdP.traverse(RefIdentifierSyntax.fromProtoPrimitive)
version <- lfVersionFromProtoVersioned(versionP)
@ -249,6 +256,7 @@ object ActionDescription extends HasProtocolVersionedCompanion[ActionDescription
templateId,
choice,
interfaceId,
packagePreference,
chosenValue,
actors,
byKey,
@ -355,6 +363,7 @@ object ActionDescription extends HasProtocolVersionedCompanion[ActionDescription
templateId: Option[LfTemplateId],
choice: LfChoiceName,
interfaceId: Option[LfInterfaceId],
packagePreference: Set[LfPackageId],
chosenValue: Value,
actors: Set[LfPartyId],
override val byKey: Boolean,
@ -377,6 +386,7 @@ object ActionDescription extends HasProtocolVersionedCompanion[ActionDescription
v30.ActionDescription.ExerciseActionDescription(
inputContractId = inputContractId.toProtoPrimitive,
templateId = templateId.map(i => new RefIdentifierSyntax(i).toProtoPrimitive),
packagePreference = packagePreference.toSeq,
choice = choice,
interfaceId = interfaceId.map(i => new RefIdentifierSyntax(i).toProtoPrimitive),
chosenValue = serializedChosenValue,
@ -407,6 +417,7 @@ object ActionDescription extends HasProtocolVersionedCompanion[ActionDescription
templateId: Option[LfTemplateId],
choice: LfChoiceName,
interfaceId: Option[LfInterfaceId],
packagePreference: Set[LfPackageId],
chosenValue: Value,
actors: Set[LfPartyId],
byKey: Boolean,
@ -419,6 +430,7 @@ object ActionDescription extends HasProtocolVersionedCompanion[ActionDescription
templateId,
choice,
interfaceId,
packagePreference,
chosenValue,
actors,
byKey,
@ -433,6 +445,7 @@ object ActionDescription extends HasProtocolVersionedCompanion[ActionDescription
templateId: Option[LfTemplateId],
choice: LfChoiceName,
interfaceId: Option[LfInterfaceId],
packagePreference: Set[LfPackageId],
chosenValue: Value,
actors: Set[LfPartyId],
byKey: Boolean,
@ -447,6 +460,7 @@ object ActionDescription extends HasProtocolVersionedCompanion[ActionDescription
templateId,
choice,
interfaceId,
packagePreference,
chosenValue,
actors,
byKey,

View File

@ -32,6 +32,7 @@ import com.digitalasset.canton.{
LfFetchByKeyCommand,
LfFetchCommand,
LfLookupByKeyCommand,
LfPackageId,
LfPartyId,
ProtoDeserializationError,
checked,
@ -170,6 +171,7 @@ final case class ViewParticipantData private (
),
metadata.signatories,
failed = false,
packageIdPreference = Set.empty,
)
case ExerciseActionDescription(
@ -177,6 +179,7 @@ final case class ViewParticipantData private (
commandTemplateId,
choice,
interfaceId,
packagePreference,
chosenValue,
actors,
byKey,
@ -220,7 +223,7 @@ final case class ViewParticipantData private (
argument = chosenValue,
)
}
RootAction(cmd, actors, failed)
RootAction(cmd, actors, failed, packagePreference)
case FetchActionDescription(inputContractId, actors, byKey, _version) =>
val inputContract = coreInputs.getOrElse(
@ -242,7 +245,7 @@ final case class ViewParticipantData private (
} else {
LfFetchCommand(templateId = templateId, coid = inputContractId)
}
RootAction(cmd, actors, failed = false)
RootAction(cmd, actors, failed = false, packageIdPreference = Set.empty)
case LookupByKeyActionDescription(key, _version) =>
val keyResolution = resolvedKeys.getOrElse(
@ -260,6 +263,7 @@ final case class ViewParticipantData private (
LfLookupByKeyCommand(templateId = key.templateId, contractKey = key.key),
maintainers,
failed = false,
packageIdPreference = Set.empty,
)
}
@ -477,7 +481,12 @@ object ViewParticipantData
} yield viewParticipantData
}
final case class RootAction(command: LfCommand, authorizers: Set[LfPartyId], failed: Boolean)
final case class RootAction(
command: LfCommand,
authorizers: Set[LfPartyId],
failed: Boolean,
packageIdPreference: Set[LfPackageId],
)
/** Indicates an attempt to create an invalid [[ViewParticipantData]]. */
final case class InvalidViewParticipantData(message: String) extends RuntimeException(message)

View File

@ -33,6 +33,7 @@ object CantonNodeParameters {
def dbMigrateAndStart: Boolean
def useNewTrafficControl: Boolean
def exitOnFatalFailures: Boolean
def useUnifiedSequencer: Boolean
}
object General {
final case class Impl(
@ -50,6 +51,7 @@ object CantonNodeParameters {
dbMigrateAndStart: Boolean,
useNewTrafficControl: Boolean,
exitOnFatalFailures: Boolean,
useUnifiedSequencer: Boolean,
) extends CantonNodeParameters.General
}
trait Protocol {
@ -88,6 +90,7 @@ trait HasGeneralCantonNodeParameters extends CantonNodeParameters.General {
override def dbMigrateAndStart: Boolean = general.dbMigrateAndStart
override def useNewTrafficControl: Boolean = general.useNewTrafficControl
override def exitOnFatalFailures: Boolean = general.exitOnFatalFailures
override def useUnifiedSequencer: Boolean = general.useUnifiedSequencer
}

View File

@ -19,8 +19,9 @@ import com.digitalasset.canton.version.*
import com.google.rpc.status.Status
/** Possible verdicts on a transaction view from the participant's perspective.
* The verdict can be `LocalApprove`, `LocalReject` or `Malformed`.
* The verdicts `LocalReject` and `Malformed` include a `reason` pointing out which checks in Phase 3 have failed.
* The verdict can be `LocalApprove` or `LocalReject`.
* The verdict `LocalReject` includes a `reason` pointing out which checks in Phase 3 have failed, and
* a flag `isMalformed` indicating whether the rejection occurs due to malicious behavior.
*/
sealed trait LocalVerdict
extends Product
@ -30,6 +31,8 @@ sealed trait LocalVerdict
def isMalformed: Boolean
def isApprove: Boolean
private[messages] def toProtoV30: v30.LocalVerdict
@transient override protected lazy val companionObj: LocalVerdict.type = LocalVerdict
@ -80,6 +83,8 @@ final case class LocalApprove()(
override def isMalformed: Boolean = false
override def isApprove: Boolean = true
private[messages] def toProtoV30: v30.LocalVerdict =
v30.LocalVerdict(
code = VERDICT_CODE_LOCAL_APPROVE,
@ -99,6 +104,7 @@ final case class LocalReject(reason: com.google.rpc.status.Status, isMalformed:
) extends LocalVerdict
with TransactionRejection
with PrettyPrinting {
override def isApprove: Boolean = false
override def logWithContext(
extra: Map[String, String]

View File

@ -148,6 +148,9 @@ object ProtoConverter {
def parseCommandId(id: String): ParsingResult[Ref.CommandId] =
parseString(id)(Ref.CommandId.fromString)
def parsePackageId(id: String): ParsingResult[Ref.PackageId] =
parseString(id)(Ref.PackageId.fromString)
def parseTemplateIdO(id: String): ParsingResult[Option[LfTemplateId]] =
OptionUtil.emptyStringAsNone(id).traverse(parseTemplateId)

View File

@ -24,7 +24,7 @@ class DomainOutboxQueue(val loggerFactory: NamedLoggerFactory) extends NamedLogg
private val unsentQueue =
new scala.collection.mutable.Queue[Traced[GenericSignedTopologyTransactionX]]
private val pendingQueue =
private val inProcessQueue =
new scala.collection.mutable.Queue[Traced[GenericSignedTopologyTransactionX]]
/** To be called by the topology manager whenever new topology transactions have been validated.
@ -36,7 +36,8 @@ class DomainOutboxQueue(val loggerFactory: NamedLoggerFactory) extends NamedLogg
unsentQueue.enqueueAll(txs.map(Traced(_))).discard
})
def size(): Int = blocking(synchronized(unsentQueue.size))
def numUnsentTransactions: Int = blocking(synchronized(unsentQueue.size))
def numInProcessTransactions: Int = blocking(synchronized(inProcessQueue.size))
/** Marks up to `limit` transactions as pending and returns those transactions.
* @param limit batch size
@ -48,27 +49,27 @@ class DomainOutboxQueue(val loggerFactory: NamedLoggerFactory) extends NamedLogg
val txs = unsentQueue.take(limit).toList
logger.debug(s"dequeuing: $txs")
require(
pendingQueue.isEmpty,
s"tried to dequeue while pending wasn't empty: ${pendingQueue.toSeq}",
inProcessQueue.isEmpty,
s"tried to dequeue while pending wasn't empty: ${inProcessQueue.toSeq}",
)
pendingQueue.enqueueAll(txs)
unsentQueue.remove(0, limit)
pendingQueue.toSeq.map(_.value)
inProcessQueue.enqueueAll(txs)
unsentQueue.dropInPlace(limit)
inProcessQueue.toSeq.map(_.value)
})
/** Marks the currently pending transactions as unsent and adds them to the front of the queue in the same order.
*/
def requeue()(implicit traceContext: TraceContext): Unit = blocking(synchronized {
logger.debug(s"requeuing $pendingQueue")
unsentQueue.prependAll(pendingQueue)
pendingQueue.clear()
logger.debug(s"requeuing $inProcessQueue")
unsentQueue.prependAll(inProcessQueue)
inProcessQueue.clear()
})
/** Clears the currently pending transactions.
*/
def completeCycle()(implicit traceContext: TraceContext): Unit = blocking(synchronized {
logger.debug(s"completeCycle $pendingQueue")
pendingQueue.clear()
logger.debug(s"completeCycle $inProcessQueue")
inProcessQueue.clear()
})
}

View File

@ -240,4 +240,18 @@ object MapsUtil {
)
minuend.filter { case (k, v) => !subtrahend.get(k).contains(v) }
}
/** @return a map if there are no conflicting key binding
* or a map of key to the set of conflicting bindings
*/
def toNonConflictingMap[K, V](it: Iterable[(K, V)]): Either[Map[K, Set[V]], Map[K, V]] = {
val set = it.toSet
val map = set.toMap
if (map.size == set.size) {
Right(map)
} else {
Left(set.groupBy(_._1).collect({ case (k, v) if v.size > 1 => (k, v.map(_._2)) }))
}
}
}

View File

@ -16,21 +16,19 @@ import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
@RunWith(JUnitPlatform.class)
import org.junit.Test;
public class JsonLfDecodersTest {
@Test
void testUnit() throws JsonLfDecoder.Error {
public void testUnit() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.unit, eq("{}", Unit.getInstance()), eq("\t{\n} ", Unit.getInstance()));
}
@Test
void testUnitErrors() throws JsonLfDecoder.Error {
public void testUnitErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.unit,
errors("", "Expected { but was nothing at line: 1, column: 0"),
@ -40,12 +38,12 @@ public class JsonLfDecodersTest {
}
@Test
void testBool() throws JsonLfDecoder.Error {
public void testBool() throws JsonLfDecoder.Error {
checkReadAll(JsonLfDecoders.bool, eq("false", false), eq("true", true));
}
@Test
void testBoolErrors() throws JsonLfDecoder.Error {
public void testBoolErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.bool,
errors("1", "Expected boolean but was 1 at line: 1, column: 1"),
@ -54,7 +52,7 @@ public class JsonLfDecodersTest {
}
@Test
void testInt64() throws JsonLfDecoder.Error {
public void testInt64() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.int64,
eq("42", 42L),
@ -69,7 +67,7 @@ public class JsonLfDecodersTest {
}
@Test
void testInt64Errors() throws JsonLfDecoder.Error {
public void testInt64Errors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.int64,
errors("42.3", "Expected int64 but was 42.3 at line: 1, column: 1"),
@ -85,7 +83,7 @@ public class JsonLfDecodersTest {
}
@Test
void testNumeric() throws JsonLfDecoder.Error {
public void testNumeric() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.numeric(10),
cmpEq("42", dec("42")),
@ -106,7 +104,7 @@ public class JsonLfDecodersTest {
}
@Test
void testNumericErrors() throws JsonLfDecoder.Error {
public void testNumericErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.numeric(10),
errors(
@ -132,7 +130,7 @@ public class JsonLfDecodersTest {
}
@Test
void testTimestamp() throws JsonLfDecoder.Error {
public void testTimestamp() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.timestamp,
eq(
@ -155,7 +153,7 @@ public class JsonLfDecodersTest {
}
@Test
void testTimestampErrors() throws JsonLfDecoder.Error {
public void testTimestampErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.timestamp,
// 24th hour
@ -190,7 +188,7 @@ public class JsonLfDecodersTest {
}
@Test
void testDate() throws JsonLfDecoder.Error {
public void testDate() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.date,
eq("\"2019-06-18\"", date(2019, Month.JUNE, 18)),
@ -199,7 +197,7 @@ public class JsonLfDecodersTest {
}
@Test
void testDateErrors() throws JsonLfDecoder.Error {
public void testDateErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.date,
errors(
@ -230,12 +228,12 @@ public class JsonLfDecodersTest {
}
@Test
void testParty() throws JsonLfDecoder.Error {
public void testParty() throws JsonLfDecoder.Error {
checkReadAll(JsonLfDecoders.party, eq("\"Alice\"", "Alice"));
}
@Test
void testPartyErrors() throws JsonLfDecoder.Error {
public void testPartyErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.party,
errors("{\"unwrap\": \"foo\"}", "Expected text but was { at line: 1, column: 1"),
@ -243,12 +241,12 @@ public class JsonLfDecodersTest {
}
@Test
void testText() throws JsonLfDecoder.Error {
public void testText() throws JsonLfDecoder.Error {
checkReadAll(JsonLfDecoders.text, eq("\"\"", ""), eq("\" \"", " "), eq("\"hello\"", "hello"));
}
@Test
void testTextErrors() throws JsonLfDecoder.Error {
public void testTextErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.text,
errors("{\"unwrap\": \"foo\"}", "Expected text but was { at line: 1, column: 1"),
@ -256,20 +254,20 @@ public class JsonLfDecodersTest {
}
@Test
void testContractId() throws JsonLfDecoder.Error {
public void testContractId() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.contractId(Tmpl.Cid::new), eq("\"deadbeef\"", new Tmpl.Cid("deadbeef")));
}
@Test
void testContractIdErrors() throws JsonLfDecoder.Error {
public void testContractIdErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.contractId(Tmpl.Cid::new),
errors("42", "Expected text but was 42 at line: 1, column: 1"));
}
@Test
void testEnum() throws JsonLfDecoder.Error {
public void testEnum() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.enumeration(Suit.damlNames),
eq("\"Hearts\"", Suit.HEARTS),
@ -279,7 +277,7 @@ public class JsonLfDecodersTest {
}
@Test
void testEnumErrors() throws JsonLfDecoder.Error {
public void testEnumErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.enumeration(Suit.damlNames),
errors("Hearts", "JSON parse error at line: 1, column: 1", IOException.class),
@ -294,7 +292,7 @@ public class JsonLfDecodersTest {
}
@Test
void testList() throws JsonLfDecoder.Error {
public void testList() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.list(JsonLfDecoders.int64),
eq("[]", emptyList()),
@ -303,7 +301,7 @@ public class JsonLfDecodersTest {
}
@Test
void testListNested() throws JsonLfDecoder.Error {
public void testListNested() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.list(JsonLfDecoders.list(JsonLfDecoders.int64)),
eq("[]", emptyList()),
@ -311,7 +309,7 @@ public class JsonLfDecodersTest {
}
@Test
void testListErrors() throws JsonLfDecoder.Error {
public void testListErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.list(JsonLfDecoders.int64),
errors("[", "JSON parse error at line: 1, column: 2", IOException.class),
@ -322,7 +320,7 @@ public class JsonLfDecodersTest {
}
@Test
void testTextMap() throws JsonLfDecoder.Error {
public void testTextMap() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.textMap(JsonLfDecoders.int64),
eq("{}", emptyMap()),
@ -330,7 +328,7 @@ public class JsonLfDecodersTest {
}
@Test
void testTextMapErrors() throws JsonLfDecoder.Error {
public void testTextMapErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.textMap(JsonLfDecoders.int64),
errors("{1: true}", "JSON parse error at line: 1, column: 3", IOException.class),
@ -340,7 +338,7 @@ public class JsonLfDecodersTest {
}
@Test
void testGenMap() throws JsonLfDecoder.Error {
public void testGenMap() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.genMap(JsonLfDecoders.text, JsonLfDecoders.int64),
eq("[]", emptyMap()),
@ -348,7 +346,7 @@ public class JsonLfDecodersTest {
}
@Test
void testGenMapErrors() throws JsonLfDecoder.Error {
public void testGenMapErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.genMap(JsonLfDecoders.text, JsonLfDecoders.int64),
errors("{}", "Expected [ but was { at line: 1, column: 1"),
@ -360,7 +358,7 @@ public class JsonLfDecodersTest {
}
@Test
void testOptionalNonNested() throws JsonLfDecoder.Error {
public void testOptionalNonNested() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.optional(JsonLfDecoders.int64),
eq("null", Optional.empty()),
@ -368,7 +366,7 @@ public class JsonLfDecodersTest {
}
@Test
void testOptionalNonNestedErrors() throws JsonLfDecoder.Error {
public void testOptionalNonNestedErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.optional(JsonLfDecoders.int64),
errors("undefined", "JSON parse error at line: 1, column: 1", IOException.class),
@ -378,7 +376,7 @@ public class JsonLfDecodersTest {
}
@Test
void testOptionalNested() throws JsonLfDecoder.Error {
public void testOptionalNested() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.optionalNested(JsonLfDecoders.optional(JsonLfDecoders.int64)),
eq("null", Optional.empty()),
@ -387,7 +385,7 @@ public class JsonLfDecodersTest {
}
@Test
void testOptionalNestedErrors() throws JsonLfDecoder.Error {
public void testOptionalNestedErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.optionalNested(JsonLfDecoders.optional(JsonLfDecoders.int64)),
errors("[null]", "Expected ] or item but was null at line: 1, column: 2"),
@ -396,7 +394,7 @@ public class JsonLfDecodersTest {
}
@Test
void testOptionalNestedDeeper() throws JsonLfDecoder.Error {
public void testOptionalNestedDeeper() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.optionalNested(
JsonLfDecoders.optionalNested(
@ -409,7 +407,7 @@ public class JsonLfDecodersTest {
}
@Test
void testOptionalLists() throws JsonLfDecoder.Error {
public void testOptionalLists() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.optional(JsonLfDecoders.list(JsonLfDecoders.list(JsonLfDecoders.int64))),
eq("null", Optional.empty()),
@ -419,7 +417,7 @@ public class JsonLfDecodersTest {
}
@Test
void testVariant() throws JsonLfDecoder.Error {
public void testVariant() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.variant(
asList("Bar", "Baz", "Quux", "Flarp"),
@ -448,7 +446,7 @@ public class JsonLfDecodersTest {
}
@Test
void testVariantErrors() throws JsonLfDecoder.Error {
public void testVariantErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.variant(
asList("Bar", "Baz", "Quux", "Flarp"),
@ -511,7 +509,7 @@ public class JsonLfDecodersTest {
}
@Test
void testRecord() throws JsonLfDecoder.Error {
public void testRecord() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.record(
asList("i", "b"),
@ -534,7 +532,7 @@ public class JsonLfDecodersTest {
}
@Test
void testRecordErrors() throws JsonLfDecoder.Error {
public void testRecordErrors() throws JsonLfDecoder.Error {
checkReadAll(
JsonLfDecoders.record(
asList("i", "b"),
@ -565,7 +563,7 @@ public class JsonLfDecodersTest {
}
@Test
void testUnknownValue() throws JsonLfDecoder.Error {
public void testUnknownValue() throws JsonLfDecoder.Error {
JsonLfReader.UnknownValue.read(new JsonLfReader("1")).decodeWith(JsonLfDecoders.int64);
JsonLfReader.UnknownValue.read(new JsonLfReader("\"88\""))
.decodeWith(JsonLfDecoders.numeric(2));
@ -605,7 +603,7 @@ public class JsonLfDecodersTest {
}
@Test
void testUnknownValueErrors() throws JsonLfDecoder.Error {
public void testUnknownValueErrors() throws JsonLfDecoder.Error {
unknownValueDecodeErrors(
"42", r -> r, JsonLfDecoders.bool, "Expected boolean but was 42 at line: 1, column: 1");
unknownValueDecodeErrors(

View File

@ -18,25 +18,23 @@ import java.time.Month;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
@RunWith(JUnitPlatform.class)
import org.junit.Test;
public class JsonLfEncodersTest {
@Test
void testUnit() throws IOException {
public void testUnit() throws IOException {
assertEquals("{}", intoString(JsonLfEncoders.unit(Unit.getInstance())));
}
@Test
void testBool() throws IOException {
public void testBool() throws IOException {
checkWriteAll(JsonLfEncoders::bool, Case.of(true, "true"), Case.of(false, "false"));
}
@Test
void testInt64() throws IOException {
public void testInt64() throws IOException {
checkWriteAll(
JsonLfEncoders::int64,
Case.of(42L, "42"),
@ -48,14 +46,14 @@ public class JsonLfEncodersTest {
}
@Test
void testInt64AsString() throws IOException {
public void testInt64AsString() throws IOException {
StringWriter sw = new StringWriter();
JsonLfEncoders.int64(42L).encode(new JsonLfWriter(sw, opts().encodeInt64AsString(true)));
assertEquals("\"42\"", sw.toString());
}
@Test
void testNumeric() throws IOException {
public void testNumeric() throws IOException {
checkWriteAll(
JsonLfEncoders::numeric,
Case.of(dec("42"), "42"),
@ -72,7 +70,7 @@ public class JsonLfEncodersTest {
}
@Test
void testNumericAsString() throws IOException {
public void testNumericAsString() throws IOException {
StringWriter sw = new StringWriter();
JsonLfEncoders.numeric(new BigDecimal(0.5))
.encode(new JsonLfWriter(sw, opts().encodeNumericAsString(true)));
@ -80,7 +78,7 @@ public class JsonLfEncodersTest {
}
@Test
void testTimestamp() throws IOException {
public void testTimestamp() throws IOException {
checkWriteAll(
JsonLfEncoders::timestamp,
Case.of(
@ -106,7 +104,7 @@ public class JsonLfEncodersTest {
}
@Test
void testDate() throws IOException {
public void testDate() throws IOException {
checkWriteAll(
JsonLfEncoders::date,
Case.of(date(2019, Month.JUNE, 18), "\"2019-06-18\""),
@ -115,12 +113,12 @@ public class JsonLfEncodersTest {
}
@Test
void testParty() throws IOException {
public void testParty() throws IOException {
checkWriteAll(JsonLfEncoders::party, Case.of("Alice", "\"Alice\""));
}
@Test
void testText() throws IOException {
public void testText() throws IOException {
checkWriteAll(
JsonLfEncoders::text,
Case.of("", "\"\""),
@ -130,12 +128,12 @@ public class JsonLfEncodersTest {
}
@Test
void testContractId() throws IOException {
public void testContractId() throws IOException {
checkWriteAll(JsonLfEncoders::contractId, Case.of(new Tmpl.Cid("deadbeef"), "\"deadbeef\""));
}
@Test
void testEnum() throws IOException {
public void testEnum() throws IOException {
checkWriteAll(
JsonLfEncoders.enumeration(Suit::toDamlName),
Case.of(Suit.HEARTS, "\"Hearts\""),
@ -145,7 +143,7 @@ public class JsonLfEncodersTest {
}
@Test
void testList() throws IOException {
public void testList() throws IOException {
checkWriteAll(
JsonLfEncoders.list(JsonLfEncoders::int64),
Case.of(emptyList(), "[]"),
@ -153,7 +151,7 @@ public class JsonLfEncodersTest {
}
@Test
void testListNested() throws IOException {
public void testListNested() throws IOException {
checkWriteAll(
JsonLfEncoders.list(JsonLfEncoders.list(JsonLfEncoders::int64)),
Case.of(emptyList(), "[]"),
@ -168,7 +166,7 @@ public class JsonLfEncodersTest {
}
@Test
void testTextMap() throws IOException {
public void testTextMap() throws IOException {
checkWriteAll(
JsonLfEncoders.textMap(JsonLfEncoders::int64),
Case.of(emptyMap(), "{}"),
@ -176,7 +174,7 @@ public class JsonLfEncodersTest {
}
@Test
void testGenMap() throws IOException {
public void testGenMap() throws IOException {
checkWriteAll(
JsonLfEncoders.genMap(JsonLfEncoders::text, JsonLfEncoders::int64),
Case.of(emptyMap(), "[]"),
@ -184,7 +182,7 @@ public class JsonLfEncodersTest {
}
@Test
void testOptionalNonNested() throws IOException {
public void testOptionalNonNested() throws IOException {
checkWriteAll(
JsonLfEncoders.optional(JsonLfEncoders::int64),
Case.of(Optional.empty(), "null"),
@ -192,7 +190,7 @@ public class JsonLfEncodersTest {
}
@Test
void testOptionalNested() throws IOException {
public void testOptionalNested() throws IOException {
checkWriteAll(
JsonLfEncoders.optionalNested(JsonLfEncoders.optional(JsonLfEncoders::int64)),
Case.of(Optional.empty(), "null"),
@ -201,7 +199,7 @@ public class JsonLfEncodersTest {
}
@Test
void testOptionalNestedDeeper() throws IOException {
public void testOptionalNestedDeeper() throws IOException {
checkWriteAll(
JsonLfEncoders.optionalNested(
JsonLfEncoders.optionalNested(
@ -214,7 +212,7 @@ public class JsonLfEncodersTest {
}
@Test
void testOptionalLists() throws IOException {
public void testOptionalLists() throws IOException {
checkWriteAll(
JsonLfEncoders.optional(JsonLfEncoders.list(JsonLfEncoders.list(JsonLfEncoders::int64))),
Case.of(Optional.empty(), "null"),
@ -224,7 +222,7 @@ public class JsonLfEncodersTest {
}
@Test
void testVariant() throws IOException {
public void testVariant() throws IOException {
checkWriteAll(
JsonLfEncoders.variant(
v -> {
@ -251,7 +249,7 @@ public class JsonLfEncodersTest {
}
@Test
void testRecord() throws IOException {
public void testRecord() throws IOException {
checkWriteAll(
r ->
JsonLfEncoders.record(

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
build-options:
- --target=2.1
name: CantonExamples

View File

@ -32,6 +32,7 @@ trait LocalNodeParametersConfig {
/** Various cache sizes */
def caching: CachingConfigs
def useNewTrafficControl: Boolean
def useUnifiedSequencer: Boolean
}
trait CommunityLocalNodeConfig extends LocalNodeConfig {

View File

@ -96,44 +96,7 @@ class QueueBasedDomainOutboxX(
}
}
protected case class QueueState(
queuedApprox: Int,
running: Boolean,
) {
def updateQueued(queuedNum: Int): QueueState = {
val ret = copy(
queuedApprox = queuedApprox + queuedNum
)
if (ret.hasPending) {
ensureIdleFutureIsSet()
}
ret
}
def hasPending: Boolean = queuedApprox > 0
def done(): QueueState = {
if (!hasPending) {
idleFuture.getAndSet(None).foreach(_.trySuccess(UnlessShutdown.unit))
}
copy(running = false)
}
def setRunning(): QueueState = {
if (!running) {
ensureIdleFutureIsSet()
}
copy(running = true)
}
}
private val queueState =
new AtomicReference[QueueState](
QueueState(
0,
false,
)
)
private val isRunning = new AtomicBoolean(false)
private val initialized = new AtomicBoolean(false)
/** a future we provide that gets fulfilled once we are done dispatching */
@ -146,13 +109,17 @@ class QueueBasedDomainOutboxX(
case x => x
}.discard
def queueSize: Int = queueState.get().queuedApprox
// reflect both unsent and in-process transactions in the topology queue status
def queueSize: Int =
domainOutboxQueue.numUnsentTransactions + domainOutboxQueue.numInProcessTransactions
private def hasUnsentTransactions: Boolean = domainOutboxQueue.numUnsentTransactions > 0
def newTransactionsAddedToAuthorizedStore(
asOf: CantonTimestamp,
num: Int,
): FutureUnlessShutdown[Unit] = {
queueState.updateAndGet(_.updateQueued(num)).discard
ensureIdleFutureIsSet()
kickOffFlush()
FutureUnlessShutdown.unit
}
@ -171,24 +138,14 @@ class QueueBasedDomainOutboxX(
def startup()(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, String, Unit] = {
val checkQueueF = performUnlessClosingF(functionFullName) {
val enqueued = domainOutboxQueue.size()
val cur = queueState.updateAndGet { c =>
val next = c.copy(
// queuing statistics during startup will be a bit off, we just ensure that we signal that we have something in our queue
// we might improve by querying the store, checking for the number of pending tx
queuedApprox = enqueued
)
if (next.hasPending) ensureIdleFutureIsSet()
next
}
performUnlessClosingEitherUSF(functionFullName) {
if (hasUnsentTransactions) ensureIdleFutureIsSet()
logger.debug(
s"Resuming dispatching, pending=${cur.hasPending}"
s"Resuming dispatching, pending=${hasUnsentTransactions}"
)
Future.unit
// run initial flush
flush(initialize = true)
}
// run initial flush
EitherT.right(checkQueueF).flatMap(_ => flush(initialize = true))
}
protected def kickOffFlush(): Unit = {
@ -204,12 +161,15 @@ class QueueBasedDomainOutboxX(
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, String, Unit] = {
def markDone(delayRetry: Boolean = false): Unit = {
val updated = queueState.updateAndGet(_.done())
isRunning.set(false)
if (!hasUnsentTransactions) {
idleFuture.getAndSet(None).foreach(_.trySuccess(UnlessShutdown.unit))
}
// if anything has been pushed in the meantime, we need to kick off a new flush
logger.debug(
s"Marked flush as done. Updated queue size: ${updated.queuedApprox}. IsClosing: ${isClosing}"
s"Marked flush as done. Current queue size: ${queueSize}. IsClosing: ${isClosing}"
)
if (updated.hasPending && !isClosing) {
if (hasUnsentTransactions && !isClosing) {
if (delayRetry) {
val delay = 10.seconds
logger.debug(s"Kick off a new delayed flush in ${delay}")
@ -232,23 +192,26 @@ class QueueBasedDomainOutboxX(
}
}
val cur = queueState.getAndUpdate(_.setRunning())
val transitionedToRunning = isRunning.compareAndSet(false, true)
if (transitionedToRunning) {
ensureIdleFutureIsSet()
}
logger.debug(s"Invoked flush with queue size ${queueState.get().queuedApprox}")
logger.debug(s"Invoked flush with queue size ${queueSize}")
if (isClosing) {
logger.debug("Flush invoked in spite of closing")
EitherT.rightT(())
}
// only flush if we are not running yet
else if (cur.running) {
else if (!transitionedToRunning) {
logger.debug("Another flush cycle is currently ongoing")
EitherT.rightT(())
} else {
// mark as initialised (it's safe now for a concurrent thread to invoke flush as well)
if (initialize)
initialized.set(true)
if (cur.hasPending) {
if (hasUnsentTransactions) {
val pendingAndApplicableF = performUnlessClosingF(functionFullName)(for {
// find pending transactions
pending <- findPendingTransactions()
@ -262,10 +225,10 @@ class QueueBasedDomainOutboxX(
notPresent <- notAlreadyPresent(applicable)
_ = if (notPresent.size != applicable.size)
logger.debug(s"not already present transactions: $notPresent")
} yield (pending, notPresent))
} yield notPresent)
val ret = for {
pendingAndNotPresent <- EitherT.right(pendingAndApplicableF)
(pending, notPresent) = pendingAndNotPresent
notPresent <- EitherT.right(pendingAndApplicableF)
_ = lastDispatched.set(notPresent.lastOption)
// Try to convert if necessary the topology transactions for the required protocol version of the domain
@ -289,12 +252,6 @@ class QueueBasedDomainOutboxX(
if (!observed) {
logger.warn("Did not observe transactions in target domain store.")
}
// update queue state according to responses
queueState.updateAndGet { c =>
c.copy(
queuedApprox = math.max(c.queuedApprox - pending.size, 0)
)
}.discard
domainOutboxQueue.completeCycle()
markDone()

View File

@ -37,6 +37,7 @@ class ActionDescriptionTest extends AnyWordSpec with BaseTest {
templateId = Some(defaultTemplateId),
choiceName,
None,
Set.empty,
ExampleTransactionFactory.veryDeepValue,
Set(ExampleTransactionFactory.submitter),
byKey = true,
@ -72,6 +73,7 @@ class ActionDescriptionTest extends AnyWordSpec with BaseTest {
ActionDescription.fromLfActionNode(
ExampleTransactionFactory.createNode(suffixedId),
None,
Set.empty,
testedProtocolVersion,
) shouldBe
Left(InvalidActionDescription("No seed for a Create node given"))
@ -79,6 +81,7 @@ class ActionDescriptionTest extends AnyWordSpec with BaseTest {
ActionDescription.fromLfActionNode(
ExampleTransactionFactory.exerciseNodeWithoutChildren(suffixedId),
None,
Set.empty,
testedProtocolVersion,
) shouldBe
Left(InvalidActionDescription("No seed for an Exercise node given"))
@ -88,6 +91,7 @@ class ActionDescriptionTest extends AnyWordSpec with BaseTest {
ActionDescription.fromLfActionNode(
ExampleTransactionFactory.fetchNode(suffixedId),
Some(seed),
Set.empty,
testedProtocolVersion,
) shouldBe
Left(InvalidActionDescription("No seed should be given for a Fetch node"))
@ -96,6 +100,7 @@ class ActionDescriptionTest extends AnyWordSpec with BaseTest {
ExampleTransactionFactory
.lookupByKeyNode(globalKey, maintainers = Set(ExampleTransactionFactory.observer)),
Some(seed),
Set.empty,
testedProtocolVersion,
) shouldBe Left(InvalidActionDescription("No seed should be given for a LookupByKey node"))
}
@ -104,6 +109,7 @@ class ActionDescriptionTest extends AnyWordSpec with BaseTest {
ActionDescription.fromLfActionNode(
ExampleTransactionFactory.fetchNode(suffixedId, actingParties = Set.empty),
None,
Set.empty,
testedProtocolVersion,
) shouldBe Left(InvalidActionDescription("Fetch node without acting parties"))
}

View File

@ -19,7 +19,7 @@ import com.digitalasset.canton.protocol.*
import com.digitalasset.canton.sequencing.protocol.MediatorsOfDomain
import com.digitalasset.canton.topology.{DomainId, ParticipantId}
import com.digitalasset.canton.version.{ProtocolVersion, RepresentativeProtocolVersion}
import com.digitalasset.canton.{LfInterfaceId, LfPartyId}
import com.digitalasset.canton.{LfInterfaceId, LfPackageId, LfPartyId}
import magnolify.scalacheck.auto.*
import org.scalacheck.{Arbitrary, Gen}
@ -146,6 +146,8 @@ final class GeneratorsData(
interfaceId <- Gen.option(Arbitrary.arbitrary[LfInterfaceId])
packagePreference <- Gen.containerOf[Set, LfPackageId](Arbitrary.arbitrary[LfPackageId])
// We consider only this specific value because the goal is not exhaustive testing of LF (de)serialization
chosenValue <- Gen.long.map(ValueInt64)
@ -160,6 +162,7 @@ final class GeneratorsData(
templateId,
choice,
interfaceId,
packagePreference,
chosenValue,
actors,
byKey,

View File

@ -39,10 +39,13 @@ class TransactionViewTest extends AnyWordSpec with BaseTest with HasExecutionCon
)
.value
private val defaultPackagePreference = Set(ExampleTransactionFactory.packageId)
private val defaultActionDescription: ActionDescription =
ActionDescription.tryFromLfActionNode(
ExampleTransactionFactory.createNode(createdId, contractInst),
Some(ExampleTransactionFactory.lfHash(5)),
defaultPackagePreference,
testedProtocolVersion,
)
@ -191,6 +194,7 @@ class TransactionViewTest extends AnyWordSpec with BaseTest with HasExecutionCon
actionDescription = ActionDescription.tryFromLfActionNode(
ExampleTransactionFactory.exerciseNodeWithoutChildren(absoluteId),
Some(nodeSeed),
defaultPackagePreference,
testedProtocolVersion,
)
).left.value should startWith(
@ -207,6 +211,7 @@ class TransactionViewTest extends AnyWordSpec with BaseTest with HasExecutionCon
Set(ExampleTransactionFactory.submitter),
),
None,
defaultPackagePreference,
testedProtocolVersion,
)
).left.value should startWith(
@ -222,6 +227,7 @@ class TransactionViewTest extends AnyWordSpec with BaseTest with HasExecutionCon
maintainers = Set(ExampleTransactionFactory.submitter),
),
None,
defaultPackagePreference,
testedProtocolVersion,
)
).left.value should startWith(

View File

@ -618,6 +618,7 @@ class ExampleTransactionFactory(
ActionDescription.tryFromLfActionNode(
LfTransactionUtil.lightWeight(node),
seed,
packagePreference = Set.empty,
protocolVersion,
)

View File

@ -22,5 +22,11 @@ class MapsUtilTest extends AnyWordSpec with BaseTest {
)
}
"build non conflicting maps" in {
MapsUtil.toNonConflictingMap(Seq(1 -> 2, 2 -> 3)) shouldBe Right(Map(1 -> 2, 2 -> 3))
MapsUtil.toNonConflictingMap(Seq(1 -> 2, 2 -> 3, 1 -> 2)) shouldBe Right(Map(1 -> 2, 2 -> 3))
MapsUtil.toNonConflictingMap(Seq(1 -> 2, 1 -> 3)) shouldBe Left(Map(1 -> Set(2, 3)))
}
}
}

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
build-options:
- --target=2.1
name: ai-analysis

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
build-options:
- --target=2.1
name: bank

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
build-options:
- --target=2.1
name: doctor

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
build-options:
- --target=2.1
name: health-insurance

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
build-options:
- --target=2.1
name: medical-records

View File

@ -0,0 +1,60 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
syntax = "proto3";
package com.digitalasset.canton.sequencer.admin.v30;
service SequencerBftAdministrationService {
rpc AddPeerEndpoint(AddPeerEndpointRequest) returns (AddPeerEndpointResponse);
rpc RemovePeerEndpoint(RemovePeerEndpointRequest) returns (RemovePeerEndpointResponse);
rpc GetPeerNetworkStatus(GetPeerNetworkStatusRequest) returns (GetPeerNetworkStatusResponse);
}
message AddPeerEndpointRequest {
PeerEndpoint endpoint = 1;
}
message RemovePeerEndpointRequest {
PeerEndpoint endpoint = 1;
}
message PeerEndpoint {
string host = 1;
uint32 port = 2;
}
message AddPeerEndpointResponse {
// False if already present
bool added = 1;
}
message RemovePeerEndpointResponse {
// False if not found
bool removed = 1;
}
enum PeerEndpointHealthStatus {
PEER_ENDPOINT_HEALTH_STATUS_UNSPECIFIED = 0; // Required by buf lint (default value)
PEER_ENDPOINT_HEALTH_STATUS_UNKNOWN_ENDPOINT = 1;
PEER_ENDPOINT_HEALTH_STATUS_UNAUTHENTICATED = 3;
PEER_ENDPOINT_HEALTH_STATUS_AUTHENTICATED = 4;
}
message PeerEndpointHealth {
PeerEndpointHealthStatus status = 1;
optional string description = 2;
}
message PeerEndpointStatus {
PeerEndpoint endpoint = 1;
PeerEndpointHealth health = 2;
}
message GetPeerNetworkStatusRequest {
// If empty, the status of all known endpoints will be returned
repeated PeerEndpoint endpoints = 1;
}
message GetPeerNetworkStatusResponse {
repeated PeerEndpointStatus statuses = 1;
}

View File

@ -4,7 +4,7 @@
package com.digitalasset.canton.domain.block
import cats.data.Nested
import cats.syntax.functor.*
import cats.syntax.parallel.*
import com.daml.error.BaseError
import com.daml.nameof.NameOf.functionFullName
import com.daml.nonempty.NonEmpty
@ -26,7 +26,12 @@ import com.digitalasset.canton.domain.sequencing.sequencer.block.BlockSequencer
import com.digitalasset.canton.domain.sequencing.sequencer.errors.CreateSubscriptionError
import com.digitalasset.canton.domain.sequencing.sequencer.traffic.SequencerRateLimitManager
import com.digitalasset.canton.error.SequencerBaseError
import com.digitalasset.canton.lifecycle.{AsyncCloseable, AsyncOrSyncCloseable, FlagCloseableAsync}
import com.digitalasset.canton.lifecycle.{
AsyncCloseable,
AsyncOrSyncCloseable,
FlagCloseableAsync,
FutureUnlessShutdown,
}
import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.pekkostreams.dispatcher.Dispatcher
import com.digitalasset.canton.pekkostreams.dispatcher.SubSource.RangeSource
@ -78,12 +83,13 @@ trait BlockSequencerStateManagerBase extends FlagCloseableAsync {
*/
def processBlock(
bug: BlockUpdateGenerator
): Flow[BlockEvents, Traced[OrderedBlockUpdate], NotUsed]
): Flow[BlockEvents, Traced[OrderedBlockUpdate[SignedChunkEvents]], NotUsed]
/** Persists the [[com.digitalasset.canton.domain.block.BlockUpdate]]s and completes the waiting RPC calls
* as necessary.
*/
def applyBlockUpdate: Flow[Traced[BlockUpdate], Traced[CantonTimestamp], NotUsed]
def applyBlockUpdate
: Flow[Traced[BlockUpdate[SignedChunkEvents]], Traced[CantonTimestamp], NotUsed]
/** Wait for a member to be disabled on the underlying ledger */
def waitForMemberToBeDisabled(member: Member): Future[Unit]
@ -165,7 +171,7 @@ class BlockSequencerStateManager(
override def processBlock(
bug: BlockUpdateGenerator
): Flow[BlockEvents, Traced[OrderedBlockUpdate], NotUsed] = {
): Flow[BlockEvents, Traced[OrderedBlockUpdate[SignedChunkEvents]], NotUsed] = {
val head = getHeadState
val bugState = {
import TraceContext.Implicits.Empty.*
@ -175,6 +181,8 @@ class BlockSequencerStateManager(
.via(checkBlockHeight(head.block.height))
.via(chunkBlock(bug))
.via(processChunk(bug)(bugState))
.async
.via(signEvents(bug))
}
private def checkBlockHeight(
@ -221,27 +229,48 @@ class BlockSequencerStateManager(
private def processChunk(bug: BlockUpdateGenerator)(
initialState: bug.InternalState
): Flow[Traced[BlockChunk], Traced[OrderedBlockUpdate], NotUsed] = {
implicit val traceContext = TraceContext.empty
): Flow[Traced[BlockChunk], Traced[OrderedBlockUpdate[UnsignedChunkEvents]], NotUsed] = {
implicit val traceContext: TraceContext = TraceContext.empty
Flow[Traced[BlockChunk]].statefulMapAsyncUSAndDrain(initialState) { (state, tracedChunk) =>
implicit val traceContext: TraceContext = tracedChunk.traceContext
tracedChunk.traverse(blockChunk => Nested(bug.processBlockChunk(state, blockChunk))).value
}
}
override def applyBlockUpdate: Flow[Traced[BlockUpdate], Traced[CantonTimestamp], NotUsed] = {
implicit val traceContext = TraceContext.empty
Flow[Traced[BlockUpdate]].statefulMapAsync(getHeadState) { (priorHead, update) =>
implicit val traceContext = update.traceContext
val fut = update.value match {
case LocalBlockUpdate(local) =>
handleLocalEvent(priorHead, local)(TraceContext.todo)
case chunk: ChunkUpdate =>
handleChunkUpdate(priorHead, chunk)(TraceContext.todo)
case complete: CompleteBlockUpdate =>
handleComplete(priorHead, complete.block)(TraceContext.todo)
private def signEvents(bug: BlockUpdateGenerator): Flow[
Traced[OrderedBlockUpdate[UnsignedChunkEvents]],
Traced[OrderedBlockUpdate[SignedChunkEvents]],
NotUsed,
] = {
implicit val traceContext: TraceContext = TraceContext.empty
Flow[Traced[OrderedBlockUpdate[UnsignedChunkEvents]]].mapAsyncAndDrainUS(parallelism =
chunkSigningParallelism
)(
_.traverseWithTraceContext { implicit traceContext => update =>
update match {
case chunk: ChunkUpdate[UnsignedChunkEvents] =>
chunk.events.parTraverse(bug.signChunkEvents).map(signed => chunk.copy(events = signed))
case complete: CompleteBlockUpdate => FutureUnlessShutdown.pure(complete)
}
}
fut.map(newHead => newHead -> Traced(newHead.block.lastTs))
)
}
override def applyBlockUpdate
: Flow[Traced[BlockUpdate[SignedChunkEvents]], Traced[CantonTimestamp], NotUsed] = {
implicit val traceContext = TraceContext.empty
Flow[Traced[BlockUpdate[SignedChunkEvents]]].statefulMapAsync(getHeadState) {
(priorHead, update) =>
implicit val traceContext = update.traceContext
val fut = update.value match {
case LocalBlockUpdate(local) =>
handleLocalEvent(priorHead, local)(TraceContext.todo)
case chunk: ChunkUpdate[SignedChunkEvents] =>
handleChunkUpdate(priorHead, chunk)(TraceContext.todo)
case complete: CompleteBlockUpdate =>
handleComplete(priorHead, complete.block)(TraceContext.todo)
}
fut.map(newHead => newHead -> Traced(newHead.block.lastTs))
}
}
@ -401,8 +430,8 @@ class BlockSequencerStateManager(
}.discard
)
private def handleChunkUpdate(priorHead: HeadState, update: ChunkUpdate)(implicit
batchTraceContext: TraceContext
private def handleChunkUpdate(priorHead: HeadState, update: ChunkUpdate[SignedChunkEvents])(
implicit batchTraceContext: TraceContext
): Future[HeadState] = {
val priorState = priorHead.chunk
val chunkNumber = priorState.chunkNumber + 1
@ -411,7 +440,7 @@ class BlockSequencerStateManager(
s"newMembers in chunk $chunkNumber should be assigned a timestamp after the timestamp of the previous chunk or block",
)
assert(
update.signedEvents.view.flatMap(_.values.map(_.timestamp)).forall(_ > priorState.lastTs),
update.events.view.flatMap(_.timestamps).forall(_ > priorState.lastTs),
s"Events in chunk $chunkNumber have timestamp lower than in the previous chunk or block",
)
assert(
@ -423,8 +452,8 @@ class BlockSequencerStateManager(
def checkFirstSequencerCounters: Boolean = {
val firstSequencerCounterByMember =
update.signedEvents
.map(_.forgetNE.fmap(_.counter))
update.events
.map(_.counters)
.foldLeft(Map.empty[Member, SequencerCounter])(
MapsUtil.mergeWith(_, _)((first, _) => first)
)
@ -440,7 +469,7 @@ class BlockSequencerStateManager(
)
val lastTs =
(update.signedEvents.view.flatMap(_.values.map(_.timestamp)) ++
(update.events.view.flatMap(_.timestamps) ++
update.newMembers.values).maxOption.getOrElse(priorState.lastTs)
val newState = ChunkState(
@ -454,15 +483,15 @@ class BlockSequencerStateManager(
for {
_ <- store.partialBlockUpdate(
newMembers = update.newMembers,
events = update.signedEvents,
events = update.events.map(_.events),
acknowledgments = update.acknowledgements,
membersDisabled = Seq.empty,
inFlightAggregationUpdates = update.inFlightAggregationUpdates,
update.state.trafficState,
)
_ <- MonadUtil.sequentialTraverse[(Member, SequencerCounter), Future, Unit](
update.signedEvents
.flatMap(_.toSeq)
update.events
.flatMap(_.events)
.collect {
case (member, tombstone) if tombstone.isTombstone => member -> tombstone.counter
}
@ -694,6 +723,11 @@ class BlockSequencerStateManager(
object BlockSequencerStateManager {
/** Arbitrary number of parallelism for signing the sequenced events across chunks of blocks.
* Within a chunk, the parallelism is unbounded because we use `parTraverse`.
*/
val chunkSigningParallelism: Int = 10
def apply(
protocolVersion: ProtocolVersion,
domainId: DomainId,

View File

@ -7,19 +7,23 @@ import cats.syntax.functor.*
import com.daml.error.BaseError
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.SequencerCounter
import com.digitalasset.canton.crypto.SyncCryptoApi
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.block.BlockUpdateGenerator.SignedEvents
import com.digitalasset.canton.domain.block.BlockUpdateGenerator.EventsForSubmissionRequest
import com.digitalasset.canton.domain.block.data.{BlockInfo, BlockUpdateEphemeralState}
import com.digitalasset.canton.domain.sequencing.sequencer.InFlightAggregationUpdates
import com.digitalasset.canton.domain.sequencing.sequencer.block.BlockSequencer.LocalEvent
import com.digitalasset.canton.sequencing.protocol.SequencedEventTrafficState
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.util.MapsUtil
import scala.collection.MapView
/** Summarizes the updates that are to be persisted and signalled individually */
sealed trait BlockUpdate extends Product with Serializable
sealed trait BlockUpdate[+E <: ChunkEvents] extends Product with Serializable
/** Denotes an update that is generated from a block that went through ordering */
sealed trait OrderedBlockUpdate extends BlockUpdate
sealed trait OrderedBlockUpdate[+E <: ChunkEvents] extends BlockUpdate[E]
/** Signals that all updates in a block have been delivered as chunks.
* The [[com.digitalasset.canton.domain.block.data.BlockInfo]] must be consistent with
@ -32,7 +36,7 @@ sealed trait OrderedBlockUpdate extends BlockUpdate
* than the previous block
* The consistency conditions are checked in [[com.digitalasset.canton.domain.block.BlockSequencerStateManager]]'s `handleComplete`.
*/
final case class CompleteBlockUpdate(block: BlockInfo) extends OrderedBlockUpdate
final case class CompleteBlockUpdate(block: BlockInfo) extends OrderedBlockUpdate[Nothing]
/** Changes from processing a consecutive part of updates within a block from the blockchain.
* We expect all values to be consistent with one another:
@ -45,21 +49,22 @@ final case class CompleteBlockUpdate(block: BlockInfo) extends OrderedBlockUpdat
* @param newMembers Members that were added along with the timestamp that they are considered registered from.
* @param acknowledgements The highest valid acknowledged timestamp for each member in the block.
* @param invalidAcknowledgements All invalid acknowledgement timestamps in the block for each member.
* @param signedEvents New sequenced events for members.
* @param events New sequenced events for members, and the snapshot to be used for signing them.
* @param inFlightAggregationUpdates The updates to the in-flight aggregation states.
* Includes the clean-up of expired aggregations.
* @param lastSequencerEventTimestamp The highest timestamp of an event in `events` addressed to the sequencer, if any.
* @param state Updated ephemeral state to be used for processing subsequent chunks.
* @param signingSnapshot Snapshot to be used for signing the events.
*/
final case class ChunkUpdate(
final case class ChunkUpdate[+E <: ChunkEvents](
newMembers: Map[Member, CantonTimestamp] = Map.empty,
acknowledgements: Map[Member, CantonTimestamp] = Map.empty,
invalidAcknowledgements: Seq[(Member, CantonTimestamp, BaseError)] = Seq.empty,
signedEvents: Seq[SignedEvents] = Seq.empty,
events: Seq[E] = Seq.empty,
inFlightAggregationUpdates: InFlightAggregationUpdates = Map.empty,
lastSequencerEventTimestamp: Option[CantonTimestamp],
state: BlockUpdateEphemeralState,
) extends OrderedBlockUpdate {
) extends OrderedBlockUpdate[E] {
// ensure that all new members appear in the ephemeral state
require(
newMembers.keys.forall(state.registeredMembers.contains),
@ -67,22 +72,23 @@ final case class ChunkUpdate(
)
// check all events are from registered members
require(
signedEvents.view.flatMap(_.keys).forall(state.registeredMembers.contains),
events.view.flatMap(_.members).forall(state.registeredMembers.contains),
"events must be for registered members",
)
// check the counters assigned for each member are continuous
def isContinuous(counters: Seq[SequencerCounter]): Boolean =
NonEmpty.from(counters) match {
case None => true
case Some(countersNE) =>
val head = countersNE.head1
val expectedCounters = head until (head + countersNE.size)
counters == expectedCounters
}
locally {
val counters = signedEvents
.map(_.forgetNE.fmap(event => Seq(event.counter)))
def isContinuous(counters: Seq[SequencerCounter]): Boolean =
NonEmpty.from(counters) match {
case None => true
case Some(countersNE) =>
val head = countersNE.head1
val expectedCounters = head until (head + countersNE.size)
counters == expectedCounters
}
val counters = events
.map(_.counters.fmap(Seq(_)))
.foldLeft(Map.empty[Member, Seq[SequencerCounter]])(
MapsUtil.mergeWith(_, _)(_ ++ _)
)
@ -90,9 +96,36 @@ final case class ChunkUpdate(
counters.values.forall(isContinuous),
s"Non-continuous counters: $counters",
)
// The other consistency conditions are checked in `BlockSequencerStateManager.handleChunkUpdate`
}
// The other consistency conditions are checked in `BlockSequencerStateManager.handleChunkUpdate`
}
/** Denotes an update to the persisted state that is caused by a local event that has not gone through ordering */
final case class LocalBlockUpdate(local: LocalEvent) extends BlockUpdate
final case class LocalBlockUpdate(local: LocalEvent) extends BlockUpdate[Nothing]
sealed trait ChunkEvents extends Product with Serializable {
def members: Set[Member]
def counters: Map[Member, SequencerCounter]
def timestamps: Iterable[CantonTimestamp]
}
final case class UnsignedChunkEvents(
sender: Member,
events: EventsForSubmissionRequest,
signingSnapshot: SyncCryptoApi,
sequencingTimestamp: CantonTimestamp,
sequencingSnapshot: SyncCryptoApi,
trafficStates: MapView[Member, SequencedEventTrafficState],
) extends ChunkEvents {
override def members: Set[Member] = events.keySet
override def counters: Map[Member, SequencerCounter] = events.fmap(_.counter)
override def timestamps: Iterable[CantonTimestamp] = events.values.map(_.timestamp)
}
final case class SignedChunkEvents(
events: BlockUpdateGenerator.SignedEvents
) extends ChunkEvents {
override def members: Set[Member] = events.keySet
override def counters: Map[Member, SequencerCounter] = events.fmap(_.counter)
override def timestamps: Iterable[CantonTimestamp] = events.values.map(_.timestamp)
}

View File

@ -13,7 +13,6 @@ import cats.syntax.functorFilter.*
import cats.syntax.parallel.*
import cats.syntax.traverse.*
import com.daml.error.BaseError
import com.daml.nonempty.catsinstances.*
import com.daml.nonempty.{NonEmpty, NonEmptyUtil}
import com.digitalasset.canton.SequencerCounter
import com.digitalasset.canton.config.CantonRequireTypes.String73
@ -24,7 +23,6 @@ import com.digitalasset.canton.crypto.{
SyncCryptoClient,
}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.block.BlockUpdateGenerator.BlockChunk
import com.digitalasset.canton.domain.block.LedgerBlockEvent.*
import com.digitalasset.canton.domain.block.data.{
BlockEphemeralState,
@ -80,6 +78,8 @@ import scala.concurrent.{ExecutionContext, Future}
* (and especially that events are always read in the same order).
*/
trait BlockUpdateGenerator {
import BlockUpdateGenerator.*
type InternalState
def internalStateFor(state: BlockEphemeralState): InternalState
@ -93,12 +93,19 @@ trait BlockUpdateGenerator {
def processBlockChunk(state: InternalState, chunk: BlockChunk)(implicit
ec: ExecutionContext,
traceContext: TraceContext,
): FutureUnlessShutdown[(InternalState, OrderedBlockUpdate)]
): FutureUnlessShutdown[(InternalState, OrderedBlockUpdate[UnsignedChunkEvents])]
def signChunkEvents(events: UnsignedChunkEvents)(implicit
ec: ExecutionContext,
traceContext: TraceContext,
): FutureUnlessShutdown[SignedChunkEvents]
}
object BlockUpdateGenerator {
type SignedEvents = NonEmpty[Map[Member, OrdinarySerializedEvent]]
type EventsForSubmissionRequest = Map[Member, SequencedEvent[ClosedEnvelope]]
type SignedEvents = Map[Member, OrdinarySerializedEvent]
sealed trait BlockChunk extends Product with Serializable
final case class NextChunk(
@ -118,6 +125,7 @@ class BlockUpdateGeneratorImpl(
rateLimitManager: SequencerRateLimitManager,
orderingTimeFixMode: OrderingTimeFixMode,
protected val loggerFactory: NamedLoggerFactory,
unifiedSequencer: Boolean,
)(implicit val closeContext: CloseContext)
extends BlockUpdateGenerator
with NamedLogging {
@ -180,7 +188,7 @@ class BlockUpdateGeneratorImpl(
override final def processBlockChunk(state: InternalState, chunk: BlockChunk)(implicit
ec: ExecutionContext,
traceContext: TraceContext,
): FutureUnlessShutdown[(InternalState, OrderedBlockUpdate)] = {
): FutureUnlessShutdown[(InternalState, OrderedBlockUpdate[UnsignedChunkEvents])] = {
chunk match {
case EndOfBlock(height) =>
val newState = state.copy(lastBlockTs = state.lastChunkTs)
@ -200,7 +208,7 @@ class BlockUpdateGeneratorImpl(
)(implicit
ec: ExecutionContext,
traceContext: TraceContext,
): FutureUnlessShutdown[(State, ChunkUpdate)] = {
): FutureUnlessShutdown[(State, ChunkUpdate[UnsignedChunkEvents])] = {
val (lastTs, revFixedTsChanges) =
// With this logic, we assign to the initial non-Send events the same timestamp as for the last
// block. This means that we will include these events in the ephemeral state of the previous block
@ -309,7 +317,7 @@ class BlockUpdateGeneratorImpl(
}
result <- MonadUtil.foldLeftM(
(
Seq.empty[SignedEvents],
Seq.empty[UnsignedChunkEvents],
Map.empty[AggregationId, InFlightAggregationUpdate],
stateWithNewMembers.ephemeral,
Option.empty[CantonTimestamp],
@ -513,7 +521,7 @@ class BlockUpdateGeneratorImpl(
latestSequencerEventTimestamp: Option[CantonTimestamp],
)(
acc: (
Seq[SignedEvents],
Seq[UnsignedChunkEvents],
InFlightAggregationUpdates,
BlockUpdateEphemeralState,
Option[CantonTimestamp],
@ -523,7 +531,7 @@ class BlockUpdateGeneratorImpl(
ec: ExecutionContext
): FutureUnlessShutdown[
(
Seq[SignedEvents],
Seq[UnsignedChunkEvents],
InFlightAggregationUpdates,
BlockUpdateEphemeralState,
Option[CantonTimestamp],
@ -536,7 +544,7 @@ class BlockUpdateGeneratorImpl(
sequencingSnapshot,
signingSnapshotO,
) = sequencedSubmissionRequest
implicit val traceContext = sequencedSubmissionRequest.traceContext
implicit val traceContext: TraceContext = sequencedSubmissionRequest.traceContext
ErrorUtil.requireState(
sequencerEventTimestampSoFarO.isEmpty,
@ -549,7 +557,7 @@ class BlockUpdateGeneratorImpl(
sequencerEventTimestampO: Option[CantonTimestamp],
): FutureUnlessShutdown[
(
Seq[SignedEvents],
Seq[UnsignedChunkEvents],
InFlightAggregationUpdates,
BlockUpdateEphemeralState,
Option[CantonTimestamp],
@ -595,19 +603,22 @@ class BlockUpdateGeneratorImpl(
sequencingSnapshot,
latestSequencerEventTimestamp,
)
signedEvents <- signEvents(
} yield {
val unsignedEvents = UnsignedChunkEvents(
signedSubmissionRequest.content.sender,
deliverEventsNE,
signingSnapshot,
trafficUpdatedState,
sequencingTimestamp,
sequencingSnapshot,
trafficUpdatedState.trafficState.view.mapValues(_.toSequencedEventTrafficState),
)
} yield (
signedEvents +: reversedEvents,
newInFlightAggregationUpdates,
trafficUpdatedState,
sequencerEventTimestampO,
)
(
unsignedEvents +: reversedEvents,
newInFlightAggregationUpdates,
trafficUpdatedState,
sequencerEventTimestampO,
)
}
}
}
@ -1343,25 +1354,26 @@ class BlockUpdateGeneratorImpl(
} yield participantsOfParty ++ mediatorsOfDomain ++ sequencersOfDomain ++ allRecipients
}
private def signEvents(
events: NonEmpty[Map[Member, SequencedEvent[ClosedEnvelope]]],
snapshot: SyncCryptoApi,
ephemeralState: BlockUpdateEphemeralState,
// sequencingTimestamp and sequencingSnapshot used for tombstones when snapshot does not include sequencer signing keys
sequencingTimestamp: CantonTimestamp,
sequencingSnapshot: SyncCryptoApi,
)(implicit
override def signChunkEvents(unsignedEvents: UnsignedChunkEvents)(implicit
ec: ExecutionContext,
traceContext: TraceContext,
executionContext: ExecutionContext,
): FutureUnlessShutdown[SignedEvents] = {
(if (maybeLowerTopologyTimestampBound.forall(snapshot.ipsSnapshot.timestamp >= _)) {
): FutureUnlessShutdown[SignedChunkEvents] = {
val UnsignedChunkEvents(
sender,
events,
signingSnapshot,
sequencingTimestamp,
sequencingSnapshot,
trafficStates,
) = unsignedEvents
(if (maybeLowerTopologyTimestampBound.forall(signingSnapshot.ipsSnapshot.timestamp >= _)) {
FutureUnlessShutdown.outcomeF(
events.toSeq.toNEF
events.toSeq
.parTraverse { case (member, event) =>
SignedContent
.create(
snapshot.pureCrypto,
snapshot,
signingSnapshot.pureCrypto,
signingSnapshot,
event,
None,
HashPurpose.SequencedEventSignature,
@ -1373,10 +1385,17 @@ class BlockUpdateGeneratorImpl(
)
)
.map { signedContent =>
member -> OrdinarySequencedEvent(
signedContent,
ephemeralState.trafficState.get(member).map(_.toSequencedEventTrafficState),
)(traceContext)
val trafficStateO = Option
.when(!unifiedSequencer || member == sender) { // only include traffic state for the sender
trafficStates
.getOrElse(
member,
ErrorUtil.invalidState(s"Sender $sender unknown by rate limiter."),
)
}
member -> OrdinarySequencedEvent(signedContent, trafficStateO)(
traceContext
)
}
}
)
@ -1386,7 +1405,7 @@ class BlockUpdateGeneratorImpl(
// subscribers that this sequencer is not in a position to serve the events behind these sequencer counters.
// Comparing against the lower signing timestamp bound prevents tombstones in "steady-state" sequencing beyond
// "soon" after initial sequencer onboarding. See #13609
events.toSeq.toNEF.parTraverse { case (member, event) =>
events.toSeq.parTraverse { case (member, event) =>
logger.info(
s"Sequencer signing key not available on behalf of ${member.uid.id} at ${event.timestamp} and ${event.counter}. Sequencing tombstone."
)
@ -1424,7 +1443,7 @@ class BlockUpdateGeneratorImpl(
}
}
})
.map(_.fromNEF.toMap)
.map(signedEvents => SignedChunkEvents(signedEvents.toMap))
}
private def invalidSubmissionRequest(
@ -1520,6 +1539,13 @@ class BlockUpdateGeneratorImpl(
tc: TraceContext,
): EitherT[FutureUnlessShutdown, SubmissionRequestOutcome, BlockUpdateEphemeralState] = {
val newStateOF = for {
_ <- OptionT.when[FutureUnlessShutdown, Unit]({
request.sender match {
// Sequencers are not rate limited
case _: SequencerId => false
case _ => true
}
})(())
parameters <- OptionT(
sequencingSnapshot.ipsSnapshot.trafficControlParameters(protocolVersion)
)
@ -1608,7 +1634,8 @@ class BlockUpdateGeneratorImpl(
}
object BlockUpdateGeneratorImpl {
private type EventsForSubmissionRequest = Map[Member, SequencedEvent[ClosedEnvelope]]
import BlockUpdateGenerator.*
/** Describes the outcome of processing a submission request:
*

View File

@ -103,6 +103,7 @@ final case class MediatorNodeParameterConfig(
override val batching: BatchingConfig = BatchingConfig(),
override val caching: CachingConfigs = CachingConfigs(),
override val useNewTrafficControl: Boolean = false,
override val useUnifiedSequencer: Boolean = false,
) extends ProtocolConfig
with LocalNodeParametersConfig

View File

@ -256,6 +256,7 @@ class SequencerRuntime(
)
// hook for registering enterprise administration service if in an appropriate environment
additionalAdminServiceFactory(sequencer).foreach(register)
sequencer.adminServices.foreach(register)
}
def domainServices(implicit ec: ExecutionContext): Seq[ServerServiceDefinition] = Seq(

View File

@ -27,5 +27,6 @@ final case class SequencerNodeParameterConfig(
override val batching: BatchingConfig = BatchingConfig(),
override val caching: CachingConfigs = CachingConfigs(),
override val useNewTrafficControl: Boolean = false,
override val useUnifiedSequencer: Boolean = false,
) extends ProtocolConfig
with LocalNodeParametersConfig

View File

@ -71,6 +71,7 @@ object DatabaseSequencer {
cryptoApi: DomainSyncCryptoClient,
metrics: SequencerMetrics,
loggerFactory: NamedLoggerFactory,
unifiedSequencer: Boolean,
)(implicit
ec: ExecutionContext,
tracer: Tracer,
@ -107,6 +108,7 @@ object DatabaseSequencer {
cryptoApi,
metrics,
loggerFactory,
unifiedSequencer,
)
}
}
@ -130,6 +132,7 @@ class DatabaseSequencer(
cryptoApi: DomainSyncCryptoClient,
metrics: SequencerMetrics,
loggerFactory: NamedLoggerFactory,
unifiedSequencer: Boolean,
)(implicit ec: ExecutionContext, tracer: Tracer, materializer: Materializer)
extends BaseSequencer(
DomainTopologyManagerId(domainId),

View File

@ -30,6 +30,7 @@ import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.traffic.TrafficControlErrors.TrafficControlError
import com.digitalasset.canton.util.EitherTUtil
import io.grpc.ServerServiceDefinition
import org.apache.pekko.Done
import org.apache.pekko.stream.KillSwitch
import org.apache.pekko.stream.scaladsl.Source
@ -162,6 +163,8 @@ trait Sequencer
/** Return the rate limit manager for this sequencer, if it exists.
*/
def rateLimitManager: Option[SequencerRateLimitManager] = None
def adminServices: Seq[ServerServiceDefinition] = Seq.empty
}
/** Sequencer pruning interface.

View File

@ -95,6 +95,7 @@ class CommunityDatabaseSequencerFactory(
domainSyncCryptoApi,
metrics,
loggerFactory,
nodeParameters.useUnifiedSequencer,
)
Future.successful(config.testingInterceptor.map(_(clock)(sequencer)(ec)).getOrElse(sequencer))

View File

@ -53,6 +53,7 @@ import com.digitalasset.canton.traffic.{
import com.digitalasset.canton.util.EitherTUtil.condUnitET
import com.digitalasset.canton.util.{EitherTUtil, PekkoUtil, SimpleExecutionQueue}
import com.digitalasset.canton.version.ProtocolVersion
import io.grpc.ServerServiceDefinition
import io.opentelemetry.api.trace.Tracer
import org.apache.pekko.stream.*
import org.apache.pekko.stream.scaladsl.{Keep, Merge, Sink, Source}
@ -82,6 +83,7 @@ class BlockSequencer(
prettyPrinter: CantonPrettyPrinter,
metrics: SequencerMetrics,
loggerFactory: NamedLoggerFactory,
unifiedSequencer: Boolean,
)(implicit executionContext: ExecutionContext, materializer: Materializer, tracer: Tracer)
extends BaseSequencer(
DomainTopologyManagerId(domainId),
@ -124,6 +126,7 @@ class BlockSequencer(
blockRateLimitManager,
orderingTimeFixMode,
loggerFactory,
unifiedSequencer = unifiedSequencer,
)(CloseContext(cryptoApi))
val driverSource = blockSequencerOps
@ -204,6 +207,8 @@ class BlockSequencer(
sendAsyncSignedInternal(signedContent)
}
override def adminServices: Seq[ServerServiceDefinition] = blockSequencerOps.adminServices
override protected def sendAsyncSignedInternal(
signedSubmission: SignedContent[SubmissionRequest]
)(implicit traceContext: TraceContext): EitherT[Future, SendAsyncError, Unit] = {

View File

@ -17,6 +17,7 @@ import com.digitalasset.canton.sequencing.protocol.{
}
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.tracing.TraceContext
import io.grpc.ServerServiceDefinition
import org.apache.pekko.stream.KillSwitch
import org.apache.pekko.stream.scaladsl.Source
@ -66,4 +67,6 @@ trait BlockSequencerOps extends AutoCloseable {
def health(implicit
traceContext: TraceContext
): Future[SequencerDriverHealthStatus]
def adminServices: Seq[ServerServiceDefinition] = Seq.empty
}

View File

@ -109,6 +109,7 @@ class DriverBlockSequencerFactory[C](
nodeParameters.loggingConfig.api.printer,
metrics,
domainLoggerFactory,
unifiedSequencer = nodeParameters.useUnifiedSequencer,
)
}

View File

@ -17,6 +17,7 @@ import com.digitalasset.canton.sequencing.protocol.*
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.version.ProtocolVersion
import io.grpc.ServerServiceDefinition
import org.apache.pekko.stream.*
import org.apache.pekko.stream.scaladsl.Source
@ -59,4 +60,6 @@ class DriverBlockSequencerOps(
override def close(): Unit =
driver.close()
override def adminServices: Seq[ServerServiceDefinition] = driver.adminServices
}

View File

@ -0,0 +1,89 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.sequencing.sequencer.block.bftordering.admin
import cats.implicits.*
import com.digitalasset.canton.config.RequireTypes.Port
import com.digitalasset.canton.networking.Endpoint
import com.digitalasset.canton.sequencer.admin.v30.{
GetPeerNetworkStatusResponse,
PeerEndpoint,
PeerEndpointHealth as ProtoPeerEndpointHealth,
PeerEndpointHealthStatus as ProtoPeerEndpointHealthStatus,
PeerEndpointStatus as ProtoPeerEndpointStatus,
}
object EnterpriseSequencerBftAdminData {
def endpointToProto(endpoint: Endpoint): PeerEndpoint =
PeerEndpoint.of(endpoint.host, endpoint.port.unwrap)
def endpointFromProto(peerEndpoint: PeerEndpoint): Either[String, Endpoint] =
for {
port <- Port.create(peerEndpoint.port).fold(e => Left(e.message), Right(_))
} yield Endpoint(peerEndpoint.host, port)
sealed trait PeerEndpointHealthStatus extends Product with Serializable
object PeerEndpointHealthStatus {
case object Unknown extends PeerEndpointHealthStatus
case object Unauthenticated extends PeerEndpointHealthStatus
case object Authenticated extends PeerEndpointHealthStatus
}
final case class PeerEndpointHealth(status: PeerEndpointHealthStatus, description: Option[String])
final case class PeerEndpointStatus(endpoint: Endpoint, health: PeerEndpointHealth) {
def toProto: ProtoPeerEndpointStatus =
ProtoPeerEndpointStatus.of(
Some(endpointToProto(endpoint)),
Some(
ProtoPeerEndpointHealth.of(
health.status match {
case PeerEndpointHealthStatus.Unknown =>
ProtoPeerEndpointHealthStatus.PEER_ENDPOINT_HEALTH_STATUS_UNKNOWN_ENDPOINT
case PeerEndpointHealthStatus.Unauthenticated =>
ProtoPeerEndpointHealthStatus.PEER_ENDPOINT_HEALTH_STATUS_UNAUTHENTICATED
case PeerEndpointHealthStatus.Authenticated =>
ProtoPeerEndpointHealthStatus.PEER_ENDPOINT_HEALTH_STATUS_AUTHENTICATED
},
health.description,
)
),
)
}
final case class PeerNetworkStatus(endpointStatuses: Seq[PeerEndpointStatus]) {
def +(status: PeerEndpointStatus): PeerNetworkStatus =
copy(endpointStatuses = endpointStatuses :+ status)
}
object PeerNetworkStatus {
def fromProto(response: GetPeerNetworkStatusResponse): Either[String, PeerNetworkStatus] =
response.statuses
.map { status =>
for {
protoEndpoint <- status.endpoint.toRight("Endpoint is missing")
port <- Port.create(protoEndpoint.port).fold(e => Left(e.message), Right(_))
endpoint = Endpoint(protoEndpoint.host, port)
protoHealth <- status.health.toRight("Health is missing")
healthDescription = protoHealth.description
health <- protoHealth.status match {
case ProtoPeerEndpointHealthStatus.PEER_ENDPOINT_HEALTH_STATUS_UNKNOWN_ENDPOINT =>
Right(PeerEndpointHealthStatus.Unknown)
case ProtoPeerEndpointHealthStatus.PEER_ENDPOINT_HEALTH_STATUS_UNAUTHENTICATED =>
Right(PeerEndpointHealthStatus.Unauthenticated)
case ProtoPeerEndpointHealthStatus.PEER_ENDPOINT_HEALTH_STATUS_AUTHENTICATED =>
Right(PeerEndpointHealthStatus.Authenticated)
case ProtoPeerEndpointHealthStatus.Unrecognized(unrecognizedValue) =>
Left(s"Health status is unrecognised: $unrecognizedValue")
case ProtoPeerEndpointHealthStatus.PEER_ENDPOINT_HEALTH_STATUS_UNSPECIFIED =>
Left("Health status is unspecified")
}
} yield PeerEndpointStatus(endpoint, PeerEndpointHealth(health, healthDescription))
}
.sequence
.map(PeerNetworkStatus(_))
}
}

View File

@ -45,6 +45,7 @@ abstract class DatabaseSequencerApiTest extends NonBftDomainSequencerApiTest {
crypto,
metrics,
loggerFactory,
unifiedSequencer = testedUseUnifiedSequencer,
)(executorService, tracer, materializer)
}

View File

@ -51,6 +51,7 @@ class DatabaseSequencerSnapshottingTest extends SequencerApiTest {
crypto,
metrics,
loggerFactory,
unifiedSequencer = testedUseUnifiedSequencer,
)(executorService, tracer, materializer)
}

View File

@ -101,6 +101,7 @@ class SequencerTest extends FixtureAsyncWordSpec with BaseTest with HasExecution
crypto,
metrics,
loggerFactory,
unifiedSequencer = testedUseUnifiedSequencer,
)(parallelExecutionContext, tracer, materializer)
def readAsSeq(

View File

@ -25,6 +25,7 @@ import com.digitalasset.canton.domain.block.{
OrderedBlockUpdate,
RawLedgerBlock,
SequencerDriverHealthStatus,
SignedChunkEvents,
}
import com.digitalasset.canton.domain.metrics.SequencerMetrics
import com.digitalasset.canton.domain.sequencing.sequencer.block.BlockSequencerFactory.OrderingTimeFixMode
@ -178,6 +179,7 @@ class BlockSequencerTest
),
SequencerMetrics.noop(this.getClass.getName),
loggerFactory,
unifiedSequencer = testedUseUnifiedSequencer,
)
override def close(): Unit = {
@ -233,11 +235,12 @@ class BlockSequencerTest
override def processBlock(
bug: BlockUpdateGenerator
): Flow[BlockEvents, Traced[OrderedBlockUpdate], NotUsed] =
): Flow[BlockEvents, Traced[OrderedBlockUpdate[SignedChunkEvents]], NotUsed] =
Flow[BlockEvents].mapConcat(_ => Seq.empty)
override def applyBlockUpdate: Flow[Traced[BlockUpdate], Traced[CantonTimestamp], NotUsed] =
Flow[Traced[BlockUpdate]].map(_.map(_ => CantonTimestamp.MinValue))
override def applyBlockUpdate
: Flow[Traced[BlockUpdate[SignedChunkEvents]], Traced[CantonTimestamp], NotUsed] =
Flow[Traced[BlockUpdate[SignedChunkEvents]]].map(_.map(_ => CantonTimestamp.MinValue))
override def getHeadState: BlockSequencerStateManager.HeadState =
BlockSequencerStateManager.HeadState(

View File

@ -25,7 +25,7 @@ import com.digitalasset.canton.time.TimeProvider
import com.digitalasset.canton.tracing.{TraceContext, Traced}
import com.digitalasset.canton.util.SimpleExecutionQueue
import com.google.protobuf.ByteString
import io.grpc.BindableService
import io.grpc.ServerServiceDefinition
import org.apache.pekko.stream.scaladsl.{Keep, Source}
import org.apache.pekko.stream.{KillSwitch, KillSwitches}
@ -54,7 +54,7 @@ class ReferenceBlockOrderer(
loggerFactory,
)
override def grpcServices: Seq[BindableService] = Seq()
override def grpcServices: Seq[ServerServiceDefinition] = Seq()
override def sendRequest(
tag: String,

View File

@ -11,7 +11,7 @@ import com.digitalasset.canton.config.NonNegativeDuration
import com.digitalasset.canton.ledger.api.auth.AuthorizationError.Expired
import com.digitalasset.canton.ledger.error.groups.AuthorizationChecksErrors
import com.digitalasset.canton.ledger.localstore.api.UserManagementStore
import com.digitalasset.canton.logging.ErrorLoggingContext
import com.digitalasset.canton.logging.{ErrorLoggingContext, SuppressionRule}
import io.grpc.StatusRuntimeException
import io.grpc.stub.ServerCallStreamObserver
import org.apache.pekko.actor.{Cancellable, Scheduler}
@ -19,6 +19,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.concurrent.{Eventually, IntegrationPatience}
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import org.slf4j.event.Level.INFO
import java.time.{Clock, Duration, Instant, ZoneId}
import scala.concurrent.ExecutionContext
@ -145,7 +146,7 @@ class OngoingAuthorizationObserverSpec
order.verify(delegate, times(1)).onError(captor.capture())
order.verifyNoMoreInteractions()
loggerFactory.assertLogs(
loggerFactory.assertLogs(SuppressionRule.Level(INFO))(
within = {
// Scheduled task is cancelled
verify(cancellableMock, times(1)).cancel()
@ -157,7 +158,7 @@ class OngoingAuthorizationObserverSpec
)
},
assertions =
_.warningMessage should include("ACCESS_TOKEN_EXPIRED(6,0): Claims were valid until "),
_.infoMessage should include("ACCESS_TOKEN_EXPIRED(2,0): Claims were valid until "),
)
// onError has already been called by tested implementation so subsequent onNext, onError and onComplete

View File

@ -144,7 +144,7 @@ class StreamAuthorizationComponentSpec
.failed
.map { t =>
// the client stream should be cancelled with error
t.getMessage should include("UNAUTHENTICATED")
t.getMessage should include("ACCESS_TOKEN_EXPIRED")
// the server stream should be completed
fixture.waitForServerPekkoStream shouldBe None
}

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
name: carbonv1-tests
source: .
version: 3.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
name: carbonv2-tests
data-dependencies:
- ../../../../scala-2.13/resource_managed/main/carbonv1-tests-3.0.0.dar

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
name: experimental-tests
source: .
version: 3.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
name: model-tests
source: .
version: 3.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
name: package-management-tests
source: .
version: 3.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
name: semantic-tests
source: .
version: 3.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
name: upgrade-tests
source: .
version: 1.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
name: upgrade-tests
source: .
version: 2.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
name: upgrade-tests
source: .
version: 3.0.0

View File

@ -71,7 +71,7 @@ object AuthorizationChecksErrors extends AuthorizationChecksErrorGroup {
object AccessTokenExpired
extends ErrorCode(
id = "ACCESS_TOKEN_EXPIRED",
ErrorCategory.AuthInterceptorInvalidAuthenticationCredentials,
ErrorCategory.ContentionOnSharedResources,
) {
final case class Reject(override val cause: String)(implicit
loggingContext: ContextualizedErrorLogger

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
build-options:
- --target=2.1
name: JsonEncodingTest

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
build-options:
- --target=2.dev
name: JsonEncodingTestDev

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240318.12913.0.v1c415c97
sdk-version: 3.0.0-snapshot.20240319.12919.0.v2e579bcd
build-options:
- --target=2.1
name: AdminWorkflows

View File

@ -69,6 +69,7 @@ object ParticipantNodeParameters {
dbMigrateAndStart = false,
useNewTrafficControl = false,
exitOnFatalFailures = true,
useUnifiedSequencer = false,
),
partyChangeNotification = PartyNotificationConfig.Eager,
adminWorkflow = AdminWorkflowConfig(

View File

@ -369,6 +369,7 @@ final case class ParticipantNodeParameterConfig(
config.NonNegativeFiniteDuration.ofSeconds(0),
override val useNewTrafficControl: Boolean = false,
disableUpgradeValidation: Boolean = false,
override val useUnifiedSequencer: Boolean = false,
) extends LocalNodeParametersConfig
/** Parameters for the participant node's stores

View File

@ -610,6 +610,7 @@ object ProcessingSteps {
def requestCounter: RequestCounter
def requestSequencerCounter: SequencerCounter
def mediator: MediatorsOfDomain
def locallyRejected: Boolean
def rootHashO: Option[RootHash]
}
@ -617,8 +618,8 @@ object ProcessingSteps {
object PendingRequestData {
def unapply(
arg: PendingRequestData
): Some[(RequestCounter, SequencerCounter, MediatorsOfDomain)] = {
Some((arg.requestCounter, arg.requestSequencerCounter, arg.mediator))
): Some[(RequestCounter, SequencerCounter, MediatorsOfDomain, Boolean)] = {
Some((arg.requestCounter, arg.requestSequencerCounter, arg.mediator, arg.locallyRejected))
}
}
}

View File

@ -40,7 +40,6 @@ import com.digitalasset.canton.participant.store.SyncDomainEphemeralState
import com.digitalasset.canton.participant.sync.SyncServiceError.SyncServiceAlarm
import com.digitalasset.canton.participant.sync.TimestampedEvent
import com.digitalasset.canton.protocol.*
import com.digitalasset.canton.protocol.messages.Verdict.Approve
import com.digitalasset.canton.protocol.messages.*
import com.digitalasset.canton.sequencing.client.*
import com.digitalasset.canton.sequencing.protocol.*
@ -58,8 +57,8 @@ import com.digitalasset.canton.{DiscardOps, LfPartyId, RequestCounter, Sequencer
import com.google.common.annotations.VisibleForTesting
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.util.{Failure, Success}
/** The [[ProtocolProcessor]] combines [[ProcessingSteps]] specific to a particular kind of request
@ -1038,7 +1037,7 @@ abstract class ProtocolProcessor[
pendingDataAndResponsesAndTimeoutEvent <-
if (isCleanReplay(rc)) {
val pendingData = CleanReplayData(rc, sc, mediator)
val pendingData = CleanReplayData(rc, sc, mediator, locallyRejected = false)
val responses = Seq.empty[(ConfirmationResponse, Recipients)]
val timeoutEvent = Either.right(Option.empty[TimestampedEvent])
EitherT.pure[FutureUnlessShutdown, steps.RequestError](
@ -1067,6 +1066,7 @@ abstract class ProtocolProcessor[
pendingRequestCounter,
pendingSequencerCounter,
_,
_locallyRejected,
) = pendingData
_ = if (
pendingRequestCounter != rc
@ -1354,7 +1354,7 @@ abstract class ProtocolProcessor[
): Future[Boolean] = Future.successful {
val invalidO = for {
case WrappedPendingRequestData(pendingRequestData) <- Some(pendingRequestDataOrReplayData)
case PendingTransaction(txId, _, _, _, _, requestTime, _, _, _, _) <- Some(
case PendingTransaction(txId, _, _, _, _, requestTime, _, _, _, _, _) <- Some(
pendingRequestData
)
@ -1445,11 +1445,14 @@ abstract class ProtocolProcessor[
steps.requestType.PendingRequestData
],
)(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, steps.ResultError, Unit] = {
val PendingRequestData(requestCounter, requestSequencerCounter, _) =
val PendingRequestData(requestCounter, requestSequencerCounter, _, locallyRejected) =
pendingRequestDataOrReplayData
val cleanReplay = isCleanReplay(requestCounter, pendingRequestDataOrReplayData)
val pendingSubmissionDataO = pendingSubmissionDataForRequest(pendingRequestDataOrReplayData)
// TODO(i15395): handle this more gracefully
checkContradictoryMediatorApprove(locallyRejected, verdict)
for {
commitAndEvent <- pendingRequestDataOrReplayData match {
case WrappedPendingRequestData(pendingRequestData) =>
@ -1478,11 +1481,7 @@ abstract class ProtocolProcessor[
(commitSetOF, contractsToBeStored, eventO)
}
case _: CleanReplayData =>
val commitSetOF = verdict match {
case _: Approve => Some(Future.successful(CommitSet.empty))
case _ => None
}
val commitSetOF = Option.when(verdict.isApprove)(Future.successful(CommitSet.empty))
val eventO = None
EitherT.pure[FutureUnlessShutdown, steps.ResultError](
@ -1559,6 +1558,19 @@ abstract class ProtocolProcessor[
} yield ()
}
private def checkContradictoryMediatorApprove(
locallyRejected: Boolean,
verdict: Verdict,
)(implicit traceContext: TraceContext): Unit = {
if (
isApprovalContradictionCheckEnabled(
loggerFactory.name
) && verdict.isApprove && locallyRejected
) {
ErrorUtil.invalidState(s"Mediator approved a request that we have locally rejected")
}
}
private[this] def logResultWarnings(
resultTimestamp: CantonTimestamp,
result: EitherT[
@ -1761,6 +1773,45 @@ abstract class ProtocolProcessor[
}
object ProtocolProcessor {
private val approvalContradictionCheckIsEnabled = new AtomicReference[Boolean](true)
private val testsAllowedToDisableApprovalContradictionCheck = Seq(
"LedgerAuthorizationReferenceXIntegrationTestDefault",
"LedgerAuthorizationBftOrderingXIntegrationTestDefault",
"PackageVettingIntegrationTestDefault",
)
private[protocol] def isApprovalContradictionCheckEnabled(loggerName: String): Boolean = {
val checkIsEnabled = approvalContradictionCheckIsEnabled.get()
// Ensure check is enabled except for tests allowed to disable it
checkIsEnabled || !testsAllowedToDisableApprovalContradictionCheck.exists(loggerName.startsWith)
}
@VisibleForTesting
def withApprovalContradictionCheckDisabled[A](
loggerFactory: NamedLoggerFactory
)(body: => A): A = {
// Limit disabling the checks to specific tests
require(
testsAllowedToDisableApprovalContradictionCheck.exists(loggerFactory.name.startsWith),
"The approval contradiction check can only be disabled for some specific tests",
)
val logger = loggerFactory.getLogger(this.getClass)
blocking {
synchronized {
logger.info("Disabling approval contradiction check")
approvalContradictionCheckIsEnabled.set(false)
try {
body
} finally {
approvalContradictionCheckIsEnabled.set(true)
logger.info("Re-enabling approval contradiction check")
}
}
}
}
sealed trait PendingRequestDataOrReplayData[+A <: PendingRequestData]
extends PendingRequestData
@ -1776,6 +1827,8 @@ object ProtocolProcessor {
override def isCleanReplay: Boolean = false
override def mediator: MediatorsOfDomain = unwrap.mediator
override def locallyRejected: Boolean = unwrap.locallyRejected
override def rootHashO: Option[RootHash] = unwrap.rootHashO
}
@ -1783,6 +1836,7 @@ object ProtocolProcessor {
override val requestCounter: RequestCounter,
override val requestSequencerCounter: SequencerCounter,
override val mediator: MediatorsOfDomain,
override val locallyRejected: Boolean,
) extends PendingRequestDataOrReplayData[Nothing] {
override def isCleanReplay: Boolean = true

View File

@ -1008,6 +1008,7 @@ class TransactionProcessingSteps(
val pendingTransaction =
createPendingTransaction(
requestId,
responses,
transactionValidationResult,
rc,
sc,
@ -1093,6 +1094,7 @@ class TransactionProcessingSteps(
requestSequencerCounter,
transactionValidationResult,
_,
_locallyRejected,
) =
pendingTransaction
val submitterMetaO = transactionValidationResult.submitterMetadataO
@ -1155,6 +1157,7 @@ class TransactionProcessingSteps(
private[this] def createPendingTransaction(
id: RequestId,
responses: Seq[ConfirmationResponse],
transactionValidationResult: TransactionValidationResult,
rc: RequestCounter,
sc: SequencerCounter,
@ -1182,6 +1185,9 @@ class TransactionProcessingSteps(
replayCheckResult,
) = transactionValidationResult
// We consider that we rejected if at least one of the responses is not "approve'
val locallyRejected = responses.exists { response => !response.localVerdict.isApprove }
validation.PendingTransaction(
transactionId,
freshOwnTimelyTx,
@ -1193,6 +1199,7 @@ class TransactionProcessingSteps(
sc,
transactionValidationResult,
mediator,
locallyRejected,
)
}

View File

@ -42,6 +42,7 @@ import com.digitalasset.canton.participant.protocol.validation.{
}
import com.digitalasset.canton.participant.store.SyncDomainEphemeralState
import com.digitalasset.canton.participant.util.DAMLe
import com.digitalasset.canton.participant.util.DAMLe.PackageResolver
import com.digitalasset.canton.protocol.WellFormedTransaction.WithoutSuffixes
import com.digitalasset.canton.protocol.*
import com.digitalasset.canton.sequencing.client.{SendAsyncClientError, SequencerClient}
@ -68,6 +69,7 @@ class TransactionProcessor(
override protected val timeouts: ProcessingTimeout,
override protected val loggerFactory: NamedLoggerFactory,
futureSupervisor: FutureSupervisor,
packageResolver: PackageResolver,
enableContractUpgrading: Boolean,
)(implicit val ec: ExecutionContext)
extends ProtocolProcessor[
@ -90,8 +92,8 @@ class TransactionProcessor(
damle,
confirmationRequestFactory.transactionTreeFactory,
buildAuthenticator(crypto),
staticDomainParameters.protocolVersion,
participantId,
packageResolver,
enableContractUpgrading,
loggerFactory,
),

View File

@ -116,12 +116,6 @@ object TransactionTreeFactory {
)
}
final case class CommonMetadataError(message: String) extends TransactionTreeConversionError {
override def pretty: Pretty[CommonMetadataError] = prettyOfClass(
unnamedParam(_.message.unquoted)
)
}
final case class SubmitterMetadataError(message: String) extends TransactionTreeConversionError {
override def pretty: Pretty[SubmitterMetadataError] = prettyOfClass(
unnamedParam(_.message.unquoted)
@ -160,6 +154,14 @@ object TransactionTreeFactory {
prettyOfString(err => show"Some packages are not known to all informees.\n${err.unknownTo}")
}
final case class ConflictingPackagePreferenceError(
conflicts: Map[LfPackageName, Set[LfPackageId]]
) extends TransactionTreeConversionError {
override def pretty: Pretty[ConflictingPackagePreferenceError] = prettyOfString { err =>
show"Detected conflicting package-ids for the same package name\n${err.conflicts}"
}
}
final case class PackageUnknownTo(
packageId: LfPackageId,
participantId: ParticipantId,
@ -168,4 +170,5 @@ object TransactionTreeFactory {
show"Participant $participantId has not vetted ${put.packageId}"
}
}
}

View File

@ -474,8 +474,11 @@ abstract class TransactionTreeFactoryImpl(
protected def createActionDescription(
actionNode: LfActionNode,
seed: Option[LfHash],
packagePreference: Set[LfPackageId],
): ActionDescription =
checked(ActionDescription.tryFromLfActionNode(actionNode, seed, protocolVersion))
checked(
ActionDescription.tryFromLfActionNode(actionNode, seed, packagePreference, protocolVersion)
)
protected def createViewCommonData(
rootView: TransactionViewDecomposition.NewView,

View File

@ -11,9 +11,11 @@ import com.daml.lf.transaction.ContractStateMachine.KeyInactive
import com.daml.lf.transaction.Transaction.{KeyActive, KeyCreate, KeyInput, NegativeKeyLookup}
import com.daml.lf.transaction.{ContractKeyUniquenessMode, ContractStateMachine}
import com.digitalasset.canton.crypto.{HashOps, HmacOps, Salt, SaltSeed}
import com.digitalasset.canton.data.TransactionViewDecomposition.{NewView, SameView}
import com.digitalasset.canton.data.*
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.participant.protocol.submission.TransactionTreeFactory.{
ConflictingPackagePreferenceError,
ContractKeyResolutionError,
MissingContractKeyLookupError,
SerializableContractOfId,
@ -27,9 +29,10 @@ import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.ShowUtil.*
import com.digitalasset.canton.util.{ErrorUtil, MapsUtil, MonadUtil}
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{LfKeyResolver, LfPartyId, checked}
import com.digitalasset.canton.{LfKeyResolver, LfPackageId, LfPackageName, LfPartyId, checked}
import java.util.UUID
import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
@ -55,6 +58,40 @@ class TransactionTreeFactoryImplV3(
private val initialCsmState: ContractStateMachine.State[Unit] =
ContractStateMachine.initial[Unit](ContractKeyUniquenessMode.Off)
private[submission] def buildPackagePreference(
decomposition: TransactionViewDecomposition
): Either[ConflictingPackagePreferenceError, Set[LfPackageId]] = {
def nodePref(n: LfActionNode): Set[(LfPackageName, LfPackageId)] = n match {
case ex: LfNodeExercises if ex.interfaceId.isDefined =>
Set(ex.packageName -> ex.templateId.packageId)
case _ => Set.empty
}
@tailrec
def go(
decompositions: List[TransactionViewDecomposition],
resolved: Set[(LfPackageName, LfPackageId)],
): Set[(LfPackageName, LfPackageId)] = {
decompositions match {
case Nil =>
resolved
case (v: SameView) :: others =>
go(others, resolved ++ nodePref(v.lfNode))
case (v: NewView) :: others =>
go(v.tailNodes.toList ::: others, resolved ++ nodePref(v.lfNode))
}
}
val preferences = go(List(decomposition), Set.empty)
MapsUtil
.toNonConflictingMap(preferences)
.bimap(
conflicts => ConflictingPackagePreferenceError(conflicts),
map => map.values.toSet,
)
}
protected[submission] class State private (
override val mediator: MediatorsOfDomain,
override val transactionUUID: UUID,
@ -272,7 +309,8 @@ class TransactionTreeFactoryImplV3(
// Compute the parameters of the view
seed = view.rootSeed
actionDescription = createActionDescription(suffixedRootNode, seed)
packagePreference <- EitherT.fromEither[Future](buildPackagePreference(view))
actionDescription = createActionDescription(suffixedRootNode, seed, packagePreference)
viewCommonData = createViewCommonData(view, viewCommonDataSalt)
viewKeyInputs = state.csmState.globalKeyInputs
resolvedK <- EitherT.fromEither[Future](

View File

@ -398,21 +398,6 @@ private[transfer] class TransferInProcessingSteps(
activenessResult <- EitherT.right[TransferProcessorError](activenessResultFuture)
requestId = RequestId(ts)
// construct pending data and response
entry = PendingTransferIn(
requestId,
rc,
sc,
txInRequest.tree.rootHash,
txInRequest.contract,
txInRequest.transferCounter,
txInRequest.submitterMetadata,
txInRequest.creatingTransactionId,
transferringParticipant,
transferId,
hostedStks.toSet,
mediator,
)
responses <- validationResultO match {
case None =>
EitherT.rightT[FutureUnlessShutdown, TransferProcessorError](Seq.empty)
@ -478,6 +463,28 @@ private[transfer] class TransferInProcessingSteps(
.map(transferResponse => Seq(transferResponse -> Recipients.cc(mediator)))
}
} yield {
// We consider that we rejected if at least one of the responses is not "approve'
val locallyRejected = responses.exists { case (response, _) =>
!response.localVerdict.isApprove
}
// construct pending data and response
val entry = PendingTransferIn(
requestId,
rc,
sc,
txInRequest.tree.rootHash,
txInRequest.contract,
txInRequest.transferCounter,
txInRequest.submitterMetadata,
txInRequest.creatingTransactionId,
transferringParticipant,
transferId,
hostedStks.toSet,
mediator,
locallyRejected,
)
StorePendingDataAndSendResponseAndCreateTimeout(
entry,
responses,
@ -514,6 +521,7 @@ private[transfer] class TransferInProcessingSteps(
transferId,
hostedStakeholders,
_,
locallyRejected,
) = pendingRequestData
def rejected(
@ -674,6 +682,7 @@ object TransferInProcessingSteps {
transferId: TransferId,
hostedStakeholders: Set[LfPartyId],
mediator: MediatorsOfDomain,
override val locallyRejected: Boolean,
) extends PendingTransfer
with PendingRequestData {

View File

@ -457,26 +457,6 @@ class TransferOutProcessingSteps(
)
requestId = RequestId(ts)
entry = PendingTransferOut(
requestId,
rc,
sc,
fullTree.tree.rootHash,
WithContractHash.fromContract(contract, fullTree.contractId),
fullTree.transferCounter,
contract.rawContractInstance.contractInstance.unversioned.template,
contract.rawContractInstance.contractInstance.unversioned.packageName,
transferringParticipant,
fullTree.submitterMetadata,
transferId,
fullTree.targetDomain,
fullTree.stakeholders,
hostedStks.toSet,
fullTree.targetTimeProof,
transferInExclusivity,
mediator,
)
transferOutDecisionTime <- ProcessingSteps
.getDecisionTime(sourceSnapshot.ipsSnapshot, ts)
.leftMap(TransferParametersError(domainId.unwrap, _))
@ -513,16 +493,42 @@ class TransferOutProcessingSteps(
confirmingStakeholders,
fullTree.tree.rootHash,
)
} yield StorePendingDataAndSendResponseAndCreateTimeout(
entry,
responseOpt.map(_ -> Recipients.cc(mediator)).toList,
RejectionArgs(
} yield {
// We consider that we rejected if at least one of the responses is not "approve'
val locallyRejected = responseOpt.exists { response => !response.localVerdict.isApprove }
val entry = PendingTransferOut(
requestId,
rc,
sc,
fullTree.tree.rootHash,
WithContractHash.fromContract(contract, fullTree.contractId),
fullTree.transferCounter,
contract.rawContractInstance.contractInstance.unversioned.template,
contract.rawContractInstance.contractInstance.unversioned.packageName,
transferringParticipant,
fullTree.submitterMetadata,
transferId,
fullTree.targetDomain,
fullTree.stakeholders,
hostedStks.toSet,
fullTree.targetTimeProof,
transferInExclusivity,
mediator,
locallyRejected,
)
StorePendingDataAndSendResponseAndCreateTimeout(
entry,
LocalRejectError.TimeRejects.LocalTimeout
.Reject()
.toLocalReject(sourceDomainProtocolVersion.v),
),
)
responseOpt.map(_ -> Recipients.cc(mediator)).toList,
RejectionArgs(
entry,
LocalRejectError.TimeRejects.LocalTimeout
.Reject()
.toLocalReject(sourceDomainProtocolVersion.v),
),
)
}
}
private[this] def getTransferInExclusivity(
@ -566,6 +572,7 @@ class TransferOutProcessingSteps(
_targetTimeProof,
transferInExclusivity,
_mediatorId,
locallyRejected,
) = pendingRequestData
val pendingSubmissionData = pendingSubmissionMap.get(rootHash)
@ -794,6 +801,7 @@ object TransferOutProcessingSteps {
targetTimeProof: TimeProof,
transferInExclusivity: Option[CantonTimestamp],
mediator: MediatorsOfDomain,
override val locallyRejected: Boolean,
) extends PendingTransfer
with PendingRequestData {

View File

@ -8,7 +8,7 @@ import cats.syntax.alternative.*
import cats.syntax.bifunctor.*
import cats.syntax.functor.*
import cats.syntax.parallel.*
import com.daml.lf.data.Ref.{Identifier, PackageId}
import com.daml.lf.data.Ref.{Identifier, PackageId, PackageName}
import com.daml.lf.engine
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.data.ViewParticipantData.RootAction
@ -24,7 +24,11 @@ import com.digitalasset.canton.participant.protocol.SerializableContractAuthenti
import com.digitalasset.canton.participant.protocol.TransactionProcessingSteps.CommonData
import com.digitalasset.canton.participant.protocol.submission.TransactionTreeFactory
import com.digitalasset.canton.participant.protocol.submission.TransactionTreeFactory.TransactionTreeConversionError
import com.digitalasset.canton.participant.protocol.validation.ModelConformanceChecker.*
import com.digitalasset.canton.participant.protocol.validation.ModelConformanceChecker.{
ConflictingNameBindings,
PackageNotFound,
*,
}
import com.digitalasset.canton.participant.store.{
ContractLookup,
ContractLookupAndVerification,
@ -32,6 +36,7 @@ import com.digitalasset.canton.participant.store.{
StoredContract,
}
import com.digitalasset.canton.participant.util.DAMLe
import com.digitalasset.canton.participant.util.DAMLe.PackageResolver
import com.digitalasset.canton.protocol.WellFormedTransaction.{
WithSuffixes,
WithSuffixesAndMerged,
@ -42,9 +47,8 @@ import com.digitalasset.canton.sequencing.protocol.MediatorsOfDomain
import com.digitalasset.canton.topology.ParticipantId
import com.digitalasset.canton.topology.client.TopologySnapshot
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.ErrorUtil
import com.digitalasset.canton.util.FutureInstances.*
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.util.{ErrorUtil, MapsUtil}
import com.digitalasset.canton.{
LfCommand,
LfCreateCommand,
@ -74,6 +78,7 @@ class ModelConformanceChecker(
Boolean,
ViewHash,
TraceContext,
Map[PackageName, PackageId],
) => EitherT[
Future,
DAMLeError,
@ -86,6 +91,7 @@ class ModelConformanceChecker(
val transactionTreeFactory: TransactionTreeFactory,
participantId: ParticipantId,
val serializableContractAuthenticator: SerializableContractAuthenticator,
val packageResolver: PackageResolver,
val enableContractUpgrading: Boolean,
override protected val loggerFactory: NamedLoggerFactory,
)(implicit executionContext: ExecutionContext)
@ -212,7 +218,7 @@ class ModelConformanceChecker(
): EitherT[Future, Error, Map[LfContractId, StoredContract]] = {
view.tryFlattenToParticipantViews
.flatMap(_.viewParticipantData.coreInputs)
.parTraverse { case (cid, inputContract @ InputContract(contract, _)) =>
.parTraverse { case (cid, InputContract(contract, _)) =>
validateContract(contract, traceContext)
.leftMap {
case DAMLeFailure(error) =>
@ -225,6 +231,34 @@ class ModelConformanceChecker(
.map(_.toMap)
}
private def buildPackageNameMap(
packageIds: Set[PackageId]
)(implicit
traceContext: TraceContext
): EitherT[Future, Error, Map[PackageName, PackageId]] = {
EitherT(for {
resolvedE <- packageIds.toSeq.parTraverse(pId =>
packageResolver(pId)(traceContext)
.map({
case None => Left(pId)
case Some(ast) => Right((pId, ast.metadata.name))
})
)
} yield {
for {
resolved <- resolvedE.separate match {
case (Seq(), resolved) => Right(resolved)
case (unresolved, _) => Left(PackageNotFound(Map(participantId -> unresolved.toSet)))
}
resolvedNameBindings = resolved.map({ case (pId, name) => name -> pId })
nameBindings <- MapsUtil.toNonConflictingMap(resolvedNameBindings) leftMap { conflicts =>
ConflictingNameBindings(Map(participantId -> conflicts))
}
} yield nameBindings
})
}
private def checkView(
view: TransactionView,
viewPosition: ViewPosition,
@ -241,8 +275,10 @@ class ModelConformanceChecker(
traceContext: TraceContext
): EitherT[Future, Error, WithRollbackScope[WellFormedTransaction[WithSuffixes]]] = {
val viewParticipantData = view.viewParticipantData.tryUnwrap
val RootAction(cmd, authorizers, failed) =
val RootAction(cmd, authorizers, failed, packageIdPreference) =
viewParticipantData.rootAction(enableContractUpgrading)
val rbContext = viewParticipantData.rollbackContext
val seed = viewParticipantData.actionDescription.seedOption
for {
@ -256,6 +292,9 @@ class ModelConformanceChecker(
resolverFromView,
serializableContractAuthenticator,
)
packagePreference <- buildPackageNameMap(packageIdPreference)
lfTxAndMetadata <- reinterpret(
contractLookupAndVerification,
authorizers,
@ -266,6 +305,7 @@ class ModelConformanceChecker(
failed,
view.viewHash,
traceContext,
packagePreference,
)
.leftWiden[Error]
(lfTx, metadata, resolverFromReinterpretation) = lfTxAndMetadata
@ -279,7 +319,7 @@ class ModelConformanceChecker(
.fromEither[Future](
WellFormedTransaction.normalizeAndCheck(lfTx, metadata, WithoutSuffixes)
)
.leftMap[Error](err => TransactionNotWellformed(err, view.viewHash))
.leftMap[Error](err => TransactionNotWellFormed(err, view.viewHash))
salts = transactionTreeFactory.saltsFromView(view)
reconstructedViewAndTx <- checked(
transactionTreeFactory.tryReconstruct(
@ -349,8 +389,8 @@ object ModelConformanceChecker {
damle: DAMLe,
transactionTreeFactory: TransactionTreeFactory,
serializableContractAuthenticator: SerializableContractAuthenticator,
protocolVersion: ProtocolVersion,
participantId: ParticipantId,
packageResolver: PackageResolver,
enableContractUpgrading: Boolean,
loggerFactory: NamedLoggerFactory,
)(implicit executionContext: ExecutionContext): ModelConformanceChecker = {
@ -364,6 +404,7 @@ object ModelConformanceChecker {
expectFailure: Boolean,
viewHash: ViewHash,
traceContext: TraceContext,
packageResolution: Map[PackageName, PackageId],
): EitherT[Future, DAMLeError, (LfVersionedTransaction, TransactionMetadata, LfKeyResolver)] =
damle
.reinterpret(
@ -374,6 +415,7 @@ object ModelConformanceChecker {
submissionTime,
rootSeed,
expectFailure,
packageResolution,
)(traceContext)
.leftMap(DAMLeError(_, viewHash))
@ -383,6 +425,7 @@ object ModelConformanceChecker {
transactionTreeFactory,
participantId,
serializableContractAuthenticator,
packageResolver,
enableContractUpgrading,
loggerFactory,
)
@ -459,24 +502,8 @@ object ModelConformanceChecker {
override def pretty: Pretty[DAMLeError] = adHocPrettyInstance
}
/** Indicates a different number of declared and reconstructed create nodes. */
final case class CreatedContractsDeclaredIncorrectly(
declaredCreateNodes: Seq[CreatedContract],
reconstructedCreateNodes: Seq[LfNodeCreate],
viewHash: ViewHash,
) extends Error {
override def pretty: Pretty[CreatedContractsDeclaredIncorrectly] = prettyOfClass(
param("declaredCreateNodes", _.declaredCreateNodes),
param(
"reconstructedCreateNodes",
_.reconstructedCreateNodes.map(_.templateId),
),
unnamedParam(_.viewHash),
)
}
final case class TransactionNotWellformed(cause: String, viewHash: ViewHash) extends Error {
override def pretty: Pretty[TransactionNotWellformed] = prettyOfClass(
final case class TransactionNotWellFormed(cause: String, viewHash: ViewHash) extends Error {
override def pretty: Pretty[TransactionNotWellFormed] = prettyOfClass(
param("cause", _.cause.unquoted),
unnamedParam(_.viewHash),
)
@ -510,14 +537,6 @@ object ModelConformanceChecker {
)
}
final case class JoinedTransactionNotWellFormed(
cause: String
) extends Error {
override def pretty: Pretty[JoinedTransactionNotWellFormed] = prettyOfClass(
param("cause", _.cause.unquoted)
)
}
final case class InvalidInputContract(
contractId: LfContractId,
templateId: Identifier,
@ -549,6 +568,34 @@ object ModelConformanceChecker {
)
}
final case class PackageNotFound(
missing: Map[ParticipantId, Set[PackageId]]
) extends Error {
override def pretty: Pretty[PackageNotFound] = prettyOfClass(
unnamedParam(
_.missing
.map { case (participant, packageIds) =>
show"$participant can not find $packageIds".unquoted
}
.mkShow("\n")
)
)
}
final case class ConflictingNameBindings(
conflicting: Map[ParticipantId, Map[PackageName, Set[PackageId]]]
) extends Error {
override def pretty: Pretty[ConflictingNameBindings] = prettyOfClass(
unnamedParam(
_.conflicting
.map { case (participant, conflicts) =>
show"$participant has detected conflicting package name resolutions $conflicts".unquoted
}
.mkShow("\n")
)
)
}
final case class Result(
transactionId: TransactionId,
suffixedTransaction: WellFormedTransaction[WithSuffixesAndMerged],

View File

@ -25,6 +25,7 @@ final case class PendingTransaction(
override val requestSequencerCounter: SequencerCounter,
transactionValidationResult: TransactionValidationResult,
override val mediator: MediatorsOfDomain,
override val locallyRejected: Boolean,
) extends PendingRequestData {
val requestId: RequestId = RequestId(requestTime)

View File

@ -64,6 +64,7 @@ import com.digitalasset.canton.participant.traffic.{
ParticipantTrafficControlSubscriber,
TrafficStateController,
}
import com.digitalasset.canton.participant.util.DAMLe.PackageResolver
import com.digitalasset.canton.participant.util.{DAMLe, TimeOfChange}
import com.digitalasset.canton.platform.apiserver.execution.AuthorityResolver
import com.digitalasset.canton.protocol.WellFormedTransaction.WithoutSuffixes
@ -164,6 +165,9 @@ class SyncDomain(
loggerFactory,
)
private val packageResolver: PackageResolver = pkgId =>
traceContext => packageService.getPackage(pkgId)(traceContext)
private val damle =
new DAMLe(
pkgId => traceContext => packageService.getPackage(pkgId)(traceContext),
@ -187,6 +191,7 @@ class SyncDomain(
timeouts,
loggerFactory,
futureSupervisor,
packageResolver = packageResolver,
enableContractUpgrading = parameters.enableContractUpgrading,
)

View File

@ -6,7 +6,7 @@ package com.digitalasset.canton.participant.util
import cats.data.EitherT
import com.daml.lf.VersionRange
import com.daml.lf.data.Ref.PackageId
import com.daml.lf.data.{ImmArray, Time}
import com.daml.lf.data.{ImmArray, Ref, Time}
import com.daml.lf.engine.*
import com.daml.lf.interpretation.Error as LfInterpretationError
import com.daml.lf.language.Ast.Package
@ -118,6 +118,7 @@ class DAMLe(
submissionTime: CantonTimestamp,
rootSeed: Option[LfHash],
expectFailure: Boolean,
packageResolution: Map[Ref.PackageName, Ref.PackageId] = Map.empty,
)(implicit
traceContext: TraceContext
): EitherT[
@ -176,6 +177,7 @@ class DAMLe(
nodeSeed = rootSeed,
submissionTime = submissionTime.toLf,
ledgerEffectiveTime = ledgerTime.toLf,
packageResolution = packageResolution,
)
}
@ -202,6 +204,7 @@ class DAMLe(
nodeSeed = Some(DAMLe.zeroSeed),
submissionTime = Time.Timestamp.Epoch, // Only used to compute contract ids
ledgerEffectiveTime = ledgerEffectiveTime.ts.underlying,
packageResolution = Map.empty,
)
for {
txWithMetadata <- EitherT(

View File

@ -38,6 +38,7 @@ class Phase37SynchronizerTest extends AnyWordSpec with BaseTest with HasExecutio
RequestCounter(i),
SequencerCounter(i),
MediatorsOfDomain(MediatorGroupIndex.one),
locallyRejected = false,
)
)

View File

@ -509,7 +509,12 @@ class ProtocolProcessorTest
}
"transit to confirmed" in {
val pd = TestPendingRequestData(rc, requestSc, MediatorsOfDomain(MediatorGroupIndex.one))
val pd = TestPendingRequestData(
rc,
requestSc,
MediatorsOfDomain(MediatorGroupIndex.one),
locallyRejected = false,
)
val (sut, _persistent, ephemeral) =
testProcessingSteps(overrideConstructedPendingRequestDataO = Some(pd))
val before = ephemeral.requestJournal.query(rc).value.futureValue
@ -530,7 +535,12 @@ class ProtocolProcessorTest
"leave the request state unchanged when doing a clean replay" in {
val pendingData =
TestPendingRequestData(rc, requestSc, MediatorsOfDomain(MediatorGroupIndex.one))
TestPendingRequestData(
rc,
requestSc,
MediatorsOfDomain(MediatorGroupIndex.one),
locallyRejected = false,
)
val (sut, _persistent, ephemeral) =
testProcessingSteps(
overrideConstructedPendingRequestDataO = Some(pendingData),
@ -564,7 +574,12 @@ class ProtocolProcessorTest
}
"trigger a timeout when the result doesn't arrive" in {
val pd = TestPendingRequestData(rc, requestSc, MediatorsOfDomain(MediatorGroupIndex.one))
val pd = TestPendingRequestData(
rc,
requestSc,
MediatorsOfDomain(MediatorGroupIndex.one),
locallyRejected = false,
)
val (sut, _persistent, ephemeral) =
testProcessingSteps(overrideConstructedPendingRequestDataO = Some(pd))
@ -911,7 +926,12 @@ class ProtocolProcessorTest
.complete(
Some(
WrappedPendingRequestData(
TestPendingRequestData(rc, requestSc, MediatorsOfDomain(MediatorGroupIndex.one))
TestPendingRequestData(
rc,
requestSc,
MediatorsOfDomain(MediatorGroupIndex.one),
locallyRejected = false,
)
)
)
)
@ -1038,6 +1058,7 @@ class ProtocolProcessorTest
rc,
requestSc,
MediatorsOfDomain(MediatorGroupIndex.one),
locallyRejected = false,
)
)
)

View File

@ -247,7 +247,12 @@ class TestProcessingSteps(
] = {
val res = StorePendingDataAndSendResponseAndCreateTimeout(
pendingRequestData.getOrElse(
TestPendingRequestData(RequestCounter(0), SequencerCounter(0), mediator)
TestPendingRequestData(
RequestCounter(0),
SequencerCounter(0),
mediator,
locallyRejected = false,
)
),
Seq.empty,
(),
@ -339,9 +344,10 @@ object TestProcessingSteps {
type TestViewType = TestViewType.type
final case class TestPendingRequestData(
requestCounter: RequestCounter,
requestSequencerCounter: SequencerCounter,
mediator: MediatorsOfDomain,
override val requestCounter: RequestCounter,
override val requestSequencerCounter: SequencerCounter,
override val mediator: MediatorsOfDomain,
override val locallyRejected: Boolean,
) extends PendingRequestData {
override def rootHashO: Option[RootHash] = None

View File

@ -652,6 +652,7 @@ class TransferInProcessingStepsTest extends AsyncWordSpec with BaseTest with Has
transferId,
contract.metadata.stakeholders,
MediatorsOfDomain(MediatorGroupIndex.one),
locallyRejected = false,
)
for {

View File

@ -845,6 +845,7 @@ final class TransferOutProcessingStepsTest
timeEvent,
Some(transferInExclusivity),
MediatorsOfDomain(MediatorGroupIndex.one),
locallyRejected = false,
)
_ <- valueOrFail(
outProcessingSteps

View File

@ -6,8 +6,10 @@ package com.digitalasset.canton.participant.protocol.validation
import cats.data.EitherT
import cats.syntax.parallel.*
import com.daml.lf.data.ImmArray
import com.daml.lf.data.Ref.PackageId
import com.daml.lf.data.Ref.{PackageId, PackageName}
import com.daml.lf.engine
import com.daml.lf.language.Ast.{Expr, GenPackage, PackageMetadata}
import com.daml.lf.language.LanguageVersion
import com.daml.nonempty.{NonEmpty, NonEmptyUtil}
import com.digitalasset.canton.data.{
CantonTimestamp,
@ -23,6 +25,7 @@ import com.digitalasset.canton.participant.protocol.{
TransactionProcessingSteps,
}
import com.digitalasset.canton.participant.store.ContractLookup
import com.digitalasset.canton.participant.util.DAMLe.PackageResolver
import com.digitalasset.canton.protocol.ExampleTransactionFactory.{lfHash, submittingParticipant}
import com.digitalasset.canton.protocol.*
import com.digitalasset.canton.topology.client.TopologySnapshot
@ -30,7 +33,15 @@ import com.digitalasset.canton.topology.transaction.VettedPackagesX
import com.digitalasset.canton.topology.{TestingIdentityFactoryX, TestingTopologyX}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.FutureInstances.*
import com.digitalasset.canton.{BaseTest, LfCommand, LfKeyResolver, LfPartyId, RequestCounter}
import com.digitalasset.canton.{
BaseTest,
LfCommand,
LfKeyResolver,
LfPackageName,
LfPackageVersion,
LfPartyId,
RequestCounter,
}
import org.scalatest.Assertion
import org.scalatest.wordspec.AsyncWordSpec
import pprint.Tree
@ -63,6 +74,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
_inRollback: Boolean,
_viewHash: ViewHash,
_traceContext: TraceContext,
_packageResolution: Map[PackageName, PackageId],
): EitherT[Future, DAMLeError, (LfVersionedTransaction, TransactionMetadata, LfKeyResolver)] = {
ledgerTime shouldEqual factory.ledgerTime
@ -88,6 +100,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
_inRollback: Boolean,
_viewHash: ViewHash,
_traceContext: TraceContext,
_packageResolution: Map[PackageName, PackageId],
): EitherT[Future, DAMLeError, (LfVersionedTransaction, TransactionMetadata, LfKeyResolver)] =
fail("Reinterpret should not be called by this test case.")
@ -132,6 +145,14 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
mcc.check(rootViewTrees, keyResolvers, RequestCounter(0), ips, commonData)
}
val packageName: LfPackageName = PackageName.assertFromString("package-name")
val packageVersion: LfPackageVersion = LfPackageVersion.assertFromString("1.0.0")
val packageMetadata: PackageMetadata = PackageMetadata(packageName, packageVersion, None)
val genPackage: GenPackage[Expr] =
GenPackage(Map.empty, Set.empty, LanguageVersion.default, packageMetadata)
val packageResolver: PackageResolver = pkgId =>
traceContext => Future.successful(Some(genPackage))
"A model conformance checker" when {
val relevantExamples = factory.standardHappyCases.filter {
// If the transaction is empty there is no transaction view message. Therefore, the checker is not invoked.
@ -149,6 +170,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
transactionTreeFactory,
submittingParticipant,
dummyAuthenticator,
packageResolver,
enableContractUpgrading = false,
loggerFactory,
)
@ -199,6 +221,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
transactionTreeFactory,
submittingParticipant,
dummyAuthenticator,
packageResolver,
enableContractUpgrading = false,
loggerFactory,
)
@ -228,7 +251,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
val error = DAMLeError(mock[engine.Error], mockViewHash)
val sut = new ModelConformanceChecker(
(_, _, _, _, _, _, _, _, _) =>
(_, _, _, _, _, _, _, _, _, _) =>
EitherT.leftT[Future, (LfVersionedTransaction, TransactionMetadata, LfKeyResolver)](
error
),
@ -236,6 +259,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
transactionTreeFactory,
submittingParticipant,
dummyAuthenticator,
packageResolver,
enableContractUpgrading = false,
loggerFactory,
)
@ -275,6 +299,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
transactionTreeFactory,
submittingParticipant,
dummyAuthenticator,
packageResolver,
enableContractUpgrading = true,
loggerFactory,
)
@ -301,7 +326,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
),
)
val sut = new ModelConformanceChecker(
(_, _, _, _, _, _, _, _, _) =>
(_, _, _, _, _, _, _, _, _, _) =>
EitherT.pure[Future, DAMLeError](
(reinterpreted, subviewMissing.metadata, subviewMissing.keyResolver)
),
@ -309,6 +334,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
transactionTreeFactory,
submittingParticipant,
dummyAuthenticator,
packageResolver,
enableContractUpgrading = false,
loggerFactory,
)
@ -387,6 +413,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
transactionTreeFactory = transactionTreeFactory,
participantId = submittingParticipant,
serializableContractAuthenticator = dummyAuthenticator,
packageResolver = packageResolver,
enableContractUpgrading = false,
loggerFactory,
)

View File

@ -7,7 +7,7 @@ import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.time.TimeProvider
import com.digitalasset.canton.tracing.{TraceContext, Traced}
import com.google.protobuf.ByteString
import io.grpc.BindableService
import io.grpc.ServerServiceDefinition
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.stream.{KillSwitch, Materializer}
import pureconfig.{ConfigReader, ConfigWriter}
@ -43,7 +43,7 @@ trait BlockOrderer extends AutoCloseable {
/** Additional services exposed by the [[com.digitalasset.canton.domain.block.BlockOrderer]], e.g., to other peer nodes.
*/
def grpcServices: Seq[BindableService]
def grpcServices: Seq[ServerServiceDefinition]
/** Send a request.
* Requests are ordered and delivered as [[com.digitalasset.canton.domain.block.BlockOrderer.Block]] to subscribers.

View File

@ -7,7 +7,7 @@ import com.digitalasset.canton.logging.{NamedLoggerFactory, TracedLogger}
import com.digitalasset.canton.time.TimeProvider
import com.digitalasset.canton.tracing.{TraceContext, Traced}
import com.google.protobuf.ByteString
import io.grpc.BindableService
import io.grpc.ServerServiceDefinition
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.stream.{KillSwitch, Materializer}
import pureconfig.{ConfigReader, ConfigWriter}
@ -63,7 +63,7 @@ object BlockOrderingSequencer {
private val logger = loggerFactory.getTracedLogger(getClass)
override def adminServices: Seq[BindableService] =
override def adminServices: Seq[ServerServiceDefinition] =
// This is a bit of a semantic abuse, as not all exposed services will be administrative (e.g., P2P);
// perhaps we can rename the field in the sequencer API.
blockOrderer.grpcServices

View File

@ -7,7 +7,7 @@ import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.time.TimeProvider
import com.digitalasset.canton.tracing.{TraceContext, Traced}
import com.google.protobuf.ByteString
import io.grpc.BindableService
import io.grpc.ServerServiceDefinition
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.stream.{KillSwitch, Materializer}
import pureconfig.{ConfigReader, ConfigWriter}
@ -127,7 +127,7 @@ trait SequencerDriver extends AutoCloseable {
/** Services for administering the ledger driver.
* These services will be exposed on the sequencer node's admin API endpoint.
*/
def adminServices: Seq[BindableService]
def adminServices: Seq[ServerServiceDefinition]
// Write operations

View File

@ -67,6 +67,7 @@ trait TestEssentials
BaseTest.testedReleaseProtocolVersion
protected lazy val defaultStaticDomainParameters: StaticDomainParameters =
BaseTest.defaultStaticDomainParameters
protected lazy val testedUseUnifiedSequencer: Boolean = BaseTest.testedUseUnifiedSequencer
// default to providing an empty trace context to all tests
protected implicit def traceContext: TraceContext = TraceContext.empty
@ -311,7 +312,7 @@ trait BaseTest
lazy val CantonTestsPath: String = BaseTest.CantonTestsPath
lazy val PerformanceTestPath: String = BaseTest.PerformanceTestPath
lazy val DamlTestFilesPath: String = BaseTest.DamlTestFilesPath
lazy val DamlTestLfV21FilesPath: String = BaseTest.DamlTestLfV21FilesPath
lazy val DamlTestLfDevFilesPath: String = BaseTest.DamlTestLfDevFilesPath
}
object BaseTest {
@ -419,6 +420,8 @@ object BaseTest {
testedProtocolVersion
)
lazy val testedUseUnifiedSequencer: Boolean = tryGetUseUnifiedSequencerFromEnv
lazy val CantonExamplesPath: String = getResourcePath("CantonExamples.dar")
lazy val CantonTestsPath: String = getResourcePath("CantonTests.dar")
lazy val CantonTestsDevPath: String = getResourcePath("CantonTestsDev.dar")
@ -427,7 +430,7 @@ object BaseTest {
lazy val PerformanceTestPath: String = getResourcePath("PerformanceTest.dar")
lazy val DamlScript3TestFilesPath: String = getResourcePath("DamlScript3TestFiles.dar")
lazy val DamlTestFilesPath: String = getResourcePath("DamlTestFiles.dar")
lazy val DamlTestLfV21FilesPath: String = getResourcePath("DamlTestLfV21Files.dar")
lazy val DamlTestLfDevFilesPath: String = getResourcePath("DamlTestLfDevFiles.dar")
def getResourcePath(name: String): String =
Option(getClass.getClassLoader.getResource(name))
@ -441,6 +444,10 @@ object BaseTest {
.get("CANTON_PROTOCOL_VERSION")
.map(ProtocolVersion.tryCreate)
protected def tryGetUseUnifiedSequencerFromEnv: Boolean = sys.env
.get("CANTON_UNIFIED_SEQUENCER")
.exists(_.toBoolean)
}
trait BaseTestWordSpec extends BaseTest with AnyWordSpecLike {

View File

@ -52,5 +52,7 @@ object MockedNodeParameters {
override def useNewTrafficControl: Boolean = false
override def exitOnFatalFailures: Boolean = ???
override def useUnifiedSequencer: Boolean = false
}
}

View File

@ -22,6 +22,11 @@ final case class Traced[+A](value: A)(implicit override val traceContext: TraceC
def traverse[F[_], B](f: A => F[B])(implicit F: Functor[F]): F[Traced[B]] =
F.map(f(value))(Traced(_))
def traverseWithTraceContext[F[_], B](fn: TraceContext => A => F[B])(implicit
F: Functor[F]
): F[Traced[B]] =
F.map(fn(traceContext)(value))(Traced(_))
def withTraceContext[B](fn: TraceContext => A => B): B = fn(traceContext)(value)
def copy[B](value: B): Traced[B] = Traced(value)(traceContext)

View File

@ -1 +1 @@
20240319.12918.v6ad9b204
20240320.12940.v147a56d2