mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
update canton to e5a4cdfc (#17584)
CHANGELOG_BEGIN CHANGELOG_END Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
This commit is contained in:
parent
fc88867925
commit
94597ad486
35
canton/README.md
Normal file
35
canton/README.md
Normal file
@ -0,0 +1,35 @@
|
||||
# Canton
|
||||
|
||||
[![CircleCI](https://circleci.com/gh/DACH-NY/canton.svg?style=svg&circle-token=5cf96f68761df465e62cf13b03cf75f4e9f67eb7)](https://circleci.com/gh/DACH-NY/canton)
|
||||
|
||||
Canton is a next-generation Daml ledger interoperability protocol that implements Daml's built-in models of
|
||||
authorization and privacy faithfully.
|
||||
|
||||
* By partitioning the global state it solves both the privacy problems and the scaling bottlenecks of platforms such as
|
||||
a single Ethereum instance.
|
||||
|
||||
* It allows developers to balance auditability requirements with the right to forget, making it well-suited for building
|
||||
GDPR-compliant systems.
|
||||
|
||||
* Canton handles authentication and data transport through our so-called synchronization domains.
|
||||
|
||||
* Domains can be deployed at will to address scalability, operational or trust concerns.
|
||||
|
||||
* Domains can be implemented on top of various technologies, depending on the trust requirements.
|
||||
|
||||
* Domains are permissioned but can be federated at no interoperability cost, yielding a virtual global ledger that
|
||||
enables truly global workflow composition.
|
||||
|
||||
Refer to the [Canton Whitepaper](https://www.canton.io/publications/canton-whitepaper.pdf) for further details.
|
||||
|
||||
## Running
|
||||
|
||||
Please read [Getting Started](https://docs.daml.com/canton/tutorials/getting_started.html)
|
||||
for instructions on how to get started with Canton.
|
||||
|
||||
Consult the [Canton User Manual](https://docs.daml.com/canton/about.html) for further
|
||||
references of Canton's configuration, command-line arguments, or its console.
|
||||
|
||||
## Development
|
||||
|
||||
Please read our [CONTRIBUTING guidelines](CONTRIBUTING.md).
|
@ -8,12 +8,13 @@ import cats.syntax.functorFilter.*
|
||||
import cats.syntax.traverse.*
|
||||
import com.daml.jwt.JwtDecoder
|
||||
import com.daml.jwt.domain.Jwt
|
||||
import com.daml.ledger.api
|
||||
import com.daml.ledger.api.v1.CommandsOuterClass
|
||||
import com.daml.ledger.api.v1.admin.package_management_service.PackageDetails
|
||||
import com.daml.ledger.api.v1.admin.party_management_service.PartyDetails as ProtoPartyDetails
|
||||
import com.daml.ledger.api.v1.command_completion_service.Checkpoint
|
||||
import com.daml.ledger.api.v1.commands.{Command, DisclosedContract}
|
||||
import com.daml.ledger.api.v1.completion.Completion
|
||||
import com.daml.ledger.api.v1.event.CreatedEvent
|
||||
import com.daml.ledger.api.v1.event_query_service.{
|
||||
GetEventsByContractIdResponse,
|
||||
GetEventsByContractKeyResponse,
|
||||
@ -27,6 +28,7 @@ import com.daml.ledger.api.v2.transaction.{
|
||||
TransactionTree as TransactionTreeV2,
|
||||
}
|
||||
import com.daml.ledger.client.binding.{Contract, TemplateCompanion}
|
||||
import com.daml.ledger.{api, javaapi as javab}
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.metrics.api.MetricHandle.{Histogram, Meter}
|
||||
import com.daml.metrics.api.{MetricHandle, MetricName, MetricsContext}
|
||||
@ -70,7 +72,7 @@ import com.digitalasset.canton.ledger.api.{DeduplicationPeriod, domain}
|
||||
import com.digitalasset.canton.ledger.client.services.admin.IdentityProviderConfigClient
|
||||
import com.digitalasset.canton.logging.NamedLogging
|
||||
import com.digitalasset.canton.networking.grpc.{GrpcError, RecordingStreamObserver}
|
||||
import com.digitalasset.canton.participant.ledger.api.client.DecodeUtil
|
||||
import com.digitalasset.canton.participant.ledger.api.client.{DecodeUtil, JavaDecodeUtil}
|
||||
import com.digitalasset.canton.protocol.LfContractId
|
||||
import com.digitalasset.canton.topology.{DomainId, ParticipantId, PartyId}
|
||||
import com.digitalasset.canton.tracing.NoTracing
|
||||
@ -1505,6 +1507,243 @@ trait BaseLedgerApiAdministration extends NoTracing {
|
||||
})
|
||||
}
|
||||
|
||||
@Help.Summary("Group of commands that utilize java bindings", FeatureFlag.Testing)
|
||||
@Help.Group("Ledger Api (Java bindings)")
|
||||
object javaapi extends Helpful {
|
||||
|
||||
@Help.Summary("Submit commands (Java bindings)", FeatureFlag.Testing)
|
||||
@Help.Group("Command Submission (Java bindings)")
|
||||
object commands extends Helpful {
|
||||
@Help.Summary(
|
||||
"Submit java codegen commands and wait for the resulting transaction, returning the transaction tree or failing otherwise",
|
||||
FeatureFlag.Testing,
|
||||
)
|
||||
@Help.Description(
|
||||
"""Submits a command on behalf of the `actAs` parties, waits for the resulting transaction to commit and returns it.
|
||||
| If the timeout is set, it also waits for the transaction to appear at all other configured
|
||||
| participants who were involved in the transaction. The call blocks until the transaction commits or fails;
|
||||
| the timeout only specifies how long to wait at the other participants.
|
||||
| Fails if the transaction doesn't commit, or if it doesn't become visible to the involved participants in
|
||||
| the allotted time.
|
||||
| Note that if the optTimeout is set and the involved parties are concurrently enabled/disabled or their
|
||||
| participants are connected/disconnected, the command may currently result in spurious timeouts or may
|
||||
| return before the transaction appears at all the involved participants."""
|
||||
)
|
||||
def submit(
|
||||
actAs: Seq[PartyId],
|
||||
commands: Seq[javab.data.Command],
|
||||
workflowId: String = "",
|
||||
commandId: String = "",
|
||||
optTimeout: Option[NonNegativeDuration] = Some(timeouts.ledgerCommand),
|
||||
deduplicationPeriod: Option[DeduplicationPeriod] = None,
|
||||
submissionId: String = "",
|
||||
minLedgerTimeAbs: Option[Instant] = None,
|
||||
readAs: Seq[PartyId] = Seq.empty,
|
||||
disclosedContracts: Seq[CommandsOuterClass.DisclosedContract] = Seq.empty,
|
||||
applicationId: String = applicationId,
|
||||
): javab.data.TransactionTree = check(FeatureFlag.Testing) {
|
||||
val tx = consoleEnvironment.run {
|
||||
ledgerApiCommand(
|
||||
LedgerApiCommands.CommandService.SubmitAndWaitTransactionTree(
|
||||
actAs.map(_.toLf),
|
||||
readAs.map(_.toLf),
|
||||
commands.map(c => Command.fromJavaProto(c.toProtoCommand)),
|
||||
workflowId,
|
||||
commandId,
|
||||
deduplicationPeriod,
|
||||
submissionId,
|
||||
minLedgerTimeAbs,
|
||||
disclosedContracts.map(DisclosedContract.fromJavaProto(_)),
|
||||
applicationId,
|
||||
)
|
||||
)
|
||||
}
|
||||
javab.data.TransactionTree.fromProto(
|
||||
TransactionTree.toJavaProto(optionallyAwait(tx, tx.transactionId, optTimeout))
|
||||
)
|
||||
}
|
||||
|
||||
@Help.Summary(
|
||||
"Submit java codegen command and wait for the resulting transaction, returning the flattened transaction or failing otherwise",
|
||||
FeatureFlag.Testing,
|
||||
)
|
||||
@Help.Description(
|
||||
"""Submits a command on behalf of the `actAs` parties, waits for the resulting transaction to commit, and returns the "flattened" transaction.
|
||||
| If the timeout is set, it also waits for the transaction to appear at all other configured
|
||||
| participants who were involved in the transaction. The call blocks until the transaction commits or fails;
|
||||
| the timeout only specifies how long to wait at the other participants.
|
||||
| Fails if the transaction doesn't commit, or if it doesn't become visible to the involved participants in
|
||||
| the allotted time.
|
||||
| Note that if the optTimeout is set and the involved parties are concurrently enabled/disabled or their
|
||||
| participants are connected/disconnected, the command may currently result in spurious timeouts or may
|
||||
| return before the transaction appears at all the involved participants."""
|
||||
)
|
||||
def submit_flat(
|
||||
actAs: Seq[PartyId],
|
||||
commands: Seq[javab.data.Command],
|
||||
workflowId: String = "",
|
||||
commandId: String = "",
|
||||
optTimeout: Option[config.NonNegativeDuration] = Some(timeouts.ledgerCommand),
|
||||
deduplicationPeriod: Option[DeduplicationPeriod] = None,
|
||||
submissionId: String = "",
|
||||
minLedgerTimeAbs: Option[Instant] = None,
|
||||
readAs: Seq[PartyId] = Seq.empty,
|
||||
disclosedContracts: Seq[DisclosedContract] = Seq.empty,
|
||||
applicationId: String = applicationId,
|
||||
): javab.data.Transaction = check(FeatureFlag.Testing) {
|
||||
val tx = consoleEnvironment.run {
|
||||
ledgerApiCommand(
|
||||
LedgerApiCommands.CommandService.SubmitAndWaitTransaction(
|
||||
actAs.map(_.toLf),
|
||||
readAs.map(_.toLf),
|
||||
commands.map(c => Command.fromJavaProto(c.toProtoCommand)),
|
||||
workflowId,
|
||||
commandId,
|
||||
deduplicationPeriod,
|
||||
submissionId,
|
||||
minLedgerTimeAbs,
|
||||
disclosedContracts,
|
||||
applicationId,
|
||||
)
|
||||
)
|
||||
}
|
||||
javab.data.Transaction.fromProto(
|
||||
Transaction.toJavaProto(optionallyAwait(tx, tx.transactionId, optTimeout))
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Help.Summary("Read from transaction stream (Java bindings)", FeatureFlag.Testing)
|
||||
@Help.Group("Transactions (Java bindings)")
|
||||
object transactions extends Helpful {
|
||||
|
||||
@Help.Summary(
|
||||
"Get transaction trees in the format expected by the Java bindings",
|
||||
FeatureFlag.Testing,
|
||||
)
|
||||
@Help.Description(
|
||||
"""This function connects to the transaction tree stream for the given parties and collects transaction trees
|
||||
|until either `completeAfter` transaction trees have been received or `timeout` has elapsed.
|
||||
|The returned transaction trees can be filtered to be between the given offsets (default: no filtering).
|
||||
|If the participant has been pruned via `pruning.prune` and if `beginOffset` is lower than the pruning offset,
|
||||
|this command fails with a `NOT_FOUND` error."""
|
||||
)
|
||||
def trees(
|
||||
partyIds: Set[PartyId],
|
||||
completeAfter: Int,
|
||||
beginOffset: LedgerOffset =
|
||||
new LedgerOffset().withBoundary(LedgerOffset.LedgerBoundary.LEDGER_BEGIN),
|
||||
endOffset: Option[LedgerOffset] = None,
|
||||
verbose: Boolean = true,
|
||||
timeout: config.NonNegativeDuration = timeouts.ledgerCommand,
|
||||
): Seq[javab.data.TransactionTree] = check(FeatureFlag.Testing)({
|
||||
ledger_api.transactions
|
||||
.trees(partyIds, completeAfter, beginOffset, endOffset, verbose, timeout)
|
||||
.map(t => javab.data.TransactionTree.fromProto(TransactionTree.toJavaProto(t)))
|
||||
})
|
||||
|
||||
@Help.Summary(
|
||||
"Get flat transactions in the format expected by the Java bindings",
|
||||
FeatureFlag.Testing,
|
||||
)
|
||||
@Help.Description(
|
||||
"""This function connects to the flat transaction stream for the given parties and collects transactions
|
||||
|until either `completeAfter` transaction trees have been received or `timeout` has elapsed.
|
||||
|The returned transactions can be filtered to be between the given offsets (default: no filtering).
|
||||
|If the participant has been pruned via `pruning.prune` and if `beginOffset` is lower than the pruning offset,
|
||||
|this command fails with a `NOT_FOUND` error."""
|
||||
)
|
||||
def flat(
|
||||
partyIds: Set[PartyId],
|
||||
completeAfter: Int,
|
||||
beginOffset: LedgerOffset =
|
||||
new LedgerOffset().withBoundary(LedgerOffset.LedgerBoundary.LEDGER_BEGIN),
|
||||
endOffset: Option[LedgerOffset] = None,
|
||||
verbose: Boolean = true,
|
||||
timeout: config.NonNegativeDuration = timeouts.ledgerCommand,
|
||||
): Seq[javab.data.Transaction] = check(FeatureFlag.Testing)({
|
||||
ledger_api.transactions
|
||||
.flat(partyIds, completeAfter, beginOffset, endOffset, verbose, timeout)
|
||||
.map(t => javab.data.Transaction.fromProto(Transaction.toJavaProto(t)))
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
@Help.Summary("Read active contracts (Java bindings)", FeatureFlag.Testing)
|
||||
@Help.Group("Active Contracts (Java bindings)")
|
||||
object acs extends Helpful {
|
||||
|
||||
@Help.Summary(
|
||||
"Wait until a contract becomes availableand return the Java codegen contract",
|
||||
FeatureFlag.Testing,
|
||||
)
|
||||
@Help.Description(
|
||||
"""This function can be used for contracts with a code-generated Scala model.
|
||||
|You can refine your search using the `filter` function argument.
|
||||
|The command will wait until the contract appears or throw an exception once it times out."""
|
||||
)
|
||||
def await[
|
||||
TC <: javab.data.codegen.Contract[TCid, T],
|
||||
TCid <: javab.data.codegen.ContractId[T],
|
||||
T <: javab.data.Template,
|
||||
](companion: javab.data.codegen.ContractCompanion[TC, TCid, T])(
|
||||
partyId: PartyId,
|
||||
predicate: TC => Boolean = (_: TC) => true,
|
||||
timeout: config.NonNegativeDuration = timeouts.ledgerCommand,
|
||||
): TC = check(FeatureFlag.Testing)({
|
||||
val result = new AtomicReference[Option[TC]](None)
|
||||
ConsoleMacros.utils.retry_until_true(timeout) {
|
||||
val tmp = filter(companion)(partyId, predicate)
|
||||
result.set(tmp.headOption)
|
||||
tmp.nonEmpty
|
||||
}
|
||||
consoleEnvironment.runE {
|
||||
result
|
||||
.get()
|
||||
.toRight(s"Failed to find contract of type ${companion.TEMPLATE_ID} after $timeout")
|
||||
}
|
||||
})
|
||||
|
||||
@Help.Summary(
|
||||
"Filter the ACS for contracts of a particular Java code-generated template",
|
||||
FeatureFlag.Testing,
|
||||
)
|
||||
@Help.Description(
|
||||
"""To use this function, ensure a code-generated Java model for the target template exists.
|
||||
|You can refine your search using the `predicate` function argument."""
|
||||
)
|
||||
def filter[
|
||||
TC <: javab.data.codegen.Contract[TCid, T],
|
||||
TCid <: javab.data.codegen.ContractId[T],
|
||||
T <: javab.data.Template,
|
||||
](templateCompanion: javab.data.codegen.ContractCompanion[TC, TCid, T])(
|
||||
partyId: PartyId,
|
||||
predicate: TC => Boolean = (_: TC) => true,
|
||||
): Seq[TC] = check(FeatureFlag.Testing) {
|
||||
val javaTemplateId = templateCompanion.TEMPLATE_ID
|
||||
val templateId = TemplateId(
|
||||
javaTemplateId.getPackageId,
|
||||
javaTemplateId.getModuleName,
|
||||
javaTemplateId.getEntityName,
|
||||
)
|
||||
ledger_api.acs
|
||||
.of_party(partyId, filterTemplates = Seq(templateId))
|
||||
.map(_.event)
|
||||
.flatMap(ev =>
|
||||
JavaDecodeUtil
|
||||
.decodeCreated(templateCompanion)(
|
||||
javab.data.CreatedEvent.fromProto(CreatedEvent.toJavaProto(ev))
|
||||
)
|
||||
.toList
|
||||
)
|
||||
.filter(predicate)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** @return The modified map where deletion from the original are represented as keys with empty values
|
||||
|
@ -28,9 +28,10 @@ import com.digitalasset.canton.util.Thereafter.syntax.ThereafterOps
|
||||
import com.digitalasset.canton.util.{ErrorUtil, MonadUtil}
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import scala.collection.compat.immutable.ArraySeq
|
||||
import scala.collection.mutable
|
||||
import scala.concurrent.{ExecutionContext, Future, Promise, blocking}
|
||||
import scala.util.{Failure, Success, Try}
|
||||
import scala.util.{Failure, Random, Success, Try}
|
||||
|
||||
class SequencersTransportState(
|
||||
initialSequencerTransports: SequencerTransports,
|
||||
@ -40,6 +41,8 @@ class SequencersTransportState(
|
||||
extends NamedLogging
|
||||
with FlagCloseable {
|
||||
|
||||
private val random: Random = new Random(1L)
|
||||
|
||||
private val closeReasonPromise = Promise[SequencerClient.CloseReason]()
|
||||
|
||||
def completion: Future[SequencerClient.CloseReason] = closeReasonPromise.future
|
||||
@ -86,34 +89,29 @@ class SequencersTransportState(
|
||||
})
|
||||
}.flatten
|
||||
|
||||
def transport(implicit traceContext: TraceContext): SequencerClientTransportCommon = blocking(
|
||||
lock.synchronized {
|
||||
val (_, sequencerTransportState) = state.view
|
||||
.filterKeys(subscriptionIsHealthy)
|
||||
.headOption // TODO (i12377): Introduce round robin
|
||||
// TODO (i12377): Can we fallback to first sequencer transport here or should we
|
||||
// introduce EitherT and propagate error handling?
|
||||
.orElse(state.headOption)
|
||||
.getOrElse(
|
||||
sys.error(
|
||||
"No healthy subscription at the moment. Try again later."
|
||||
) // TODO (i12377): Error handling
|
||||
)
|
||||
sequencerTransportState.transport.clientTransport
|
||||
}
|
||||
)
|
||||
|
||||
def subscriptionIsHealthy(
|
||||
sequencerId: SequencerId
|
||||
)(implicit traceContext: TraceContext): Boolean =
|
||||
transportState(sequencerId)
|
||||
.map(
|
||||
_.subscription
|
||||
.exists(x =>
|
||||
!x.resilientSequencerSubscription.isFailed && !x.resilientSequencerSubscription.isClosing
|
||||
)
|
||||
)
|
||||
.onShutdown(false)
|
||||
def transport(implicit traceContext: TraceContext): SequencerClientTransportCommon =
|
||||
blocking(lock.synchronized {
|
||||
// Pick a random healthy sequencer to send to.
|
||||
// We can use a plain Random instance across all threads calling this method,
|
||||
// because this method anyway uses locking on its own.
|
||||
// (In general, ThreadLocalRandom would void contention on the random number generation, but
|
||||
// the plain Random has the advantage that we can hard-code the seed so that the chosen sequencers
|
||||
// are easier to reproduce for debugging and tests.)
|
||||
val healthySequencers = state.view
|
||||
.collect { case (_sequencerId, state) if state.isSubscriptionHealthy => state }
|
||||
.to(ArraySeq)
|
||||
val chosenSequencer =
|
||||
if (healthySequencers.isEmpty)
|
||||
// TODO(i12377): Can we fallback to first sequencer transport here or should we
|
||||
// introduce EitherT and propagate error handling?
|
||||
state.values.headOption
|
||||
.getOrElse(
|
||||
// TODO(i12377): Error handling
|
||||
ErrorUtil.invalidState("No sequencer subscription at the moment. Try again later.")
|
||||
)
|
||||
else healthySequencers(random.nextInt(healthySequencers.size))
|
||||
chosenSequencer.transport.clientTransport
|
||||
})
|
||||
|
||||
def transport(sequencerId: SequencerId)(implicit
|
||||
traceContext: TraceContext
|
||||
@ -323,6 +321,10 @@ final case class SequencerTransportState(
|
||||
)
|
||||
}
|
||||
|
||||
def isSubscriptionHealthy: Boolean = subscription.exists { subscription =>
|
||||
!subscription.resilientSequencerSubscription.isFailed && !subscription.resilientSequencerSubscription.isClosing
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object SequencerTransportState {
|
||||
|
@ -17,7 +17,7 @@ import com.digitalasset.canton.config.ProcessingTimeout
|
||||
import com.digitalasset.canton.config.RequireTypes.PositiveInt
|
||||
import com.digitalasset.canton.crypto.{PublicKey, SignatureCheckError}
|
||||
import com.digitalasset.canton.data.CantonTimestamp
|
||||
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
|
||||
import com.digitalasset.canton.lifecycle.{FlagCloseable, FutureUnlessShutdown}
|
||||
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
|
||||
import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory, NamedLogging}
|
||||
import com.digitalasset.canton.resource.{DbStorage, MemoryStorage, Storage}
|
||||
@ -330,7 +330,7 @@ object TimeQuery {
|
||||
}
|
||||
|
||||
trait TopologyStoreCommon[+StoreID <: TopologyStoreId, ValidTx, StoredTx, SignedTx]
|
||||
extends AutoCloseable {
|
||||
extends FlagCloseable {
|
||||
|
||||
this: NamedLogging =>
|
||||
|
||||
|
@ -264,7 +264,7 @@ object TopologyStoreX {
|
||||
val storeLoggerFactory = loggerFactory.append("store", storeId.toString)
|
||||
storage match {
|
||||
case _: MemoryStorage =>
|
||||
new InMemoryTopologyStoreX(storeId, storeLoggerFactory)
|
||||
new InMemoryTopologyStoreX(storeId, storeLoggerFactory, timeouts)
|
||||
case dbStorage: DbStorage =>
|
||||
new DbTopologyStoreX(dbStorage, storeId, maxDbConnections, timeouts, storeLoggerFactory)
|
||||
}
|
||||
|
@ -111,7 +111,7 @@ trait InMemoryTopologyStoreCommon[+StoreId <: TopologyStoreId] extends NamedLogg
|
||||
class InMemoryTopologyStore[+StoreId <: TopologyStoreId](
|
||||
val storeId: StoreId,
|
||||
val loggerFactory: NamedLoggerFactory,
|
||||
timeouts: ProcessingTimeout,
|
||||
override val timeouts: ProcessingTimeout,
|
||||
futureSupervisor: FutureSupervisor,
|
||||
)(implicit val ec: ExecutionContext)
|
||||
extends TopologyStore[StoreId]
|
||||
@ -616,6 +616,6 @@ class InMemoryTopologyStore[+StoreId <: TopologyStoreId](
|
||||
Future.successful(StoredTopologyTransactions(ret.toSeq))
|
||||
})
|
||||
|
||||
override def close(): Unit = ()
|
||||
override def onClosed(): Unit = ()
|
||||
|
||||
}
|
||||
|
@ -4,6 +4,7 @@
|
||||
package com.digitalasset.canton.topology.store.memory
|
||||
|
||||
import com.daml.nonempty.NonEmpty
|
||||
import com.digitalasset.canton.config.ProcessingTimeout
|
||||
import com.digitalasset.canton.data.CantonTimestamp
|
||||
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
|
||||
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
|
||||
@ -32,12 +33,13 @@ import scala.concurrent.{ExecutionContext, Future, blocking}
|
||||
class InMemoryTopologyStoreX[+StoreId <: TopologyStoreId](
|
||||
val storeId: StoreId,
|
||||
val loggerFactory: NamedLoggerFactory,
|
||||
override val timeouts: ProcessingTimeout,
|
||||
)(implicit ec: ExecutionContext)
|
||||
extends TopologyStoreX[StoreId]
|
||||
with InMemoryTopologyStoreCommon[StoreId]
|
||||
with NamedLogging {
|
||||
|
||||
override def close(): Unit = {}
|
||||
override def onClosed(): Unit = ()
|
||||
|
||||
private case class TopologyStoreEntry(
|
||||
transaction: GenericSignedTopologyTransactionX,
|
||||
|
@ -35,6 +35,13 @@ abstract class DomainTopologyStoreBase[
|
||||
val item = createTopologyStore(storeId)
|
||||
store.set(Some(item))
|
||||
item
|
||||
case Some(value) if value.isClosing =>
|
||||
loggerFactory
|
||||
.getLogger(getClass)
|
||||
.debug(s"Topology store $storeId exists but is closed. Creating a new one.")
|
||||
val item = createTopologyStore(storeId)
|
||||
store.set(Some(item))
|
||||
item
|
||||
case Some(value) =>
|
||||
if (storeId != value.storeId) {
|
||||
loggerFactory
|
||||
|
@ -180,7 +180,7 @@ class IncomingTopologyTransactionAuthorizationValidatorTestX
|
||||
|
||||
def mk(
|
||||
store: InMemoryTopologyStoreX[TopologyStoreId] =
|
||||
new InMemoryTopologyStoreX(DomainStore(Factory.domainId1), loggerFactory)
|
||||
new InMemoryTopologyStoreX(DomainStore(Factory.domainId1), loggerFactory, timeouts)
|
||||
) = {
|
||||
val validator =
|
||||
new IncomingTopologyTransactionAuthorizationValidatorX(
|
||||
@ -311,7 +311,8 @@ class IncomingTopologyTransactionAuthorizationValidatorTestX
|
||||
}
|
||||
}
|
||||
"succeed and use load existing delegations" in {
|
||||
val store = new InMemoryTopologyStoreX(TopologyStoreId.AuthorizedStore, loggerFactory)
|
||||
val store =
|
||||
new InMemoryTopologyStoreX(TopologyStoreId.AuthorizedStore, loggerFactory, timeouts)
|
||||
val validator = mk(store)
|
||||
import Factory.*
|
||||
for {
|
||||
@ -414,7 +415,7 @@ class IncomingTopologyTransactionAuthorizationValidatorTestX
|
||||
}
|
||||
"succeed with loading existing identifier delegations" in {
|
||||
val store: InMemoryTopologyStoreX[TopologyStoreId.AuthorizedStore] =
|
||||
new InMemoryTopologyStoreX(TopologyStoreId.AuthorizedStore, loggerFactory)
|
||||
new InMemoryTopologyStoreX(TopologyStoreId.AuthorizedStore, loggerFactory, timeouts)
|
||||
val validator = mk(store)
|
||||
import Factory.*
|
||||
for {
|
||||
@ -476,7 +477,8 @@ class IncomingTopologyTransactionAuthorizationValidatorTestX
|
||||
|
||||
"correctly determine cascading update for" should {
|
||||
"namespace additions" in {
|
||||
val store = new InMemoryTopologyStoreX(TopologyStoreId.AuthorizedStore, loggerFactory)
|
||||
val store =
|
||||
new InMemoryTopologyStoreX(TopologyStoreId.AuthorizedStore, loggerFactory, timeouts)
|
||||
val validator = mk(store)
|
||||
import Factory.*
|
||||
for {
|
||||
@ -499,7 +501,8 @@ class IncomingTopologyTransactionAuthorizationValidatorTestX
|
||||
}
|
||||
|
||||
"namespace removals" in {
|
||||
val store = new InMemoryTopologyStoreX(TopologyStoreId.AuthorizedStore, loggerFactory)
|
||||
val store =
|
||||
new InMemoryTopologyStoreX(TopologyStoreId.AuthorizedStore, loggerFactory, timeouts)
|
||||
val validator = mk(store)
|
||||
import Factory.*
|
||||
val Rns1k1_k1 = mkTrans(ns1k1_k1.transaction.reverse)
|
||||
@ -524,7 +527,8 @@ class IncomingTopologyTransactionAuthorizationValidatorTestX
|
||||
}
|
||||
|
||||
"identifier additions and removals" in {
|
||||
val store = new InMemoryTopologyStoreX(TopologyStoreId.AuthorizedStore, loggerFactory)
|
||||
val store =
|
||||
new InMemoryTopologyStoreX(TopologyStoreId.AuthorizedStore, loggerFactory, timeouts)
|
||||
val validator = mk(store)
|
||||
import Factory.*
|
||||
val Rid1ak4_k1 = mkTrans(id1ak4_k1.transaction.reverse)
|
||||
@ -556,7 +560,8 @@ class IncomingTopologyTransactionAuthorizationValidatorTestX
|
||||
}
|
||||
|
||||
"cascading invalidation pre-existing identifier uids" in {
|
||||
val store = new InMemoryTopologyStoreX(TopologyStoreId.AuthorizedStore, loggerFactory)
|
||||
val store =
|
||||
new InMemoryTopologyStoreX(TopologyStoreId.AuthorizedStore, loggerFactory, timeouts)
|
||||
val validator = mk(store)
|
||||
import Factory.*
|
||||
import Factory.SigningKeys.{ec as _, *}
|
||||
|
@ -15,6 +15,6 @@ class InMemoryDownloadTopologyStateForInitializationServiceTest
|
||||
extends DownloadTopologyStateForInitializationServiceTest {
|
||||
override protected def createTopologyStore(): TopologyStoreX[TopologyStoreId.DomainStore] = {
|
||||
val storeId = DomainStore(DefaultTestIdentities.domainId, getClass.getSimpleName.take(40))
|
||||
new InMemoryTopologyStoreX[TopologyStoreId.DomainStore](storeId, loggerFactory)
|
||||
new InMemoryTopologyStoreX[TopologyStoreId.DomainStore](storeId, loggerFactory, timeouts)
|
||||
}
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ class InMemoryTopologyStoreXTest extends TopologyStoreXTest {
|
||||
new InMemoryTopologyStoreX(
|
||||
TopologyStoreId.AuthorizedStore,
|
||||
loggerFactory,
|
||||
timeouts,
|
||||
)
|
||||
)
|
||||
}
|
||||
|
@ -127,6 +127,9 @@ object AuthServiceJWTCodec {
|
||||
private[this] final val propSub: String = "sub"
|
||||
private[this] final val propScope: String = "scope"
|
||||
private[this] final val propParty: String = "party" // Legacy JSON API payload
|
||||
private[this] final val legacyProperties: List[String] =
|
||||
List(propLedgerId, propParticipantId, propApplicationId, propAdmin, propActAs, propReadAs)
|
||||
private[this] final val scopeRegex = """(?:[\w\-]+/)?([\w\-]+)""".r
|
||||
|
||||
// ------------------------------------------------------------------------------------------------------------------
|
||||
// Encoding
|
||||
@ -216,10 +219,17 @@ object AuthServiceJWTCodec {
|
||||
parsed <- Try(readPayload(json))
|
||||
} yield parsed
|
||||
|
||||
def readPayload(value: JsValue): AuthServiceJWTPayload = value match {
|
||||
private def readPayload(value: JsValue): AuthServiceJWTPayload = value match {
|
||||
case JsObject(fields) =>
|
||||
val scope = fields.get(propScope)
|
||||
val scopes = scope.toList.collect({ case JsString(scope) => scope.split(" ") }).flatten
|
||||
// Support scopes in two formats: 'daml_ledger_api' and '<arbitrary_resource_server_name>/daml_ledger_api'
|
||||
val scopes = scope.toList
|
||||
.collect({ case JsString(scope) => scope.split(" ") })
|
||||
.flatten
|
||||
.map {
|
||||
case scopeRegex(scopeSuffix) => scopeSuffix
|
||||
case fullScope => fullScope
|
||||
}
|
||||
// We're using this rather restrictive test to ensure we continue parsing all legacy sandbox tokens that
|
||||
// are in use before the 2.0 release; and thereby maintain full backwards compatibility.
|
||||
val audienceValue = readOptionalStringOrArray(propAud, fields)
|
||||
@ -271,51 +281,56 @@ object AuthServiceJWTCodec {
|
||||
format = StandardJWTTokenFormat.Scope,
|
||||
audiences = List.empty, // we do not read or extract audience claims for Scope-based tokens
|
||||
)
|
||||
} else {
|
||||
scope.foreach(s =>
|
||||
logger.warn(
|
||||
s"Access token with unknown scope \"$s\" is being parsed as a custom claims token. Issue tokens with adjusted or no scope to get rid of this warning."
|
||||
)
|
||||
)
|
||||
if (!fields.contains(oidcNamespace)) {
|
||||
// Legacy format
|
||||
logger.warn(s"Token ${value.compactPrint} is using a deprecated JWT payload format")
|
||||
CustomDamlJWTPayload(
|
||||
ledgerId = readOptionalString(propLedgerId, fields),
|
||||
participantId = readOptionalString(propParticipantId, fields),
|
||||
applicationId = readOptionalString(propApplicationId, fields),
|
||||
exp = readInstant(propExp, fields),
|
||||
admin = readOptionalBoolean(propAdmin, fields).getOrElse(false),
|
||||
actAs = readOptionalStringList(propActAs, fields) ++ readOptionalString(
|
||||
propParty,
|
||||
fields,
|
||||
).toList,
|
||||
readAs = readOptionalStringList(propReadAs, fields),
|
||||
)
|
||||
} else {
|
||||
// New format: OIDC compliant
|
||||
val customClaims = fields
|
||||
.getOrElse(
|
||||
oidcNamespace,
|
||||
deserializationError(
|
||||
s"Could not read ${value.prettyPrint} as AuthServiceJWTPayload: namespace missing"
|
||||
),
|
||||
} else
|
||||
fields.get(oidcNamespace) match {
|
||||
case Some(oidcNamespaceField) =>
|
||||
// Custom claims format, OIDC compliant
|
||||
val customClaims = oidcNamespaceField
|
||||
.asJsObject(
|
||||
s"Could not read ${value.prettyPrint} as AuthServiceJWTPayload: namespace is not an object"
|
||||
)
|
||||
.fields
|
||||
CustomDamlJWTPayload(
|
||||
ledgerId = readOptionalString(propLedgerId, customClaims),
|
||||
participantId = readOptionalString(propParticipantId, customClaims),
|
||||
applicationId = readOptionalString(propApplicationId, customClaims),
|
||||
exp = readInstant(propExp, fields),
|
||||
admin = readOptionalBoolean(propAdmin, customClaims).getOrElse(false),
|
||||
actAs = readOptionalStringList(propActAs, customClaims),
|
||||
readAs = readOptionalStringList(propReadAs, customClaims),
|
||||
)
|
||||
.asJsObject(
|
||||
s"Could not read ${value.prettyPrint} as AuthServiceJWTPayload: namespace is not an object"
|
||||
case None if legacyProperties.exists(fields.contains) =>
|
||||
// Legacy custom format without the nesting underneath the OIDC compliant extension
|
||||
logger.warn(s"Token ${value.compactPrint} is using a deprecated JWT payload format")
|
||||
CustomDamlJWTPayload(
|
||||
ledgerId = readOptionalString(propLedgerId, fields),
|
||||
participantId = readOptionalString(propParticipantId, fields),
|
||||
applicationId = readOptionalString(propApplicationId, fields),
|
||||
exp = readInstant(propExp, fields),
|
||||
admin = readOptionalBoolean(propAdmin, fields).getOrElse(false),
|
||||
actAs = readOptionalStringList(propActAs, fields) ++ readOptionalString(
|
||||
propParty,
|
||||
fields,
|
||||
).toList,
|
||||
readAs = readOptionalStringList(propReadAs, fields),
|
||||
)
|
||||
case None if scopes.nonEmpty =>
|
||||
deserializationError(
|
||||
msg =
|
||||
s"Access token with unknown scope \"${scopes.mkString}\". Issue tokens with adjusted or no scope to get rid of this warning."
|
||||
)
|
||||
case _ =>
|
||||
logger.warn(s"Token ${value.compactPrint} is using an unsupported format")
|
||||
CustomDamlJWTPayload(
|
||||
ledgerId = None,
|
||||
participantId = None,
|
||||
applicationId = None,
|
||||
exp = readInstant(propExp, fields),
|
||||
admin = false,
|
||||
actAs = List.empty,
|
||||
readAs = List.empty,
|
||||
)
|
||||
.fields
|
||||
CustomDamlJWTPayload(
|
||||
ledgerId = readOptionalString(propLedgerId, customClaims),
|
||||
participantId = readOptionalString(propParticipantId, customClaims),
|
||||
applicationId = readOptionalString(propApplicationId, customClaims),
|
||||
exp = readInstant(propExp, fields),
|
||||
admin = readOptionalBoolean(propAdmin, customClaims).getOrElse(false),
|
||||
actAs = readOptionalStringList(propActAs, customClaims),
|
||||
readAs = readOptionalStringList(propReadAs, customClaims),
|
||||
)
|
||||
}
|
||||
}
|
||||
case _ =>
|
||||
deserializationError(
|
||||
s"Could not read ${value.prettyPrint} as AuthServiceJWTPayload: value is not an object"
|
||||
|
@ -4,7 +4,6 @@
|
||||
package com.digitalasset.canton.ledger.client.services.commands
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.scaladsl.{Flow, Source}
|
||||
import com.daml.grpc.adapter.ExecutionSequencerFactory
|
||||
import com.daml.ledger.api.v1.command_completion_service.CommandCompletionServiceGrpc.CommandCompletionServiceStub
|
||||
@ -14,16 +13,11 @@ import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
|
||||
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import com.digitalasset.canton.ledger.api.domain.LedgerId
|
||||
import com.digitalasset.canton.ledger.client.configuration.CommandClientConfiguration
|
||||
import com.digitalasset.canton.ledger.client.services.commands.CommandTrackerFlow.Materialized
|
||||
import com.digitalasset.canton.ledger.client.services.commands.tracker.CompletionResponse.{
|
||||
CompletionFailure,
|
||||
CompletionSuccess,
|
||||
}
|
||||
import com.digitalasset.canton.logging.NamedLoggerFactory
|
||||
import com.digitalasset.canton.util.Ctx
|
||||
import com.google.protobuf.empty.Empty
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.concurrent.Future
|
||||
import scala.util.Try
|
||||
|
||||
/** Enables easy access to command services and high level operations on top of them.
|
||||
@ -50,18 +44,8 @@ final class CommandClient(
|
||||
loggerFactory,
|
||||
)
|
||||
|
||||
type TrackCommandFlow[Context] =
|
||||
Flow[
|
||||
Ctx[Context, CommandSubmission],
|
||||
Ctx[Context, Either[CompletionFailure, CompletionSuccess]],
|
||||
Materialized[
|
||||
NotUsed,
|
||||
Context,
|
||||
],
|
||||
]
|
||||
|
||||
/** Submit a single command. Successful result does not guarantee that the resulting transaction has been written to
|
||||
* the ledger. In order to get that semantic, use [[trackCommands]] or [[trackCommandsUnbounded]].
|
||||
* the ledger.
|
||||
*/
|
||||
def submitSingleCommand(
|
||||
submitRequest: SubmitRequest,
|
||||
@ -69,35 +53,6 @@ final class CommandClient(
|
||||
): Future[Empty] =
|
||||
it.submitSingleCommand(submitRequest, token)
|
||||
|
||||
/** Submits and tracks a single command. High frequency usage is discouraged as it causes a dedicated completion
|
||||
* stream to be established and torn down.
|
||||
*/
|
||||
def trackSingleCommand(submitRequest: SubmitRequest, token: Option[String] = None)(implicit
|
||||
mat: Materializer
|
||||
): Future[Either[CompletionFailure, CompletionSuccess]] =
|
||||
it.trackSingleCommand(submitRequest, ledgerId, token)
|
||||
|
||||
/** Tracks the results (including timeouts) of incoming commands.
|
||||
* Applies a maximum bound for in-flight commands which have been submitted, but not confirmed through command completions.
|
||||
*
|
||||
* The resulting flow will backpressure if downstream backpressures, independently of the number of in-flight commands.
|
||||
*
|
||||
* @param parties Commands that have a submitting party which is not part of this collection will fail the stream.
|
||||
*/
|
||||
def trackCommands[Context](parties: Seq[String], token: Option[String] = None)(implicit
|
||||
ec: ExecutionContext
|
||||
): Future[TrackCommandFlow[Context]] = it.trackCommands(parties, ledgerId, token)
|
||||
|
||||
/** Tracks the results (including timeouts) of incoming commands.
|
||||
*
|
||||
* The resulting flow will backpressure if downstream backpressures, independently of the number of in-flight commands.
|
||||
*
|
||||
* @param parties Commands that have a submitting party which is not part of this collection will fail the stream.
|
||||
*/
|
||||
def trackCommandsUnbounded[Context](parties: Seq[String], token: Option[String] = None)(implicit
|
||||
ec: ExecutionContext
|
||||
): Future[TrackCommandFlow[Context]] = it.trackCommandsUnbounded(parties, ledgerId, token)
|
||||
|
||||
def completionSource(
|
||||
parties: Seq[String],
|
||||
offset: LedgerOffset,
|
||||
|
@ -1,129 +0,0 @@
|
||||
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.canton.ledger.client.services.commands
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.{Concat, Flow, GraphDSL, Merge, Source}
|
||||
import akka.stream.{DelayOverflowStrategy, FlowShape, OverflowStrategy}
|
||||
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import com.digitalasset.canton.DiscardOps
|
||||
import com.digitalasset.canton.ledger.client.services.commands.tracker.CompletionResponse.{
|
||||
CompletionFailure,
|
||||
CompletionSuccess,
|
||||
}
|
||||
import com.digitalasset.canton.ledger.client.services.commands.tracker.{
|
||||
CommandTracker,
|
||||
TrackedCommandKey,
|
||||
}
|
||||
import com.digitalasset.canton.util.Ctx
|
||||
import com.google.protobuf.empty.Empty
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import java.time.Duration
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.{DurationInt, FiniteDuration}
|
||||
import scala.util.Try
|
||||
|
||||
/** Tracks commands and emits results upon their completion or timeout.
|
||||
* The outbound data is minimal, users must put whatever else they need into the context objects.
|
||||
*/
|
||||
object CommandTrackerFlow {
|
||||
|
||||
private val logger = LoggerFactory.getLogger(getClass)
|
||||
|
||||
final case class Materialized[SubmissionMat, Context](
|
||||
submissionMat: SubmissionMat,
|
||||
trackingMat: Future[immutable.Map[TrackedCommandKey, Context]],
|
||||
)
|
||||
|
||||
def apply[Context, SubmissionMat](
|
||||
commandSubmissionFlow: Flow[
|
||||
Ctx[(Context, TrackedCommandKey), CommandSubmission],
|
||||
Ctx[(Context, TrackedCommandKey), Try[
|
||||
Empty
|
||||
]],
|
||||
SubmissionMat,
|
||||
],
|
||||
createCommandCompletionSource: LedgerOffset => Source[CompletionStreamElement, NotUsed],
|
||||
startingOffset: LedgerOffset,
|
||||
maximumCommandTimeout: Duration,
|
||||
backOffDuration: FiniteDuration = 1.second,
|
||||
timeoutDetectionPeriod: FiniteDuration = 1.second,
|
||||
): Flow[Ctx[Context, CommandSubmission], Ctx[
|
||||
Context,
|
||||
Either[CompletionFailure, CompletionSuccess],
|
||||
], Materialized[
|
||||
SubmissionMat,
|
||||
Context,
|
||||
]] = {
|
||||
|
||||
val trackerExternal = new CommandTracker[Context](maximumCommandTimeout, timeoutDetectionPeriod)
|
||||
|
||||
Flow.fromGraph(GraphDSL.createGraph(commandSubmissionFlow, trackerExternal)(Materialized.apply) {
|
||||
implicit builder => (submissionFlow, tracker) =>
|
||||
import GraphDSL.Implicits.*
|
||||
|
||||
val wrapResult =
|
||||
builder.add(Flow[Ctx[(Context, TrackedCommandKey), Try[Empty]]].map(Left.apply))
|
||||
|
||||
val wrapCompletion = builder.add(Flow[CompletionStreamElement].map(Right.apply))
|
||||
|
||||
val merge = builder.add(
|
||||
Merge[Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]](
|
||||
inputPorts = 2,
|
||||
eagerComplete = false,
|
||||
)
|
||||
)
|
||||
|
||||
val startAt = builder.add(Source.single(startingOffset))
|
||||
val concat = builder.add(Concat[LedgerOffset](2))
|
||||
|
||||
val completionFlow = builder.add(
|
||||
Flow[LedgerOffset]
|
||||
.buffer(1, OverflowStrategy.dropHead) // storing the last offset
|
||||
.expand(offset =>
|
||||
Iterator.iterate(offset)(identity)
|
||||
) // so we always have an element to fetch
|
||||
.flatMapConcat(
|
||||
createCommandCompletionSource(_).recoverWithRetries(
|
||||
1,
|
||||
{ case e =>
|
||||
logger.warn(
|
||||
s"Completion Stream failed with an error. Trying to recover in $backOffDuration..."
|
||||
)
|
||||
logger.debug(
|
||||
s"Completion Stream failed with an error. Trying to recover in $backOffDuration...",
|
||||
e,
|
||||
)
|
||||
delayedEmptySource(backOffDuration)
|
||||
},
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
// format: OFF
|
||||
(startAt.out ~> concat).discard
|
||||
(tracker.offsetOut ~> concat).discard
|
||||
concat.out ~> completionFlow.in
|
||||
|
||||
tracker.submitRequestOut ~> submissionFlow ~> wrapResult ~> merge.in(0)
|
||||
tracker.commandResultIn <~ merge.out
|
||||
merge.in(1) <~ wrapCompletion <~ completionFlow.out
|
||||
// format: ON
|
||||
|
||||
FlowShape(tracker.submitRequestIn, tracker.resultOut)
|
||||
})
|
||||
}
|
||||
|
||||
private def delayedEmptySource(
|
||||
delay: FiniteDuration
|
||||
): Source[CompletionStreamElement, NotUsed] = {
|
||||
Source
|
||||
.single(1)
|
||||
.delay(delay, DelayOverflowStrategy.backpressure)
|
||||
.flatMapConcat(_ => Source.empty[CompletionStreamElement])
|
||||
}
|
||||
|
||||
}
|
@ -1,380 +0,0 @@
|
||||
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.canton.ledger.client.services.commands.tracker
|
||||
|
||||
import akka.stream.stage.*
|
||||
import akka.stream.{Attributes, Inlet, Outlet}
|
||||
import com.daml.grpc.{GrpcException, GrpcStatus}
|
||||
import com.daml.ledger.api.v1.command_completion_service.Checkpoint
|
||||
import com.daml.ledger.api.v1.completion.Completion
|
||||
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import com.digitalasset.canton.DiscardOps
|
||||
import com.digitalasset.canton.ledger.client.services.commands.tracker.CommandTracker.*
|
||||
import com.digitalasset.canton.ledger.client.services.commands.tracker.CompletionResponse.{
|
||||
CompletionFailure,
|
||||
CompletionSuccess,
|
||||
}
|
||||
import com.digitalasset.canton.ledger.client.services.commands.{
|
||||
CommandSubmission,
|
||||
CompletionStreamElement,
|
||||
tracker,
|
||||
}
|
||||
import com.digitalasset.canton.util.Ctx
|
||||
import com.google.protobuf.empty.Empty
|
||||
import com.google.rpc.status.Status as StatusProto
|
||||
import io.grpc.Status
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import java.time.{Duration, Instant}
|
||||
import scala.collection.{immutable, mutable}
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.{Future, Promise}
|
||||
import scala.util.control.NoStackTrace
|
||||
import scala.util.{Failure, Success, Try}
|
||||
|
||||
/** Implements the logic of command tracking via two streams, a submit request and command completion stream.
|
||||
* These streams behave like standard `Flows`, applying tracking and processing logic along the way,
|
||||
* except that:
|
||||
* <ul><li>
|
||||
* if the command completion stream is failed, cancelled or completed, the submit request
|
||||
* stream is completed,
|
||||
* </li><li>
|
||||
* if the request stream is cancelled or completed, and there are no outstanding tracked commands,
|
||||
* the command stream is completed, and
|
||||
* </li><li>
|
||||
* if the request stream is failed, the stage completes and failure is transmitted to the result stream outlet.
|
||||
* </li></ul>
|
||||
* Materializes a future that completes when this stage completes or fails,
|
||||
* yielding a map containing any commands that were not completed.
|
||||
* </li></ul>
|
||||
* We also have an output for offsets, so the most recent offsets can be reused for recovery.
|
||||
*/
|
||||
private[commands] class CommandTracker[Context](
|
||||
maximumCommandTimeout: Duration,
|
||||
timeoutDetectionPeriod: FiniteDuration,
|
||||
) extends GraphStageWithMaterializedValue[
|
||||
CommandTrackerShape[Context],
|
||||
Future[Map[TrackedCommandKey, Context]],
|
||||
] {
|
||||
|
||||
private val logger = LoggerFactory.getLogger(this.getClass.getName)
|
||||
|
||||
val submitRequestIn: Inlet[Ctx[Context, CommandSubmission]] =
|
||||
Inlet[Ctx[Context, CommandSubmission]]("submitRequestIn")
|
||||
val submitRequestOut: Outlet[Ctx[(Context, TrackedCommandKey), CommandSubmission]] =
|
||||
Outlet[Ctx[(Context, TrackedCommandKey), CommandSubmission]]("submitRequestOut")
|
||||
val commandResultIn
|
||||
: Inlet[Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]] =
|
||||
Inlet[Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]](
|
||||
"commandResultIn"
|
||||
)
|
||||
val resultOut: Outlet[ContextualizedCompletionResponse[Context]] =
|
||||
Outlet[ContextualizedCompletionResponse[Context]]("resultOut")
|
||||
val offsetOut: Outlet[LedgerOffset] =
|
||||
Outlet[LedgerOffset]("offsetOut")
|
||||
|
||||
override def createLogicAndMaterializedValue(
|
||||
inheritedAttributes: Attributes
|
||||
): (GraphStageLogic, Future[Map[TrackedCommandKey, Context]]) = {
|
||||
|
||||
val promise = Promise[immutable.Map[TrackedCommandKey, Context]]()
|
||||
|
||||
val logic: TimerGraphStageLogic = new TimerGraphStageLogic(shape) {
|
||||
|
||||
val timeout_detection = "timeout-detection"
|
||||
|
||||
override def preStart(): Unit = {
|
||||
scheduleWithFixedDelay(timeout_detection, timeoutDetectionPeriod, timeoutDetectionPeriod)
|
||||
}
|
||||
|
||||
override protected def onTimer(timerKey: Any): Unit = {
|
||||
timerKey match {
|
||||
case `timeout_detection` =>
|
||||
val timeouts = getResponsesForTimeouts(Instant.now)
|
||||
if (timeouts.nonEmpty) emitMultiple(resultOut, timeouts.to(immutable.Iterable))
|
||||
case _ => // unknown timer, nothing to do
|
||||
}
|
||||
}
|
||||
|
||||
private val pendingCommands = new mutable.HashMap[TrackedCommandKey, TrackingData[Context]]()
|
||||
|
||||
setHandler(
|
||||
submitRequestOut,
|
||||
new OutHandler {
|
||||
override def onPull(): Unit = pull(submitRequestIn)
|
||||
|
||||
override def onDownstreamFinish(cause: Throwable): Unit = {
|
||||
cancel(submitRequestIn)
|
||||
completeStageIfTerminal()
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
setHandler(
|
||||
submitRequestIn,
|
||||
new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
val submitRequest = grab(submitRequestIn)
|
||||
registerSubmission(submitRequest)
|
||||
val commands = submitRequest.value.commands
|
||||
val submissionId = commands.submissionId
|
||||
val commandId = commands.commandId
|
||||
logger.trace(s"Submitted command $commandId in submission $submissionId.")
|
||||
push(
|
||||
submitRequestOut,
|
||||
submitRequest.enrich((context, _) =>
|
||||
context -> TrackedCommandKey(submissionId, commandId)
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
logger.trace("Command upstream finished.")
|
||||
complete(submitRequestOut)
|
||||
completeStageIfTerminal()
|
||||
}
|
||||
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = {
|
||||
fail(resultOut, ex)
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
setHandler(
|
||||
resultOut,
|
||||
new OutHandler {
|
||||
override def onPull(): Unit = if (!hasBeenPulled(commandResultIn)) pull(commandResultIn)
|
||||
},
|
||||
)
|
||||
|
||||
setHandler(
|
||||
commandResultIn,
|
||||
new InHandler {
|
||||
|
||||
/** This port was pulled by [[resultOut]], so that port expects an output.
|
||||
* If processing the input produces one, we push it through [[resultOut]], otherwise we pull this port again.
|
||||
* If multiple outputs are produced (possible with timeouts only) we get rid of them with emitMultiple.
|
||||
*/
|
||||
override def onPush(): Unit = {
|
||||
grab(commandResultIn) match {
|
||||
case Left(submitResponse) =>
|
||||
pushResultOrPullCommandResultIn(handleSubmitResponse(submitResponse))
|
||||
|
||||
case Right(CompletionStreamElement.CompletionElement(completion, checkpoint)) =>
|
||||
pushResultOrPullCommandResultIn(getResponseForCompletion(completion, checkpoint))
|
||||
|
||||
case Right(CompletionStreamElement.CheckpointElement(checkpoint)) =>
|
||||
if (!hasBeenPulled(commandResultIn)) pull(commandResultIn)
|
||||
checkpoint.offset.foreach(emit(offsetOut, _))
|
||||
}
|
||||
|
||||
completeStageIfTerminal()
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
setHandler(
|
||||
offsetOut,
|
||||
new OutHandler {
|
||||
override def onPull(): Unit =
|
||||
() // nothing to do here as the offset stream will be read with constant demand, storing the latest element
|
||||
},
|
||||
)
|
||||
|
||||
private def pushResultOrPullCommandResultIn(
|
||||
completionResponse: Option[ContextualizedCompletionResponse[Context]]
|
||||
): Unit = {
|
||||
// The command tracker detects timeouts outside the regular pull/push
|
||||
// mechanism of the input/output ports. Basically the timeout
|
||||
// detection jumps the line when emitting outputs on `resultOut`. If it
|
||||
// then processes a regular completion, it tries to push to `resultOut`
|
||||
// even though it hasn't been pulled again in the meantime. Using `emit`
|
||||
// instead of `push` when a completion arrives makes akka take care of
|
||||
// handling the signaling properly.
|
||||
completionResponse match {
|
||||
case Some(response) => emit(resultOut, response)
|
||||
case None =>
|
||||
if (!hasBeenPulled(commandResultIn)) {
|
||||
pull(commandResultIn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def completeStageIfTerminal(): Unit = {
|
||||
if (isClosed(submitRequestIn) && pendingCommands.isEmpty) {
|
||||
completeStage()
|
||||
}
|
||||
}
|
||||
|
||||
import CommandTracker.nonTerminalCodes
|
||||
|
||||
private def handleSubmitResponse(
|
||||
submitResponse: Ctx[(Context, TrackedCommandKey), Try[Empty]]
|
||||
): Option[ContextualizedCompletionResponse[Context]] = {
|
||||
val Ctx((_, commandKey), value, _) = submitResponse
|
||||
value match {
|
||||
case Failure(GrpcException(status @ GrpcStatus(code, _), metadata))
|
||||
if !nonTerminalCodes(code) =>
|
||||
getResponseForTerminalStatusCode(
|
||||
commandKey,
|
||||
GrpcStatus.toProto(status, metadata),
|
||||
)
|
||||
case Failure(throwable) =>
|
||||
logger.warn(
|
||||
s"Service responded with error for submitting command with context ${submitResponse.context}. Status of command is unknown. watching for completion...",
|
||||
throwable,
|
||||
)
|
||||
None
|
||||
case Success(_) =>
|
||||
logger.trace(
|
||||
s"Received confirmation that command ${commandKey.commandId} from submission ${commandKey.submissionId} was accepted."
|
||||
)
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
private def registerSubmission(submission: Ctx[Context, CommandSubmission]): Unit = {
|
||||
val commands = submission.value.commands
|
||||
val submissionId = commands.submissionId
|
||||
val commandId = commands.commandId
|
||||
logger.trace(s"Begin tracking of command $commandId for submission $submissionId.")
|
||||
if (submissionId.isEmpty) {
|
||||
throw new IllegalArgumentException(
|
||||
s"The submission ID for the command ID $commandId is empty. This should not happen."
|
||||
)
|
||||
}
|
||||
if (pendingCommands.contains(TrackedCommandKey(submissionId, commandId))) {
|
||||
// TODO(i12280) return an error identical to the server side duplicate command error once that's defined.
|
||||
throw new IllegalStateException(
|
||||
s"A command $commandId from a submission $submissionId is already being tracked. CommandIds submitted to the CommandTracker must be unique."
|
||||
) with NoStackTrace
|
||||
}
|
||||
val commandTimeout = submission.value.timeout match {
|
||||
case Some(timeout) => durationOrdering.min(timeout, maximumCommandTimeout)
|
||||
case None => maximumCommandTimeout
|
||||
}
|
||||
val trackingData = TrackingData(
|
||||
commandId = commandId,
|
||||
commandTimeout = Instant.now().plus(commandTimeout),
|
||||
context = submission.context,
|
||||
)
|
||||
pendingCommands += TrackedCommandKey(submissionId, commandId) -> trackingData
|
||||
()
|
||||
}
|
||||
|
||||
private def getResponsesForTimeouts(
|
||||
instant: Instant
|
||||
): Seq[ContextualizedCompletionResponse[Context]] = {
|
||||
logger.trace("Checking timeouts at {}", instant)
|
||||
pendingCommands.view.flatMap { case (commandKey, trackingData) =>
|
||||
if (trackingData.commandTimeout.isBefore(instant)) {
|
||||
pendingCommands -= commandKey
|
||||
logger.info(
|
||||
s"Command {} from submission {} (command timeout {}) timed out at checkpoint {}.",
|
||||
commandKey.commandId,
|
||||
commandKey.submissionId,
|
||||
trackingData.commandTimeout,
|
||||
instant,
|
||||
)
|
||||
List(
|
||||
Ctx(
|
||||
trackingData.context,
|
||||
Left(CompletionResponse.TimeoutResponse(commandId = trackingData.commandId)),
|
||||
)
|
||||
)
|
||||
} else {
|
||||
Nil
|
||||
}
|
||||
}.toSeq
|
||||
}
|
||||
|
||||
private def getResponseForCompletion(
|
||||
completion: Completion,
|
||||
checkpoint: Option[Checkpoint],
|
||||
): Option[ContextualizedCompletionResponse[Context]] = {
|
||||
val commandId = completion.commandId
|
||||
val maybeSubmissionId = Option(completion.submissionId).filter(_.nonEmpty)
|
||||
logger.trace {
|
||||
val completionDescription = completion.status match {
|
||||
case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value =>
|
||||
"successful completion of command"
|
||||
case _ => "failed completion of command"
|
||||
}
|
||||
s"Handling $completionDescription $commandId from submission $maybeSubmissionId."
|
||||
}
|
||||
|
||||
maybeSubmissionId
|
||||
.map { submissionId =>
|
||||
val key = TrackedCommandKey(submissionId, completion.commandId)
|
||||
val trackedCommandForCompletion = pendingCommands.remove(key)
|
||||
trackedCommandForCompletion.map(trackingData =>
|
||||
Ctx(
|
||||
trackingData.context,
|
||||
tracker.CompletionResponse(completion = completion, checkpoint = checkpoint),
|
||||
)
|
||||
)
|
||||
}
|
||||
.getOrElse {
|
||||
logger.warn("Ignoring a completion with an empty submission ID.")
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
private def getResponseForTerminalStatusCode(
|
||||
commandKey: TrackedCommandKey,
|
||||
status: StatusProto,
|
||||
): Option[ContextualizedCompletionResponse[Context]] = {
|
||||
logger.trace(
|
||||
s"Handling failure of command ${commandKey.commandId} from submission ${commandKey.submissionId}."
|
||||
)
|
||||
pendingCommands
|
||||
.remove(commandKey)
|
||||
.map { t =>
|
||||
Ctx(
|
||||
t.context,
|
||||
tracker.CompletionResponse(
|
||||
completion = Completion(
|
||||
commandKey.commandId,
|
||||
Some(status),
|
||||
submissionId = commandKey.submissionId,
|
||||
),
|
||||
checkpoint = None,
|
||||
),
|
||||
)
|
||||
}
|
||||
.orElse {
|
||||
logger.trace(
|
||||
s"Platform signaled failure for unknown command ${commandKey.commandId} from submission ${commandKey.submissionId}."
|
||||
)
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
promise
|
||||
.tryComplete(Success(pendingCommands.view.map { case (k, v) =>
|
||||
k -> v.context
|
||||
}.toMap))
|
||||
.discard
|
||||
super.postStop()
|
||||
}
|
||||
}
|
||||
|
||||
logic -> promise.future
|
||||
}
|
||||
|
||||
override def shape: CommandTrackerShape[Context] =
|
||||
CommandTrackerShape(submitRequestIn, submitRequestOut, commandResultIn, resultOut, offsetOut)
|
||||
|
||||
}
|
||||
|
||||
object CommandTracker {
|
||||
type ContextualizedCompletionResponse[Context] =
|
||||
Ctx[Context, Either[CompletionFailure, CompletionSuccess]]
|
||||
|
||||
private val durationOrdering = implicitly[Ordering[Duration]]
|
||||
|
||||
private val nonTerminalCodes =
|
||||
Set(Status.Code.UNKNOWN, Status.Code.INTERNAL, Status.Code.OK)
|
||||
}
|
@ -1,44 +0,0 @@
|
||||
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.canton.ledger.client.services.commands.tracker
|
||||
|
||||
import akka.stream.{Inlet, Outlet, Shape}
|
||||
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import com.digitalasset.canton.ledger.client.services.commands.tracker.CompletionResponse.{
|
||||
CompletionFailure,
|
||||
CompletionSuccess,
|
||||
}
|
||||
import com.digitalasset.canton.ledger.client.services.commands.{
|
||||
CommandSubmission,
|
||||
CompletionStreamElement,
|
||||
}
|
||||
import com.digitalasset.canton.util.Ctx
|
||||
import com.google.protobuf.empty.Empty
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.util.Try
|
||||
|
||||
private[tracker] final case class CommandTrackerShape[Context](
|
||||
submitRequestIn: Inlet[Ctx[Context, CommandSubmission]],
|
||||
submitRequestOut: Outlet[Ctx[(Context, TrackedCommandKey), CommandSubmission]],
|
||||
commandResultIn: Inlet[
|
||||
Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]
|
||||
],
|
||||
resultOut: Outlet[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]],
|
||||
offsetOut: Outlet[LedgerOffset],
|
||||
) extends Shape {
|
||||
|
||||
override def inlets: immutable.Seq[Inlet[_]] = Vector(submitRequestIn, commandResultIn)
|
||||
|
||||
override def outlets: immutable.Seq[Outlet[_]] = Vector(submitRequestOut, resultOut, offsetOut)
|
||||
|
||||
override def deepCopy(): Shape =
|
||||
CommandTrackerShape[Context](
|
||||
submitRequestIn.carbonCopy(),
|
||||
submitRequestOut.carbonCopy(),
|
||||
commandResultIn.carbonCopy(),
|
||||
resultOut.carbonCopy(),
|
||||
offsetOut.carbonCopy(),
|
||||
)
|
||||
}
|
@ -1,134 +0,0 @@
|
||||
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.canton.ledger.client.services.commands.tracker
|
||||
|
||||
import com.daml.error.{ContextualizedErrorLogger, GrpcStatuses}
|
||||
import com.daml.grpc.GrpcStatus
|
||||
import com.daml.ledger.api.v1.command_completion_service.Checkpoint
|
||||
import com.daml.ledger.api.v1.completion.Completion
|
||||
import com.digitalasset.canton.ledger.error.CommonErrors
|
||||
import com.google.rpc.status.Status as StatusProto
|
||||
import com.google.rpc.Status as StatusJavaProto
|
||||
import io.grpc.Status.Code
|
||||
import io.grpc.{StatusRuntimeException, protobuf}
|
||||
|
||||
object CompletionResponse {
|
||||
|
||||
/** Represents failures from executing submissions through gRPC.
|
||||
*/
|
||||
sealed trait CompletionFailure
|
||||
final case class NotOkResponse(completion: Completion, checkpoint: Option[Checkpoint])
|
||||
extends CompletionFailure {
|
||||
val commandId: String = completion.commandId
|
||||
val grpcStatus: StatusProto = completion.getStatus
|
||||
def metadata: Map[String, String] = Map(
|
||||
GrpcStatuses.DefiniteAnswerKey -> GrpcStatuses.isDefiniteAnswer(grpcStatus).toString
|
||||
)
|
||||
}
|
||||
final case class TimeoutResponse(commandId: String) extends CompletionFailure
|
||||
|
||||
final case class NoStatusInResponse(completion: Completion, checkpoint: Option[Checkpoint])
|
||||
extends CompletionFailure {
|
||||
val commandId: String = completion.commandId
|
||||
}
|
||||
|
||||
/** Represents failures of submissions throughout the execution queue.
|
||||
*/
|
||||
sealed trait TrackedCompletionFailure
|
||||
|
||||
/** The submission was executed after it was enqueued but was not successful.
|
||||
*/
|
||||
final case class QueueCompletionFailure(failure: CompletionFailure)
|
||||
extends TrackedCompletionFailure
|
||||
|
||||
/** The submission could not be added to the execution queue.
|
||||
* @param status - gRPC status chosen based on the reason why adding to the queue failed
|
||||
*/
|
||||
final case class QueueSubmitFailure(status: StatusJavaProto) extends TrackedCompletionFailure
|
||||
|
||||
final case class CompletionSuccess(
|
||||
completion: Completion,
|
||||
checkpoint: Option[Checkpoint],
|
||||
) {
|
||||
val commandId: String = completion.commandId
|
||||
val transactionId: String = completion.transactionId
|
||||
val originalStatus: StatusProto = completion.getStatus
|
||||
}
|
||||
|
||||
def apply(
|
||||
completion: Completion,
|
||||
checkpoint: Option[Checkpoint],
|
||||
): Either[CompletionFailure, CompletionSuccess] =
|
||||
completion.status match {
|
||||
case Some(grpcStatus) if Code.OK.value() == grpcStatus.code =>
|
||||
Right(CompletionSuccess(completion, checkpoint))
|
||||
case Some(_) =>
|
||||
Left(NotOkResponse(completion, checkpoint))
|
||||
case None =>
|
||||
Left(NoStatusInResponse(completion, checkpoint))
|
||||
}
|
||||
|
||||
/** For backwards compatibility, clients that are too coupled to [[com.daml.ledger.api.v1.completion.Completion]] as a type can convert back from response
|
||||
*/
|
||||
def toCompletion(response: Either[CompletionFailure, CompletionSuccess]): Completion =
|
||||
response match {
|
||||
case Left(failure) =>
|
||||
failure match {
|
||||
case NotOkResponse(completion, _) =>
|
||||
completion
|
||||
case TimeoutResponse(commandId) =>
|
||||
Completion(
|
||||
commandId = commandId,
|
||||
status = Some(StatusProto(Code.ABORTED.value(), "Timeout")),
|
||||
)
|
||||
case NoStatusInResponse(completion, _) =>
|
||||
completion
|
||||
}
|
||||
case Right(success) =>
|
||||
success.completion
|
||||
}
|
||||
|
||||
def toException(response: TrackedCompletionFailure)(implicit
|
||||
contextualizedErrorLogger: ContextualizedErrorLogger
|
||||
): StatusRuntimeException = {
|
||||
val status = response match {
|
||||
case QueueCompletionFailure(failure) =>
|
||||
val metadata = extractMetadata(failure)
|
||||
extractStatus(failure, metadata)
|
||||
case QueueSubmitFailure(status) => status
|
||||
}
|
||||
protobuf.StatusProto.toStatusRuntimeException(status)
|
||||
}
|
||||
|
||||
private def extractMetadata(response: CompletionFailure): Map[String, String] = response match {
|
||||
case notOkResponse: CompletionResponse.NotOkResponse => notOkResponse.metadata
|
||||
case _ => Map.empty
|
||||
}
|
||||
|
||||
private def extractStatus(
|
||||
response: CompletionFailure,
|
||||
metadata: Map[String, String],
|
||||
)(implicit contextualizedErrorLogger: ContextualizedErrorLogger): StatusJavaProto =
|
||||
response match {
|
||||
case notOkResponse: CompletionResponse.NotOkResponse =>
|
||||
val statusBuilder = GrpcStatus.toJavaBuilder(notOkResponse.grpcStatus)
|
||||
GrpcStatus.buildStatus(metadata, statusBuilder)
|
||||
case CompletionResponse.TimeoutResponse(_) =>
|
||||
CommonErrors.RequestTimeOut
|
||||
.Reject(
|
||||
"Timed out while awaiting for a completion corresponding to a command submission.",
|
||||
definiteAnswer = false,
|
||||
)
|
||||
.asGrpcStatus
|
||||
case CompletionResponse.NoStatusInResponse(_, _) =>
|
||||
CommonErrors.ServiceInternalError
|
||||
.Generic(
|
||||
"Missing status in completion response.",
|
||||
throwableO = None,
|
||||
)
|
||||
.asGrpcStatus
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -1,6 +0,0 @@
|
||||
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.canton.ledger.client.services.commands.tracker
|
||||
|
||||
final case class TrackedCommandKey(submissionId: String, commandId: String)
|
@ -1,12 +0,0 @@
|
||||
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.canton.ledger.client.services.commands.tracker
|
||||
|
||||
import java.time.Instant
|
||||
|
||||
private[tracker] final case class TrackingData[Context](
|
||||
commandId: String,
|
||||
commandTimeout: Instant,
|
||||
context: Context,
|
||||
)
|
@ -4,9 +4,7 @@
|
||||
package com.digitalasset.canton.ledger.client.services.commands.withoutledgerid
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
|
||||
import com.codahale.metrics as codahale
|
||||
import akka.stream.scaladsl.{Flow, Source}
|
||||
import com.daml.grpc.adapter.ExecutionSequencerFactory
|
||||
import com.daml.ledger.api.v1.command_completion_service.CommandCompletionServiceGrpc.CommandCompletionServiceStub
|
||||
import com.daml.ledger.api.v1.command_completion_service.{
|
||||
@ -17,26 +15,17 @@ import com.daml.ledger.api.v1.command_completion_service.{
|
||||
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.CommandSubmissionServiceStub
|
||||
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
|
||||
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import com.daml.metrics.api.dropwizard.DropwizardCounter
|
||||
import com.digitalasset.canton.ledger.api.SubmissionIdGenerator
|
||||
import com.digitalasset.canton.ledger.api.domain.LedgerId
|
||||
import com.digitalasset.canton.ledger.api.validation.CommandsValidator
|
||||
import com.digitalasset.canton.ledger.client.LedgerClient
|
||||
import com.digitalasset.canton.ledger.client.configuration.CommandClientConfiguration
|
||||
import com.digitalasset.canton.ledger.client.services.commands.CommandTrackerFlow.Materialized
|
||||
import com.digitalasset.canton.ledger.client.services.commands.*
|
||||
import com.digitalasset.canton.ledger.client.services.commands.tracker.CompletionResponse.{
|
||||
CompletionFailure,
|
||||
CompletionSuccess,
|
||||
}
|
||||
import com.digitalasset.canton.ledger.client.services.commands.tracker.TrackedCommandKey
|
||||
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
|
||||
import com.digitalasset.canton.util.Ctx
|
||||
import com.digitalasset.canton.util.akkastreams.MaxInFlight
|
||||
import com.google.protobuf.empty.Empty
|
||||
import scalaz.syntax.tag.*
|
||||
|
||||
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
|
||||
import scala.concurrent.Future
|
||||
import scala.util.Try
|
||||
|
||||
/** Enables easy access to command services and high level operations on top of them.
|
||||
@ -55,20 +44,10 @@ final class CommandClient(
|
||||
)(implicit esf: ExecutionSequencerFactory)
|
||||
extends NamedLogging {
|
||||
|
||||
type TrackCommandFlow[Context] =
|
||||
Flow[
|
||||
Ctx[Context, CommandSubmission],
|
||||
Ctx[Context, Either[CompletionFailure, CompletionSuccess]],
|
||||
Materialized[
|
||||
NotUsed,
|
||||
Context,
|
||||
],
|
||||
]
|
||||
|
||||
private val submissionIdGenerator: SubmissionIdGenerator = SubmissionIdGenerator.Random
|
||||
|
||||
/** Submit a single command. Successful result does not guarantee that the resulting transaction has been written to
|
||||
* the ledger. In order to get that semantic, use [[trackCommands]] or [[trackCommandsUnbounded]].
|
||||
* the ledger.
|
||||
*/
|
||||
def submitSingleCommand(
|
||||
submitRequest: SubmitRequest,
|
||||
@ -86,103 +65,6 @@ final class CommandClient(
|
||||
.submit(submitRequest)
|
||||
}
|
||||
|
||||
/** Submits and tracks a single command. High frequency usage is discouraged as it causes a dedicated completion
|
||||
* stream to be established and torn down.
|
||||
*/
|
||||
def trackSingleCommand(
|
||||
submitRequest: SubmitRequest,
|
||||
ledgerIdToUse: LedgerId,
|
||||
token: Option[String] = None,
|
||||
)(implicit
|
||||
mat: Materializer
|
||||
): Future[Either[CompletionFailure, CompletionSuccess]] = {
|
||||
implicit val executionContext: ExecutionContextExecutor = mat.executionContext
|
||||
val commands = submitRequest.getCommands
|
||||
val effectiveActAs = CommandsValidator.effectiveSubmitters(commands).actAs
|
||||
for {
|
||||
tracker <- trackCommandsUnbounded[Unit](effectiveActAs.toList, ledgerIdToUse, token)
|
||||
result <- Source
|
||||
.single(Ctx.unit(CommandSubmission(commands)))
|
||||
.via(tracker)
|
||||
.runWith(Sink.head)
|
||||
} yield {
|
||||
result.value
|
||||
}
|
||||
}
|
||||
|
||||
/** Tracks the results (including timeouts) of incoming commands.
|
||||
* Applies a maximum bound for in-flight commands which have been submitted, but not confirmed through command completions.
|
||||
*
|
||||
* The resulting flow will backpressure if downstream backpressures, independently of the number of in-flight commands.
|
||||
*
|
||||
* @param parties Commands that have a submitting party which is not part of this collection will fail the stream.
|
||||
*/
|
||||
def trackCommands[Context](
|
||||
parties: Seq[String],
|
||||
ledgerIdToUse: LedgerId,
|
||||
token: Option[String] = None,
|
||||
)(implicit
|
||||
ec: ExecutionContext
|
||||
): Future[TrackCommandFlow[Context]] = {
|
||||
for {
|
||||
tracker <- trackCommandsUnbounded[Context](parties, ledgerIdToUse, token)
|
||||
} yield {
|
||||
// The counters are ignored on the client
|
||||
MaxInFlight(
|
||||
config.maxCommandsInFlight,
|
||||
DropwizardCounter("capacity", new codahale.Counter),
|
||||
DropwizardCounter("name", new codahale.Counter),
|
||||
)
|
||||
.joinMat(tracker)(Keep.right)
|
||||
}
|
||||
}
|
||||
|
||||
/** Tracks the results (including timeouts) of incoming commands.
|
||||
*
|
||||
* The resulting flow will backpressure if downstream backpressures, independently of the number of in-flight commands.
|
||||
*
|
||||
* @param parties Commands that have a submitting party which is not part of this collection will fail the stream.
|
||||
*/
|
||||
def trackCommandsUnbounded[Context](
|
||||
parties: Seq[String],
|
||||
ledgerIdToUse: LedgerId,
|
||||
token: Option[String] = None,
|
||||
)(implicit
|
||||
ec: ExecutionContext
|
||||
): Future[TrackCommandFlow[Context]] =
|
||||
for {
|
||||
ledgerEnd <- getCompletionEnd(ledgerIdToUse, token)
|
||||
} yield {
|
||||
partyFilter(parties.toSet)
|
||||
.via(
|
||||
CommandUpdaterFlow[Context](config, submissionIdGenerator, applicationId, ledgerIdToUse)
|
||||
)
|
||||
.viaMat(
|
||||
CommandTrackerFlow[Context, NotUsed](
|
||||
commandSubmissionFlow = CommandSubmissionFlow[(Context, TrackedCommandKey)](
|
||||
submit(token),
|
||||
config.maxParallelSubmissions,
|
||||
loggerFactory,
|
||||
),
|
||||
createCommandCompletionSource =
|
||||
offset => completionSource(parties, offset, ledgerIdToUse, token),
|
||||
startingOffset = ledgerEnd.getOffset,
|
||||
maximumCommandTimeout = config.defaultDeduplicationTime,
|
||||
)
|
||||
)(Keep.right)
|
||||
}
|
||||
|
||||
private def partyFilter[Context](allowedParties: Set[String]) =
|
||||
Flow[Ctx[Context, CommandSubmission]].map { elem =>
|
||||
val commands = elem.value.commands
|
||||
val effectiveActAs = CommandsValidator.effectiveSubmitters(commands).actAs
|
||||
if (effectiveActAs.subsetOf(allowedParties)) elem
|
||||
else
|
||||
throw new IllegalArgumentException(
|
||||
s"Attempted submission and tracking of command ${commands.commandId} by parties $effectiveActAs while some of those parties are not part of the subscription set $allowedParties."
|
||||
)
|
||||
}
|
||||
|
||||
def completionSource(
|
||||
parties: Seq[String],
|
||||
offset: LedgerOffset,
|
||||
|
@ -242,6 +242,68 @@ class AuthServiceJWTCodecSpec
|
||||
parse(serialized) shouldBe Success(expected)
|
||||
}
|
||||
|
||||
"support standard JWT claims with one composite scope" in {
|
||||
val serialized =
|
||||
"""{
|
||||
| "iss": "issuer",
|
||||
| "aud": "someParticipantId",
|
||||
| "sub": "someUserId",
|
||||
| "exp": 100,
|
||||
| "scope": "resource_server/daml_ledger_api"
|
||||
|}
|
||||
""".stripMargin
|
||||
val expected = StandardJWTPayload(
|
||||
issuer = Some("issuer"),
|
||||
participantId = Some("someParticipantId"),
|
||||
userId = "someUserId",
|
||||
exp = Some(Instant.ofEpochSecond(100)),
|
||||
format = StandardJWTTokenFormat.Scope,
|
||||
audiences = List.empty,
|
||||
)
|
||||
parse(serialized) shouldBe Success(expected)
|
||||
}
|
||||
|
||||
"support standard JWT claims with one composite scope and no audience" in {
|
||||
val serialized =
|
||||
"""{
|
||||
| "iss": "issuer",
|
||||
| "sub": "someUserId",
|
||||
| "exp": 100,
|
||||
| "scope": "resource_server/daml_ledger_api"
|
||||
|}
|
||||
""".stripMargin
|
||||
val expected = StandardJWTPayload(
|
||||
issuer = Some("issuer"),
|
||||
participantId = None,
|
||||
userId = "someUserId",
|
||||
exp = Some(Instant.ofEpochSecond(100)),
|
||||
format = StandardJWTTokenFormat.Scope,
|
||||
audiences = List.empty,
|
||||
)
|
||||
parse(serialized) shouldBe Success(expected)
|
||||
}
|
||||
|
||||
"support standard JWT claims with one composite scope with a dash" in {
|
||||
val serialized =
|
||||
"""{
|
||||
| "iss": "issuer",
|
||||
| "aud": "someParticipantId",
|
||||
| "sub": "someUserId",
|
||||
| "exp": 100,
|
||||
| "scope": "resource-server/daml_ledger_api"
|
||||
|}
|
||||
""".stripMargin
|
||||
val expected = StandardJWTPayload(
|
||||
issuer = Some("issuer"),
|
||||
participantId = Some("someParticipantId"),
|
||||
userId = "someUserId",
|
||||
exp = Some(Instant.ofEpochSecond(100)),
|
||||
format = StandardJWTTokenFormat.Scope,
|
||||
audiences = List.empty,
|
||||
)
|
||||
parse(serialized) shouldBe Success(expected)
|
||||
}
|
||||
|
||||
"support standard JWT claims with extra scopes" in {
|
||||
val serialized =
|
||||
"""{
|
||||
@ -262,6 +324,26 @@ class AuthServiceJWTCodecSpec
|
||||
parse(serialized) shouldBe Success(expected)
|
||||
}
|
||||
|
||||
"support standard JWT claims with extra composite scopes" in {
|
||||
val serialized =
|
||||
"""{
|
||||
| "aud": "someParticipantId",
|
||||
| "sub": "someUserId",
|
||||
| "exp": 100,
|
||||
| "scope": "resource_server/dummy-scope1 resource_server/daml_ledger_api resource_server/dummy-scope2"
|
||||
|}
|
||||
""".stripMargin
|
||||
val expected = StandardJWTPayload(
|
||||
issuer = None,
|
||||
participantId = Some("someParticipantId"),
|
||||
userId = "someUserId",
|
||||
exp = Some(Instant.ofEpochSecond(100)),
|
||||
format = StandardJWTTokenFormat.Scope,
|
||||
audiences = List.empty,
|
||||
)
|
||||
parse(serialized) shouldBe Success(expected)
|
||||
}
|
||||
|
||||
"support standard JWT claims with iss claim as string" in {
|
||||
val serialized =
|
||||
"""{
|
||||
@ -453,6 +535,21 @@ class AuthServiceJWTCodecSpec
|
||||
parse(serialized).failed.get.getMessage
|
||||
.contains("must include participantId value prefixed by") shouldBe true
|
||||
}
|
||||
|
||||
"reject token with invalid scope" in {
|
||||
val serialized =
|
||||
"""{
|
||||
| "iss": "issuer",
|
||||
| "aud": "someParticipantId",
|
||||
| "sub": "someUserId",
|
||||
| "exp": 100,
|
||||
| "scope": "resource-server/daml-ledger-api"
|
||||
|}
|
||||
""".stripMargin
|
||||
parse(serialized).failed.get.getMessage should include(
|
||||
"Access token with unknown scope"
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,705 +0,0 @@
|
||||
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.canton.ledger.client.services.commands
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.{Flow, Keep, Source, SourceQueueWithComplete}
|
||||
import akka.stream.testkit.javadsl.TestSink
|
||||
import akka.stream.testkit.scaladsl.TestSource
|
||||
import akka.stream.testkit.{TestPublisher, TestSubscriber}
|
||||
import akka.stream.{OverflowStrategy, QueueOfferResult}
|
||||
import com.daml.api.util.TimestampConversion.*
|
||||
import com.daml.concurrent.ExecutionContext
|
||||
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
|
||||
import com.daml.ledger.api.v1.command_completion_service.Checkpoint
|
||||
import com.daml.ledger.api.v1.commands.Commands
|
||||
import com.daml.ledger.api.v1.completion.Completion
|
||||
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset.LedgerBoundary.LEDGER_BEGIN
|
||||
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset.Value.{Absolute, Boundary}
|
||||
import com.digitalasset.canton.concurrent.DirectExecutionContext
|
||||
import com.digitalasset.canton.ledger.client.services.commands.tracker.CompletionResponse.{
|
||||
CompletionFailure,
|
||||
CompletionSuccess,
|
||||
NotOkResponse,
|
||||
}
|
||||
import com.digitalasset.canton.ledger.client.services.commands.tracker.{
|
||||
CompletionResponse,
|
||||
TrackedCommandKey,
|
||||
}
|
||||
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
|
||||
import com.digitalasset.canton.util.Ctx
|
||||
import com.digitalasset.canton.{BaseTest, DiscardOps}
|
||||
import com.google.protobuf.empty.Empty
|
||||
import com.google.protobuf.timestamp.Timestamp
|
||||
import com.google.rpc.code.*
|
||||
import com.google.rpc.status.Status as StatusProto
|
||||
import io.grpc.StatusRuntimeException
|
||||
import org.scalatest.OptionValues
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AsyncWordSpec
|
||||
|
||||
import java.time.{Duration, Instant}
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import scala.concurrent.duration.DurationLong
|
||||
import scala.concurrent.{Future, Promise}
|
||||
import scala.util.{Failure, Success, Try}
|
||||
|
||||
class CommandTrackerFlowTest
|
||||
extends AsyncWordSpec
|
||||
with Matchers
|
||||
with OptionValues
|
||||
with AkkaBeforeAndAfterAll
|
||||
with ScalaFutures
|
||||
with BaseTest {
|
||||
|
||||
type C[Value] = Ctx[(Int, TrackedCommandKey), Value]
|
||||
|
||||
private val allSubmissionsSuccessful: Flow[Ctx[(Int, TrackedCommandKey), CommandSubmission], Ctx[
|
||||
(Int, TrackedCommandKey),
|
||||
Try[Empty],
|
||||
], NotUsed] =
|
||||
Flow[C[CommandSubmission]].map {
|
||||
_.map(_ => Success(Empty.defaultInstance))
|
||||
}
|
||||
|
||||
private val shortDuration = Duration.ofSeconds(1L)
|
||||
|
||||
private lazy val submissionSource = TestSource.probe[Ctx[Int, CommandSubmission]]
|
||||
private lazy val resultSink =
|
||||
TestSink.probe[Ctx[Int, Either[CompletionFailure, CompletionSuccess]]](system)
|
||||
|
||||
private val mrt = Instant.EPOCH.plus(shortDuration)
|
||||
private val submissionId = "submissionId"
|
||||
private val commandId = "commandId"
|
||||
private val abortedCompletion =
|
||||
Completion(
|
||||
commandId,
|
||||
Some(StatusProto(Code.ABORTED.value)),
|
||||
submissionId = submissionId,
|
||||
)
|
||||
private val successStatus = StatusProto(Code.OK.value)
|
||||
private val context = 1
|
||||
private val submission = newSubmission(submissionId, commandId)
|
||||
|
||||
private def newSubmission(
|
||||
submissionId: String,
|
||||
commandId: String,
|
||||
timeout: Option[Duration] = None,
|
||||
) =
|
||||
Ctx(
|
||||
context,
|
||||
CommandSubmission(Commands(commandId = commandId, submissionId = submissionId), timeout),
|
||||
)
|
||||
|
||||
private case class Handle(
|
||||
submissions: TestPublisher.Probe[Ctx[Int, CommandSubmission]],
|
||||
completions: TestSubscriber.Probe[Ctx[Int, Either[CompletionFailure, CompletionSuccess]]],
|
||||
whatever: Future[Map[TrackedCommandKey, Int]],
|
||||
completionsStreamMock: CompletionStreamMock,
|
||||
)
|
||||
|
||||
private class CompletionStreamMock(override protected val loggerFactory: NamedLoggerFactory)
|
||||
extends NamedLogging {
|
||||
|
||||
private case class State(
|
||||
queue: SourceQueueWithComplete[CompletionStreamElement],
|
||||
startOffset: LedgerOffset,
|
||||
)
|
||||
|
||||
private implicit val directEc: ExecutionContext[Nothing] =
|
||||
ExecutionContext(DirectExecutionContext(noTracingLogger))
|
||||
|
||||
private val stateRef = new AtomicReference[Promise[State]](Promise[State]())
|
||||
|
||||
def createCompletionsSource(
|
||||
ledgerOffset: LedgerOffset
|
||||
): Source[CompletionStreamElement, NotUsed] = {
|
||||
val (queue, completionSource) =
|
||||
Source
|
||||
.queue[CompletionStreamElement](Int.MaxValue, OverflowStrategy.backpressure)
|
||||
.preMaterialize()
|
||||
stateRef.get().success(State(queue, ledgerOffset))
|
||||
completionSource
|
||||
}
|
||||
|
||||
def send(elem: CompletionStreamElement): Future[QueueOfferResult] =
|
||||
for {
|
||||
state <- stateRef.get().future
|
||||
res <- state.queue.offer(elem)
|
||||
} yield res
|
||||
|
||||
def breakCompletionsStream(): Future[Unit] =
|
||||
stateRef
|
||||
.getAndSet(Promise[State]())
|
||||
.future
|
||||
.map(state => state.queue.fail(new RuntimeException("boom")))
|
||||
|
||||
def getLastOffset: Future[LedgerOffset] =
|
||||
stateRef.get().future.map(_.startOffset)
|
||||
|
||||
}
|
||||
|
||||
"Command tracking flow" when {
|
||||
|
||||
"two commands are submitted with the same ID" should {
|
||||
|
||||
"fail the stream" in {
|
||||
|
||||
val Handle(submissions, results, _, _) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
submissions.sendNext(submission)
|
||||
submissions.sendNext(submission)
|
||||
|
||||
results.expectError() shouldBe a[RuntimeException]
|
||||
}
|
||||
}
|
||||
|
||||
"a command is submitted without a submission id" should {
|
||||
|
||||
"throw an exception" in {
|
||||
val Handle(submissions, results, _, _) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
submissions.sendNext(newSubmission("", commandId))
|
||||
|
||||
val actualException = results.expectError()
|
||||
actualException shouldBe an[IllegalArgumentException]
|
||||
actualException.getMessage shouldBe s"The submission ID for the command ID $commandId is empty. This should not happen."
|
||||
}
|
||||
}
|
||||
|
||||
"the stream fails" should {
|
||||
|
||||
"expose internal state as materialized value" in {
|
||||
|
||||
val Handle(submissions, _, unhandledF, _) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
submissions.sendNext(submission)
|
||||
submissions.sendNext(submission)
|
||||
|
||||
whenReady(unhandledF) { unhandled =>
|
||||
unhandled should have size 1
|
||||
unhandled should contain(
|
||||
TrackedCommandKey(submissionId, commandId) -> submission.context
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"the stream completes" should {
|
||||
|
||||
"expose internal state as materialized value" in {
|
||||
|
||||
val Handle(submissions, results, unhandledF, _) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
val otherSubmissionId = "otherSubmissionId"
|
||||
val otherCommandId = "otherCommandId"
|
||||
|
||||
submissions.sendNext(newSubmission(otherSubmissionId, otherCommandId))
|
||||
|
||||
results.cancel()
|
||||
whenReady(unhandledF) { unhandled =>
|
||||
unhandled should have size 1
|
||||
unhandled should contain(
|
||||
TrackedCommandKey(otherSubmissionId, otherCommandId) -> submission.context
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"submission input is closed" should {
|
||||
|
||||
"complete the stage if no commands are tracked" in {
|
||||
val Handle(submissions, results, _, _) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
submissions.sendComplete()
|
||||
|
||||
results.expectComplete()
|
||||
succeed
|
||||
}
|
||||
|
||||
"keep the stage if there are tracked commands" in {
|
||||
val Handle(submissions, results, _, _) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
submissions.sendNext(submission)
|
||||
submissions.sendComplete()
|
||||
|
||||
results.expectNoMessage(1.second)
|
||||
succeed
|
||||
}
|
||||
}
|
||||
|
||||
"grpc error arrives for submission" should {
|
||||
|
||||
"output it as a completion if terminal" in {
|
||||
|
||||
val Handle(submissions, results, _, _) =
|
||||
runCommandTrackingFlow(Flow[C[CommandSubmission]].map {
|
||||
_.map(_ => Failure(new StatusRuntimeException(io.grpc.Status.RESOURCE_EXHAUSTED)))
|
||||
})
|
||||
|
||||
submissions.sendNext(submission)
|
||||
|
||||
results.expectNext(Ctx(context, Left(failureCompletion(Code.RESOURCE_EXHAUSTED))))
|
||||
succeed
|
||||
}
|
||||
|
||||
"swallow error if not terminal" in {
|
||||
|
||||
val Handle(submissions, results, _, completionStreamMock) =
|
||||
runCommandTrackingFlow(Flow[C[CommandSubmission]].map {
|
||||
_.map(_ => Failure(new StatusRuntimeException(io.grpc.Status.UNKNOWN)))
|
||||
})
|
||||
|
||||
submissions.sendNext(submission)
|
||||
|
||||
results.expectNoMessage(3.seconds)
|
||||
|
||||
completionStreamMock
|
||||
.send(
|
||||
CompletionStreamElement.CompletionElement(abortedCompletion, None)
|
||||
)
|
||||
.discard
|
||||
results.requestNext().value shouldEqual Left(
|
||||
failureCompletion(Code.ABORTED)
|
||||
)
|
||||
}
|
||||
|
||||
"swallow error if not terminal, then output completion when it arrives" in {
|
||||
|
||||
val Handle(submissions, results, _, completionStreamMock) =
|
||||
runCommandTrackingFlow(Flow[C[CommandSubmission]].map {
|
||||
_.map(_ => Failure(new StatusRuntimeException(io.grpc.Status.UNKNOWN)))
|
||||
})
|
||||
|
||||
submissions.sendNext(submission)
|
||||
|
||||
completionStreamMock
|
||||
.send(
|
||||
CompletionStreamElement.CompletionElement(abortedCompletion, None)
|
||||
)
|
||||
.discard
|
||||
results.requestNext().value shouldEqual Left(
|
||||
failureCompletion(Code.ABORTED)
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"no completion arrives" should {
|
||||
|
||||
"not timeout the command while MRT <= RT" in {
|
||||
val Handle(submissions, results, _, completionStreamMock) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
submissions.sendNext(submission)
|
||||
|
||||
completionStreamMock
|
||||
.send(
|
||||
CompletionStreamElement.CheckpointElement(Checkpoint(Some(fromInstant(mrt))))
|
||||
)
|
||||
.discard
|
||||
|
||||
results.expectNoMessage(1.second)
|
||||
succeed
|
||||
}
|
||||
|
||||
"timeout the command when the timeout passes" in {
|
||||
val Handle(submissions, results, _, _) = runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
submissions.sendNext(
|
||||
newSubmission(submissionId, commandId, timeout = Some(Duration.ofMillis(100)))
|
||||
)
|
||||
|
||||
results.expectNext(
|
||||
500.milliseconds,
|
||||
Ctx(context, Left(CompletionResponse.TimeoutResponse(commandId))),
|
||||
)
|
||||
succeed
|
||||
}
|
||||
|
||||
"use the maximum command timeout, if provided" in {
|
||||
val Handle(submissions, results, _, _) = runCommandTrackingFlow(
|
||||
allSubmissionsSuccessful,
|
||||
maximumCommandTimeout = Duration.ofMillis(500),
|
||||
)
|
||||
|
||||
submissions.sendNext(submission)
|
||||
|
||||
results.expectNext(
|
||||
1.second,
|
||||
Ctx(context, Left(CompletionResponse.TimeoutResponse(commandId))),
|
||||
)
|
||||
succeed
|
||||
}
|
||||
|
||||
"cap the timeout at the maximum command timeout" in {
|
||||
val Handle(submissions, results, _, _) = runCommandTrackingFlow(
|
||||
allSubmissionsSuccessful,
|
||||
maximumCommandTimeout = Duration.ofMillis(100),
|
||||
)
|
||||
|
||||
submissions.sendNext(
|
||||
newSubmission(submissionId, commandId, timeout = Some(Duration.ofSeconds(10)))
|
||||
)
|
||||
|
||||
results.expectNext(
|
||||
500.millis,
|
||||
Ctx(context, Left(CompletionResponse.TimeoutResponse(commandId))),
|
||||
)
|
||||
succeed
|
||||
}
|
||||
}
|
||||
|
||||
"successful completion arrives" should {
|
||||
|
||||
"output the completion" in {
|
||||
val Handle(submissions, results, _, completionStreamMock) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
submissions.sendNext(submission)
|
||||
|
||||
completionStreamMock.send(successfulStreamCompletion(submissionId, commandId)).discard
|
||||
|
||||
results.expectNext(
|
||||
Ctx(
|
||||
context,
|
||||
Right(successCompletion()),
|
||||
)
|
||||
)
|
||||
succeed
|
||||
}
|
||||
|
||||
"after the timeout" in {
|
||||
val Handle(submissions, results, _, completionStreamMock) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
val timedOutSubmissionId = "timedOutSubmissionId"
|
||||
val timedOutCommandId = "timedOutCommandId"
|
||||
|
||||
submissions.sendNext(
|
||||
newSubmission(timedOutSubmissionId, timedOutCommandId, timeout = Some(shortDuration))
|
||||
)
|
||||
|
||||
results.expectNext(
|
||||
shortDuration.getSeconds.seconds * 3,
|
||||
Ctx(context, Left(CompletionResponse.TimeoutResponse(timedOutCommandId))),
|
||||
)
|
||||
|
||||
// since the command timed out before, the tracker shouldn't send the completion through
|
||||
completionStreamMock
|
||||
.send(
|
||||
successfulStreamCompletion(timedOutSubmissionId, timedOutCommandId)
|
||||
)
|
||||
.discard
|
||||
results.request(1)
|
||||
results.expectNoMessage()
|
||||
succeed
|
||||
}
|
||||
|
||||
"after another command has timed out" in {
|
||||
val Handle(submissions, results, _, completionStreamMock) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
val timedOutSubmissionId = "timedOutSubmissionId"
|
||||
val timedOutCommandId = "timedOutCommandId"
|
||||
val submitRequestShortDedupTime =
|
||||
newSubmission(timedOutSubmissionId, timedOutCommandId, timeout = Some(shortDuration))
|
||||
|
||||
// we send 2 requests
|
||||
submissions.sendNext(submitRequestShortDedupTime)
|
||||
submissions.sendNext(submission)
|
||||
|
||||
// the tracker observes the timeout before the completion, thus "consuming" the pull on the result output
|
||||
results.expectNext(
|
||||
3.seconds,
|
||||
Ctx(context, Left(CompletionResponse.TimeoutResponse(timedOutCommandId))),
|
||||
)
|
||||
// we now receive a completion
|
||||
completionStreamMock.send(successfulStreamCompletion(submissionId, commandId)).discard
|
||||
// because the out-of-band timeout completion consumed the previous pull on `results`,
|
||||
// we don't expect a message until we request one.
|
||||
// The order below is important to reproduce the issue described in DPP-285.
|
||||
results.expectNoMessage()
|
||||
results.request(1)
|
||||
results.expectNext(
|
||||
Ctx(context, Right(successCompletion()))
|
||||
)
|
||||
succeed
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"duplicate completion arrives for a particular command" should {
|
||||
|
||||
"output the completion only once" in {
|
||||
|
||||
val Handle(submissions, results, _, completionStreamMock) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
submissions.sendNext(submission)
|
||||
|
||||
completionStreamMock.send(successfulStreamCompletion(submissionId, commandId)).discard
|
||||
completionStreamMock.send(successfulStreamCompletion(submissionId, commandId)).discard
|
||||
|
||||
results.expectNext(
|
||||
Ctx(context, Right(successCompletion()))
|
||||
)
|
||||
results.expectNoMessage(1.second)
|
||||
succeed
|
||||
}
|
||||
}
|
||||
|
||||
"failed completion arrives" should {
|
||||
|
||||
"output the completion" in {
|
||||
|
||||
val Handle(submissions, results, _, completionStreamMock) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
submissions.sendNext(submission)
|
||||
|
||||
val status = StatusProto(Code.INVALID_ARGUMENT.value)
|
||||
val failedCompletion =
|
||||
Completion(
|
||||
commandId,
|
||||
Some(status),
|
||||
submissionId = submissionId,
|
||||
)
|
||||
completionStreamMock
|
||||
.send(CompletionStreamElement.CompletionElement(failedCompletion, None))
|
||||
.discard
|
||||
|
||||
results.expectNext(
|
||||
Ctx(
|
||||
context,
|
||||
Left(
|
||||
failureCompletion(Code.INVALID_ARGUMENT)
|
||||
),
|
||||
)
|
||||
)
|
||||
succeed
|
||||
}
|
||||
}
|
||||
|
||||
"a multitude of successful completions arrive for submitted commands" should {
|
||||
|
||||
"output all expected values" in {
|
||||
val cmdCount = 1000
|
||||
|
||||
val identifiers =
|
||||
1.to(cmdCount).map(_.toString).map(id => s"submission-$id" -> s"command-$id")
|
||||
|
||||
val Handle(submissions, results, _, completionStreamMock) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
results.request(cmdCount.toLong - 1)
|
||||
|
||||
identifiers.foreach { case (submissionId, commandId) =>
|
||||
submissions.sendNext(submission.copy(value = commandWithIds(submissionId, commandId)))
|
||||
}
|
||||
identifiers.foreach { case (submissionId, commandId) =>
|
||||
completionStreamMock.send(successfulStreamCompletion(submissionId, commandId)).discard
|
||||
}
|
||||
|
||||
results.expectNextUnorderedN(identifiers.map { case (submissionId, commandId) =>
|
||||
val completionSuccess = successCompletion()
|
||||
Ctx(
|
||||
context,
|
||||
Right(
|
||||
completionSuccess.copy(completion =
|
||||
completionSuccess.completion
|
||||
.update(_.commandId := commandId, _.submissionId := submissionId)
|
||||
)
|
||||
),
|
||||
)
|
||||
})
|
||||
succeed
|
||||
}
|
||||
}
|
||||
|
||||
"successful completions arrive for the same command submitted twice with different submission IDs" should {
|
||||
|
||||
"output two successful responses" in {
|
||||
val Handle(submissions, results, _, completionStreamMock) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
results.request(2)
|
||||
|
||||
val submissionId1 = "submissionId"
|
||||
val submissionId2 = "anotherSubmissionId"
|
||||
|
||||
submissions.sendNext(newSubmission(submissionId1, commandId))
|
||||
submissions.sendNext(newSubmission(submissionId2, commandId))
|
||||
|
||||
completionStreamMock.send(successfulStreamCompletion(submissionId1, commandId)).discard
|
||||
completionStreamMock.send(successfulStreamCompletion(submissionId2, commandId)).discard
|
||||
|
||||
results.expectNextUnordered(
|
||||
Ctx(context, Right(successCompletion(submissionId = submissionId1))),
|
||||
Ctx(context, Right(successCompletion(submissionId = submissionId2))),
|
||||
)
|
||||
succeed
|
||||
}
|
||||
}
|
||||
|
||||
"completions with empty and nonempty submission IDs arrive" should {
|
||||
|
||||
"ignore a completion with an empty submission ID and output a successful response" in {
|
||||
val Handle(submissions, results, _, completionStreamMock) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
results.request(2)
|
||||
|
||||
submissions.sendNext(submission)
|
||||
|
||||
completionStreamMock.send(successfulStreamCompletion("", commandId)).discard
|
||||
completionStreamMock.send(successfulStreamCompletion(submissionId, commandId)).discard
|
||||
|
||||
results.expectNext(
|
||||
Ctx(context, Right(successCompletion(submissionId = submissionId)))
|
||||
)
|
||||
succeed
|
||||
}
|
||||
}
|
||||
|
||||
"completion stream disconnects" should {
|
||||
|
||||
"keep run and recover the completion subscription from a recent offset" in {
|
||||
val checkPointOffset = LedgerOffset(Absolute("checkpoint"))
|
||||
|
||||
val Handle(submissions, results, _, completionStreamMock) =
|
||||
runCommandTrackingFlow(allSubmissionsSuccessful)
|
||||
|
||||
def breakUntilOffsetArrives(): Future[Unit] =
|
||||
for {
|
||||
_ <- completionStreamMock.breakCompletionsStream()
|
||||
offset3 <- completionStreamMock.getLastOffset
|
||||
_ <-
|
||||
if (offset3 != checkPointOffset) breakUntilOffsetArrives()
|
||||
else Future.unit
|
||||
} yield ()
|
||||
|
||||
def sendCommand(submissionId: String, commandId: String) = {
|
||||
submissions.sendNext(submission.copy(value = commandWithIds(submissionId, commandId)))
|
||||
for {
|
||||
_ <- completionStreamMock.send(successfulStreamCompletion(submissionId, commandId))
|
||||
_ = results.request(1)
|
||||
_ = results.expectNext(
|
||||
Ctx(
|
||||
context,
|
||||
Right(
|
||||
successCompletion(commandId, submissionId)
|
||||
),
|
||||
)
|
||||
)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
def checkOffset(expected: LedgerOffset) =
|
||||
for {
|
||||
offset <- completionStreamMock.getLastOffset
|
||||
} yield offset shouldEqual expected
|
||||
|
||||
def sendCheckPoint(offset: LedgerOffset) =
|
||||
for {
|
||||
_ <- completionStreamMock.send(checkPoint(offset))
|
||||
_ = results.request(1)
|
||||
} yield ()
|
||||
|
||||
for {
|
||||
_ <- checkOffset(LedgerOffset(Boundary(LEDGER_BEGIN)))
|
||||
_ <- sendCommand("submission-1", "command-1")
|
||||
_ <- sendCheckPoint(checkPointOffset)
|
||||
_ <- checkOffset(LedgerOffset(Boundary(LEDGER_BEGIN)))
|
||||
_ <- breakUntilOffsetArrives()
|
||||
_ <- checkOffset(checkPointOffset)
|
||||
_ <- sendCommand("submission-2", "command-2")
|
||||
} yield {
|
||||
succeed
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private def successCompletion(
|
||||
commandId: String = commandId,
|
||||
submissionId: String = submissionId,
|
||||
) =
|
||||
CompletionResponse.CompletionSuccess(
|
||||
Completion(commandId, Some(successStatus), submissionId = submissionId),
|
||||
None,
|
||||
)
|
||||
|
||||
private def failureCompletion(
|
||||
code: Code,
|
||||
message: String = "",
|
||||
submissionId: String = submissionId,
|
||||
): CompletionFailure =
|
||||
NotOkResponse(
|
||||
Completion(
|
||||
commandId = commandId,
|
||||
status = Some(StatusProto(code.value, message)),
|
||||
submissionId = submissionId,
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
private def commandWithIds(submissionId: String, commandId: String) = {
|
||||
val request = submission.value
|
||||
request.copy(commands =
|
||||
request.commands.copy(commandId = commandId, submissionId = submissionId)
|
||||
)
|
||||
}
|
||||
|
||||
private def successfulStreamCompletion(submissionId: String, commandId: String) =
|
||||
CompletionStreamElement.CompletionElement(
|
||||
Completion(commandId, Some(successStatus), submissionId = submissionId),
|
||||
None,
|
||||
)
|
||||
|
||||
private def checkPoint(ledgerOffset: LedgerOffset) =
|
||||
CompletionStreamElement.CheckpointElement(
|
||||
Checkpoint(
|
||||
Some(Timestamp(0, 0)),
|
||||
Some(ledgerOffset),
|
||||
)
|
||||
)
|
||||
|
||||
private def runCommandTrackingFlow(
|
||||
submissionFlow: Flow[
|
||||
Ctx[(Int, TrackedCommandKey), CommandSubmission],
|
||||
Ctx[(Int, TrackedCommandKey), Try[Empty]],
|
||||
NotUsed,
|
||||
],
|
||||
maximumCommandTimeout: Duration = Duration.ofSeconds(10),
|
||||
): Handle = {
|
||||
|
||||
val completionsMock = new CompletionStreamMock(loggerFactory)
|
||||
|
||||
val trackingFlow =
|
||||
CommandTrackerFlow[Int, NotUsed](
|
||||
commandSubmissionFlow = submissionFlow,
|
||||
createCommandCompletionSource = completionsMock.createCompletionsSource,
|
||||
startingOffset = LedgerOffset(Boundary(LEDGER_BEGIN)),
|
||||
maximumCommandTimeout = maximumCommandTimeout,
|
||||
timeoutDetectionPeriod = 1.millisecond,
|
||||
)
|
||||
|
||||
val handle = submissionSource
|
||||
.viaMat(trackingFlow)(Keep.both)
|
||||
.toMat(resultSink) { (l, r) =>
|
||||
Handle(l._1, r, l._2.trackingMat, completionsMock)
|
||||
}
|
||||
.run()
|
||||
handle.completions.request(1L)
|
||||
handle
|
||||
}
|
||||
}
|
@ -1,187 +0,0 @@
|
||||
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.canton.ledger.client.services.commands.tracker
|
||||
|
||||
import com.daml.error.GrpcStatuses
|
||||
import com.daml.grpc.GrpcStatus
|
||||
import com.daml.ledger.api.v1.completion.Completion
|
||||
import com.digitalasset.canton.BaseTest
|
||||
import com.digitalasset.canton.ledger.client.services.commands.tracker.CompletionResponse.*
|
||||
import com.google.protobuf.any.Any
|
||||
import com.google.rpc.error_details.{ErrorInfo, RequestInfo}
|
||||
import com.google.rpc.status.Status
|
||||
import com.google.rpc.{ErrorInfo as JavaErrorInfo, RequestInfo as JavaRequestInfo}
|
||||
import io.grpc
|
||||
import io.grpc.Status.Code
|
||||
import io.grpc.Status.Code.OK
|
||||
import io.grpc.protobuf
|
||||
import org.scalatest.wordspec.AnyWordSpec
|
||||
|
||||
import scala.jdk.CollectionConverters.*
|
||||
|
||||
class CompletionResponseTest extends AnyWordSpec with BaseTest {
|
||||
|
||||
"Completion response" when {
|
||||
|
||||
val commandId = "commandId"
|
||||
val completion = Completion(
|
||||
commandId = commandId,
|
||||
status = Some(Status(OK.value(), "message", Seq(Any()))),
|
||||
)
|
||||
|
||||
"convert to/from completion" should {
|
||||
|
||||
"match successful completion" in {
|
||||
val completionWithTransactionId = completion.update(_.transactionId := "transactionId")
|
||||
val response = CompletionResponse(completionWithTransactionId, None)
|
||||
response shouldBe a[Right[_, _]]
|
||||
CompletionResponse.toCompletion(response) shouldEqual completionWithTransactionId
|
||||
}
|
||||
|
||||
"match not ok status" in {
|
||||
val failedCodeCompletion = completion.update(_.status.code := Code.INTERNAL.value())
|
||||
val response =
|
||||
CompletionResponse(failedCodeCompletion, None)
|
||||
response should matchPattern { case Left(_: NotOkResponse) => }
|
||||
CompletionResponse.toCompletion(response) shouldEqual failedCodeCompletion
|
||||
}
|
||||
|
||||
"handle missing status" in {
|
||||
val noStatusCodeCompletion = completion.update(_.optionalStatus := None)
|
||||
val response =
|
||||
CompletionResponse(noStatusCodeCompletion, None)
|
||||
response should matchPattern { case Left(_: NoStatusInResponse) => }
|
||||
CompletionResponse.toCompletion(response) shouldEqual noStatusCodeCompletion
|
||||
|
||||
}
|
||||
|
||||
"handle timeout" in {
|
||||
CompletionResponse.toCompletion(Left(TimeoutResponse(commandId))) shouldEqual Completion(
|
||||
commandId = commandId,
|
||||
status = Some(
|
||||
Status(
|
||||
code = Code.ABORTED.value(),
|
||||
message = "Timeout",
|
||||
)
|
||||
),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
"convert to exception" should {
|
||||
|
||||
"convert queue completion failure" in {
|
||||
val exception =
|
||||
CompletionResponse.toException(
|
||||
QueueCompletionFailure(TimeoutResponse(commandId))
|
||||
)
|
||||
exception.getStatus.getCode shouldBe Code.DEADLINE_EXCEEDED
|
||||
}
|
||||
|
||||
"convert queue submit failure" in {
|
||||
val status = GrpcStatus.buildStatus(
|
||||
Map.empty,
|
||||
GrpcStatus.toJavaBuilder(grpc.Status.RESOURCE_EXHAUSTED),
|
||||
)
|
||||
val exception =
|
||||
CompletionResponse.toException(
|
||||
QueueSubmitFailure(status)
|
||||
)
|
||||
exception.getStatus.getCode shouldBe Code.RESOURCE_EXHAUSTED
|
||||
}
|
||||
|
||||
"include default metadata for status not ok" in {
|
||||
val exception = CompletionResponse.toException(
|
||||
QueueCompletionFailure(
|
||||
NotOkResponse(
|
||||
Completion(
|
||||
commandId = commandId,
|
||||
status = Some(
|
||||
Status(
|
||||
Code.CANCELLED.value(),
|
||||
details = Seq.empty,
|
||||
)
|
||||
),
|
||||
),
|
||||
None,
|
||||
)
|
||||
)
|
||||
)
|
||||
val status = protobuf.StatusProto.fromThrowable(exception)
|
||||
val packedErrorInfo = status.getDetails(0).unpack(classOf[JavaErrorInfo])
|
||||
packedErrorInfo.getMetadataOrThrow(GrpcStatuses.DefiniteAnswerKey) shouldEqual "false"
|
||||
}
|
||||
|
||||
"include metadata for status not ok" in {
|
||||
val errorInfo = ErrorInfo(
|
||||
metadata = Map(GrpcStatuses.DefiniteAnswerKey -> "true")
|
||||
)
|
||||
val exception = CompletionResponse.toException(
|
||||
QueueCompletionFailure(
|
||||
NotOkResponse(
|
||||
Completion(
|
||||
commandId = commandId,
|
||||
status = Some(
|
||||
Status(
|
||||
Code.CANCELLED.value(),
|
||||
details = Seq(
|
||||
Any.pack(
|
||||
errorInfo
|
||||
)
|
||||
),
|
||||
)
|
||||
),
|
||||
),
|
||||
None,
|
||||
)
|
||||
)
|
||||
)
|
||||
val status = protobuf.StatusProto.fromThrowable(exception)
|
||||
val packedErrorInfo = status.getDetails(0).unpack(classOf[JavaErrorInfo])
|
||||
packedErrorInfo.getMetadataOrThrow(GrpcStatuses.DefiniteAnswerKey) shouldEqual "true"
|
||||
}
|
||||
|
||||
"merge metadata for status not ok" in {
|
||||
val errorInfo = ErrorInfo(
|
||||
metadata = Map(GrpcStatuses.DefiniteAnswerKey -> "true")
|
||||
)
|
||||
val requestInfo = RequestInfo(requestId = "aRequestId")
|
||||
val exception = CompletionResponse.toException(
|
||||
QueueCompletionFailure(
|
||||
NotOkResponse(
|
||||
Completion(
|
||||
commandId = commandId,
|
||||
status = Some(
|
||||
Status(
|
||||
Code.INTERNAL.value(),
|
||||
details = Seq(
|
||||
Any.pack(errorInfo),
|
||||
Any.pack(requestInfo),
|
||||
),
|
||||
)
|
||||
),
|
||||
),
|
||||
None,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
val status = protobuf.StatusProto.fromThrowable(exception)
|
||||
status.getCode shouldBe Code.INTERNAL.value()
|
||||
val details = status.getDetailsList.asScala
|
||||
details.size shouldBe 2
|
||||
details.exists { detail =>
|
||||
detail.is(classOf[JavaErrorInfo]) && detail
|
||||
.unpack(classOf[JavaErrorInfo])
|
||||
.getMetadataOrThrow(GrpcStatuses.DefiniteAnswerKey) == "true"
|
||||
} shouldEqual true
|
||||
details.exists { detail =>
|
||||
detail.is(classOf[JavaRequestInfo]) && detail
|
||||
.unpack(classOf[JavaRequestInfo])
|
||||
.getRequestId == "aRequestId"
|
||||
} shouldEqual true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -318,112 +318,115 @@ final class RepairService(
|
||||
logger.info(
|
||||
s"Adding ${contracts.length} contracts to domain ${domain} with ignoreAlreadyAdded=${ignoreAlreadyAdded} and ignoreStakeholderCheck=${ignoreStakeholderCheck}"
|
||||
)
|
||||
lockAndAwaitEitherT(
|
||||
"repair.add", {
|
||||
for {
|
||||
// Ensure domain is configured but not connected to avoid race conditions.
|
||||
domainId <- aliasToUnconnectedDomainId(domain)
|
||||
_ <- EitherT.cond[Future](contracts.nonEmpty, (), "No contracts to add specified")
|
||||
if (contracts.isEmpty) {
|
||||
Either.right(logger.info("No contracts to add specified"))
|
||||
} else {
|
||||
lockAndAwaitEitherT(
|
||||
"repair.add", {
|
||||
for {
|
||||
// Ensure domain is configured but not connected to avoid race conditions.
|
||||
domainId <- aliasToUnconnectedDomainId(domain)
|
||||
|
||||
domain <- readDomainData(domainId)
|
||||
domain <- readDomainData(domainId)
|
||||
|
||||
filteredContracts <- contracts.parTraverseFilter(
|
||||
readRepairContractCurrentState(domain, _, ignoreAlreadyAdded)
|
||||
)
|
||||
filteredContracts <- contracts.parTraverseFilter(
|
||||
readRepairContractCurrentState(domain, _, ignoreAlreadyAdded)
|
||||
)
|
||||
|
||||
contractsByCreation = filteredContracts
|
||||
.groupBy(_.contract.ledgerCreateTime)
|
||||
.toList
|
||||
.sortBy { case (ts, _) =>
|
||||
ts
|
||||
}
|
||||
contractsByCreation = filteredContracts
|
||||
.groupBy(_.contract.ledgerCreateTime)
|
||||
.toList
|
||||
.sortBy { case (ts, _) =>
|
||||
ts
|
||||
}
|
||||
|
||||
_ <- PositiveInt
|
||||
.create(contractsByCreation.size)
|
||||
.fold(
|
||||
_ => EitherT.rightT[Future, String](logger.info("No contract needs to be added")),
|
||||
requestCountersToAllocate => {
|
||||
for {
|
||||
repair <- initRepairRequestAndVerifyPreconditions(
|
||||
domain,
|
||||
requestCountersToAllocate,
|
||||
)
|
||||
|
||||
contractsToAdd = repair.timesOfChange.zip(contractsByCreation)
|
||||
|
||||
// All referenced templates known and vetted
|
||||
_packagesVetted <- filteredContracts
|
||||
.map(
|
||||
_.contract.rawContractInstance.contractInstance.unversioned.template.packageId
|
||||
_ <- PositiveInt
|
||||
.create(contractsByCreation.size)
|
||||
.fold(
|
||||
_ => EitherT.rightT[Future, String](logger.info("No contract needs to be added")),
|
||||
requestCountersToAllocate => {
|
||||
for {
|
||||
repair <- initRepairRequestAndVerifyPreconditions(
|
||||
domain,
|
||||
requestCountersToAllocate,
|
||||
)
|
||||
.distinct
|
||||
.parTraverse_(packageVetted)
|
||||
|
||||
_uniqueKeysWithHostedMaintainerInContractsToAdd <- EitherTUtil.ifThenET(
|
||||
repair.domain.parameters.uniqueContractKeys
|
||||
) {
|
||||
val keysWithContractIdsF = filteredContracts
|
||||
.parTraverseFilter { repairContractWithCurrentState =>
|
||||
val contract = repairContractWithCurrentState.contract
|
||||
// Only check for duplicates where the participant hosts a maintainer
|
||||
getKeyIfOneMaintainerIsLocal(
|
||||
repair.domain.topologySnapshot,
|
||||
contract.metadata.maybeKeyWithMaintainers,
|
||||
participantId,
|
||||
).map { lfKeyO =>
|
||||
lfKeyO.flatMap(_ => contract.metadata.maybeKeyWithMaintainers).map {
|
||||
keyWithMaintainers =>
|
||||
keyWithMaintainers.globalKey -> contract.contractId
|
||||
contractsToAdd = repair.timesOfChange.zip(contractsByCreation)
|
||||
|
||||
// All referenced templates known and vetted
|
||||
_packagesVetted <- filteredContracts
|
||||
.map(
|
||||
_.contract.rawContractInstance.contractInstance.unversioned.template.packageId
|
||||
)
|
||||
.distinct
|
||||
.parTraverse_(packageVetted)
|
||||
|
||||
_uniqueKeysWithHostedMaintainerInContractsToAdd <- EitherTUtil.ifThenET(
|
||||
repair.domain.parameters.uniqueContractKeys
|
||||
) {
|
||||
val keysWithContractIdsF = filteredContracts
|
||||
.parTraverseFilter { repairContractWithCurrentState =>
|
||||
val contract = repairContractWithCurrentState.contract
|
||||
// Only check for duplicates where the participant hosts a maintainer
|
||||
getKeyIfOneMaintainerIsLocal(
|
||||
repair.domain.topologySnapshot,
|
||||
contract.metadata.maybeKeyWithMaintainers,
|
||||
participantId,
|
||||
).map { lfKeyO =>
|
||||
lfKeyO.flatMap(_ => contract.metadata.maybeKeyWithMaintainers).map {
|
||||
keyWithMaintainers =>
|
||||
keyWithMaintainers.globalKey -> contract.contractId
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
.map(x => x.groupBy { case (globalKey, _) => globalKey })
|
||||
EitherT(keysWithContractIdsF.map { keysWithContractIds =>
|
||||
val duplicates = keysWithContractIds.mapFilter { keyCoids =>
|
||||
if (keyCoids.lengthCompare(1) > 0) Some(keyCoids.map(_._2)) else None
|
||||
}
|
||||
Either.cond(
|
||||
duplicates.isEmpty,
|
||||
(),
|
||||
log(show"Cannot add multiple contracts for the same key(s): $duplicates"),
|
||||
)
|
||||
})
|
||||
}
|
||||
.map(x => x.groupBy { case (globalKey, _) => globalKey })
|
||||
EitherT(keysWithContractIdsF.map { keysWithContractIds =>
|
||||
val duplicates = keysWithContractIds.mapFilter { keyCoids =>
|
||||
if (keyCoids.lengthCompare(1) > 0) Some(keyCoids.map(_._2)) else None
|
||||
}
|
||||
Either.cond(
|
||||
duplicates.isEmpty,
|
||||
(),
|
||||
log(show"Cannot add multiple contracts for the same key(s): $duplicates"),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
hostedParties <- EitherT.right(
|
||||
filteredContracts
|
||||
.flatMap(_.witnesses)
|
||||
.distinct
|
||||
.parFilterA(hostsParty(repair.domain.topologySnapshot, participantId))
|
||||
.map(_.toSet)
|
||||
)
|
||||
hostedParties <- EitherT.right(
|
||||
filteredContracts
|
||||
.flatMap(_.witnesses)
|
||||
.distinct
|
||||
.parFilterA(hostsParty(repair.domain.topologySnapshot, participantId))
|
||||
.map(_.toSet)
|
||||
)
|
||||
|
||||
_ = logger.debug(s"Publishing ${filteredContracts.size} added contracts")
|
||||
_ = logger.debug(s"Publishing ${filteredContracts.size} added contracts")
|
||||
|
||||
contractsWithTimeOfChange = contractsToAdd.flatMap { case (toc, (_, cs)) =>
|
||||
cs.map(_ -> toc)
|
||||
}
|
||||
contractsWithTimeOfChange = contractsToAdd.flatMap { case (toc, (_, cs)) =>
|
||||
cs.map(_ -> toc)
|
||||
}
|
||||
|
||||
// Note the following purposely fails if any contract fails which results in not all contracts being processed.
|
||||
_ <- MonadUtil.sequentialTraverse(contractsWithTimeOfChange) {
|
||||
case (contract, timeOfChange) =>
|
||||
addContract(repair, ignoreStakeholderCheck)(contract, timeOfChange)
|
||||
}
|
||||
// Note the following purposely fails if any contract fails which results in not all contracts being processed.
|
||||
_ <- MonadUtil.sequentialTraverse(contractsWithTimeOfChange) {
|
||||
case (contract, timeOfChange) =>
|
||||
addContract(repair, ignoreStakeholderCheck)(contract, timeOfChange)
|
||||
}
|
||||
|
||||
// Publish added contracts upstream as created via the ledger api.
|
||||
_ <- EitherT.right(
|
||||
writeContractsAddedEvents(repair, hostedParties, contractsToAdd)
|
||||
)
|
||||
// Publish added contracts upstream as created via the ledger api.
|
||||
_ <- EitherT.right(
|
||||
writeContractsAddedEvents(repair, hostedParties, contractsToAdd)
|
||||
)
|
||||
|
||||
// If all has gone well, bump the clean head, effectively committing the changes to the domain.
|
||||
_ <- commitRepairs(repair)
|
||||
// If all has gone well, bump the clean head, effectively committing the changes to the domain.
|
||||
_ <- commitRepairs(repair)
|
||||
|
||||
} yield ()
|
||||
},
|
||||
)
|
||||
} yield ()
|
||||
},
|
||||
)
|
||||
} yield ()
|
||||
},
|
||||
)
|
||||
} yield ()
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/** Participant repair utility for manually purging (archiving) contracts in an offline fashion.
|
||||
|
@ -0,0 +1,70 @@
|
||||
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.canton.participant.ledger.api.client
|
||||
|
||||
import com.daml.ledger.javaapi.data.codegen.{
|
||||
Contract,
|
||||
ContractCompanion,
|
||||
ContractId,
|
||||
InterfaceCompanion,
|
||||
}
|
||||
import com.daml.ledger.javaapi.data.{
|
||||
CreatedEvent as JavaCreatedEvent,
|
||||
ExercisedEvent,
|
||||
Transaction as JavaTransaction,
|
||||
TransactionTree,
|
||||
}
|
||||
|
||||
import scala.jdk.CollectionConverters.*
|
||||
|
||||
object JavaDecodeUtil {
|
||||
def decodeCreated[TC](
|
||||
companion: ContractCompanion[TC, ?, ?]
|
||||
)(event: JavaCreatedEvent): Option[TC] =
|
||||
if (event.getTemplateId == companion.TEMPLATE_ID) {
|
||||
Some(companion.fromCreatedEvent(event))
|
||||
} else None
|
||||
|
||||
def decodeCreated[Id, View](
|
||||
companion: InterfaceCompanion[?, Id, View]
|
||||
)(event: JavaCreatedEvent): Option[Contract[Id, View]] =
|
||||
if (event.getInterfaceViews.containsKey(companion.TEMPLATE_ID)) {
|
||||
Some(companion.fromCreatedEvent(event))
|
||||
} else None
|
||||
|
||||
def decodeAllCreated[TC](
|
||||
companion: ContractCompanion[TC, ?, ?]
|
||||
)(transaction: JavaTransaction): Seq[TC] = {
|
||||
for {
|
||||
event <- transaction.getEvents.asScala.toList
|
||||
eventP = event.toProtoEvent
|
||||
created <- if (eventP.hasCreated) Seq(eventP.getCreated) else Seq()
|
||||
a <- decodeCreated(companion)(JavaCreatedEvent.fromProto(created)).toList
|
||||
} yield a
|
||||
}
|
||||
|
||||
def decodeArchivedExercise[TCid](
|
||||
companion: ContractCompanion[?, TCid, ?]
|
||||
)(event: ExercisedEvent): Option[TCid] =
|
||||
Option.when(event.getTemplateId == companion.TEMPLATE_ID && event.isConsuming)(
|
||||
companion.toContractId(new ContractId((event.getContractId)))
|
||||
)
|
||||
|
||||
def treeToCreated(transaction: TransactionTree): Seq[JavaCreatedEvent] =
|
||||
for {
|
||||
event <- transaction.getEventsById.values.asScala.toSeq
|
||||
created <- event match {
|
||||
case created: JavaCreatedEvent => Seq(created)
|
||||
case _ => Seq.empty
|
||||
}
|
||||
} yield created
|
||||
|
||||
def decodeAllCreatedTree[TC](
|
||||
companion: ContractCompanion[TC, ?, ?]
|
||||
)(transaction: TransactionTree): Seq[TC] =
|
||||
for {
|
||||
created <- treeToCreated(transaction)
|
||||
a <- decodeCreated(companion)(created).toList
|
||||
} yield a
|
||||
}
|
@ -44,7 +44,6 @@ import com.digitalasset.canton.participant.sync.{
|
||||
ConnectedDomainsLookup,
|
||||
SyncDomain,
|
||||
TransactionRoutingError,
|
||||
TransactionRoutingErrorWithDomain,
|
||||
}
|
||||
import com.digitalasset.canton.protocol.WellFormedTransaction.WithoutSuffixes
|
||||
import com.digitalasset.canton.protocol.*
|
||||
@ -78,7 +77,7 @@ class DomainRouter(
|
||||
Map[LfContractId, SerializableContract],
|
||||
) => EitherT[Future, TransactionRoutingError, FutureUnlessShutdown[TransactionSubmitted]],
|
||||
contractsTransferer: ContractsTransfer,
|
||||
snapshotProvider: DomainId => Either[TransactionRoutingError, TopologySnapshot],
|
||||
snapshotProvider: DomainId => Either[UnableToQueryTopologySnapshot.Failed, TopologySnapshot],
|
||||
serializableContractAuthenticator: SerializableContractAuthenticator,
|
||||
autoTransferTransaction: Boolean,
|
||||
domainSelectorFactory: DomainSelectorFactory,
|
||||
@ -193,16 +192,16 @@ class DomainRouter(
|
||||
private def allInformeesOnDomain(
|
||||
informees: Set[LfPartyId]
|
||||
)(domainId: DomainId)(implicit traceContext: TraceContext): Future[Boolean] = {
|
||||
snapshotProvider(domainId) match {
|
||||
case Left(err) =>
|
||||
snapshotProvider(domainId).bimap(
|
||||
(err: UnableToQueryTopologySnapshot.Failed) => {
|
||||
logger.warn(
|
||||
s"Unable to get topology snapshot to check whether informees are hosted on the domain: $err"
|
||||
)
|
||||
Future.successful(false)
|
||||
case Right(topologySnapshot) =>
|
||||
topologySnapshot.allHaveActiveParticipants(informees, _.isActive).value.map(_.isRight)
|
||||
}
|
||||
}
|
||||
},
|
||||
_.allHaveActiveParticipants(informees, _.isActive).value.map(_.isRight),
|
||||
)
|
||||
}.merge
|
||||
|
||||
private def chooseDomainForMultiDomain(
|
||||
domainSelector: DomainSelector
|
||||
@ -237,8 +236,10 @@ class DomainRouter(
|
||||
|
||||
val allContractsHaveDomainData: Boolean = inputContractsDomainData.withoutDomainData.isEmpty
|
||||
val contractData = inputContractsDomainData.withDomainData
|
||||
val contractsDomainNotConnected = contractData.filterNot { contractData =>
|
||||
snapshotProvider(contractData.domain).map(_ => true).getOrElse(false)
|
||||
val contractsDomainNotConnected = contractData.filter { contractData =>
|
||||
snapshotProvider(contractData.domain).left.exists { _: UnableToQueryTopologySnapshot.Failed =>
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
// Check that at least one submitter is a stakeholder so that we can transfer the contract if needed. This check
|
||||
@ -378,7 +379,7 @@ object DomainRouter {
|
||||
|
||||
private def domainStateProvider(connectedDomains: ConnectedDomainsLookup)(domain: DomainId)(
|
||||
implicit traceContext: TraceContext
|
||||
): Either[TransactionRoutingErrorWithDomain, (TopologySnapshot, ProtocolVersion)] =
|
||||
): Either[UnableToQueryTopologySnapshot.Failed, (TopologySnapshot, ProtocolVersion)] =
|
||||
connectedDomains
|
||||
.get(domain)
|
||||
.toRight(UnableToQueryTopologySnapshot.Failed(domain))
|
||||
|
@ -103,6 +103,7 @@ class InMemorySyncDomainPersistentStateX(
|
||||
new InMemoryTopologyStoreX(
|
||||
DomainStore(domainId.item),
|
||||
loggerFactory,
|
||||
timeouts,
|
||||
)
|
||||
|
||||
override val domainOutboxQueue = new DomainOutboxQueue(loggerFactory)
|
||||
|
@ -79,6 +79,7 @@ class QueueBasedDomainOutboxXTest
|
||||
val target = new InMemoryTopologyStoreX(
|
||||
TopologyStoreId.DomainStore(DefaultTestIdentities.domainId),
|
||||
loggerFactory,
|
||||
timeouts,
|
||||
)
|
||||
val queue = new DomainOutboxQueue(loggerFactory)
|
||||
val manager = new DomainTopologyManagerX(
|
||||
|
@ -80,10 +80,12 @@ class StoreBasedDomainOutboxXTest
|
||||
val source = new InMemoryTopologyStoreX(
|
||||
TopologyStoreId.AuthorizedStore,
|
||||
loggerFactory,
|
||||
timeouts,
|
||||
)
|
||||
val target = new InMemoryTopologyStoreX(
|
||||
TopologyStoreId.DomainStore(DefaultTestIdentities.domainId),
|
||||
loggerFactory,
|
||||
timeouts,
|
||||
)
|
||||
val manager = new AuthorizedTopologyManagerX(
|
||||
clock,
|
||||
|
@ -109,6 +109,12 @@ class SuppressingLogger private[logging] (
|
||||
)(implicit c: ClassTag[T], pos: source.Position): Assertion =
|
||||
assertLogs(checkThrowable[T](the[Throwable] thrownBy within), assertions: _*)
|
||||
|
||||
def assertThrowsAndLogsSuppressing[T <: Throwable](rule: SuppressionRule)(
|
||||
within: => Any,
|
||||
assertions: (LogEntry => Assertion)*
|
||||
)(implicit c: ClassTag[T], pos: source.Position): Assertion =
|
||||
assertLogs(rule)(checkThrowable[T](the[Throwable] thrownBy within), assertions: _*)
|
||||
|
||||
def assertThrowsAndLogsAsync[T <: Throwable](
|
||||
within: => Future[_],
|
||||
assertion: T => Assertion,
|
||||
|
@ -207,6 +207,7 @@ class TestingIdentityFactoryX(
|
||||
val store = new InMemoryTopologyStoreX(
|
||||
TopologyStoreId.AuthorizedStore,
|
||||
loggerFactory,
|
||||
DefaultProcessingTimeouts.testing,
|
||||
)
|
||||
|
||||
// Compute default participant permissions to be the highest granted to an individual party
|
||||
|
Loading…
Reference in New Issue
Block a user