[SoX with conflict checking] Implement party allocation validation for transactions conflict checks [DPP-837] (#12283)

* Implement party allocation validation for transactions conflict checks

CHANGELOG_BEGIN
CHANGELOG_END

* Propagate loggingContext to all processing stages
This commit is contained in:
tudor-da 2022-01-06 13:04:15 +01:00 committed by GitHub
parent bb92c463e7
commit d4ebce6f44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 113 additions and 34 deletions

View File

@ -82,6 +82,7 @@ da_scala_test_suite(
"//ledger/ledger-configuration",
"//ledger/ledger-offset",
"//ledger/participant-state",
"//libs-scala/contextualized-logging",
"//libs-scala/logging-entries",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:org_mockito_mockito_core",
@ -159,7 +160,6 @@ conformance_test(
"--verbose",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--exclude=ClosedWorldIT",
"--exclude=ConfigManagementServiceIT:CMConcurrentSetConflicting",
# Exclude offset command deduplication tests as
# Sandbox-on-X has support only for participant side deduplication,

View File

@ -10,6 +10,7 @@ case class BridgeConfig(
conflictCheckingEnabled: Boolean,
maxDedupSeconds: Int,
submissionBufferSize: Int,
implicitPartyAllocation: Boolean,
)
object BridgeConfigProvider extends ConfigProvider[BridgeConfig] {
@ -29,6 +30,14 @@ object BridgeConfigProvider extends ConfigProvider[BridgeConfig] {
.text("Enables the ledger-side submission conflict checking.")
.action((_, c) => c.copy(extra = c.extra.copy(conflictCheckingEnabled = true)))
parser
.opt[Boolean](name = "implicit-party-allocation")
.optional()
.action((x, c) => c.copy(extra = c.extra.copy(implicitPartyAllocation = x)))
.text(
s"When referring to a party that doesn't yet exist on the ledger, the participant will implicitly allocate that party."
+ s" You can optionally disable this behavior to bring participant into line with other ledgers."
)
()
}
@ -37,5 +46,6 @@ object BridgeConfigProvider extends ConfigProvider[BridgeConfig] {
conflictCheckingEnabled = false,
maxDedupSeconds = 30,
submissionBufferSize = 500,
implicitPartyAllocation = false,
)
}

View File

@ -5,7 +5,6 @@ package com.daml.ledger.sandbox
import java.util.UUID
import java.util.concurrent.{Executors, TimeUnit}
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
@ -44,7 +43,7 @@ import com.daml.platform.apiserver.{
StandaloneApiServer,
StandaloneIndexService,
}
import com.daml.platform.configuration.ServerRole
import com.daml.platform.configuration.{PartyConfiguration, ServerRole}
import com.daml.platform.indexer.StandaloneIndexerServer
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.{DbSupport, LfValueTranslationCache}
@ -248,7 +247,7 @@ object SandboxOnXRunner {
config = apiServerConfig,
commandConfig = config.commandConfig,
submissionConfig = config.submissionConfig,
partyConfig = BridgeConfigProvider.partyConfig(config),
partyConfig = PartyConfiguration(config.extra.implicitPartyAllocation),
optWriteService = Some(writeService),
authService = BridgeConfigProvider.authService(config),
healthChecks = healthChecksWithIndexer + ("write" -> writeService),

View File

@ -11,10 +11,15 @@ import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason
import com.daml.ledger.participant.state.v2.{CompletionInfo, Update}
import com.daml.ledger.sandbox.bridge.ConflictCheckingLedgerBridge._
import com.daml.ledger.sandbox.bridge.LedgerBridge.{fromOffset, successMapper, toOffset}
import com.daml.ledger.sandbox.bridge.LedgerBridge.{
fromOffset,
partyAllocationSuccessMapper,
successMapper,
toOffset,
}
import com.daml.ledger.sandbox.bridge.SequencerState.LastUpdatedAt
import com.daml.ledger.sandbox.domain.Rejection._
import com.daml.ledger.sandbox.domain.Submission.Transaction
import com.daml.ledger.sandbox.domain.Submission.{AllocateParty, Transaction}
import com.daml.ledger.sandbox.domain._
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
@ -27,6 +32,7 @@ import com.daml.platform.apiserver.execution.MissingContracts
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.appendonlydao.events._
import java.util.UUID
import scala.concurrent.{ExecutionContext, Future}
import scala.util.chaining._
import scala.util.{Failure, Success, Try}
@ -35,14 +41,16 @@ private[sandbox] class ConflictCheckingLedgerBridge(
participantId: Ref.ParticipantId,
indexService: IndexService,
initialLedgerEnd: Offset,
allocatedPartiesAtInitialization: Set[Ref.Party],
bridgeMetrics: BridgeMetrics,
errorFactories: ErrorFactories,
validatePartyAllocation: Boolean,
servicesThreadPoolSize: Int,
)(implicit
loggingContext: LoggingContext,
servicesExecutionContext: ExecutionContext,
servicesExecutionContext: ExecutionContext
) extends LedgerBridge {
private[this] implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
@volatile private var allocatedParties = allocatedPartiesAtInitialization
def flow: Flow[Submission, (Offset, Update), NotUsed] =
Flow[Submission]
@ -67,6 +75,7 @@ private[sandbox] class ConflictCheckingLedgerBridge(
transaction.transaction.updatedContractKeys,
transaction.transaction.consumedContracts,
Blinding.blind(transaction),
transaction.informees,
transactionSubmission,
)
)
@ -74,7 +83,7 @@ private[sandbox] class ConflictCheckingLedgerBridge(
.map(
withErrorLogger(submitterInfo.submissionId)(
invalidInputFromParticipantRejection(submitterInfo.toCompletionInfo())(_)
)
)(transactionSubmission.loggingContext, logger)
)
),
)
@ -89,7 +98,7 @@ private[sandbox] class ConflictCheckingLedgerBridge(
Timed.future(
bridgeMetrics.Stages.tagWithLedgerEnd,
indexService
.currentLedgerEnd()
.currentLedgerEnd()(preparedSubmission.submission.loggingContext)
.map(ledgerEnd =>
Right(ApiOffset.assertFromString(ledgerEnd.value) -> preparedSubmission)
),
@ -109,6 +118,7 @@ private[sandbox] class ConflictCheckingLedgerBridge(
_,
_,
blindingInfo,
transactionInformees,
originalSubmission,
),
)
@ -127,7 +137,7 @@ private[sandbox] class ConflictCheckingLedgerBridge(
originalSubmission.transactionMeta.ledgerEffectiveTime,
divulged = blindingInfo.divulgence.keySet,
).flatMap {
case Right(_) => validatePartyAllocation(originalSubmission)
case Right(_) => validateParties(originalSubmission, transactionInformees)
case rejection => Future.successful(rejection)
}.flatMap {
case Right(_) => validateKeyUsages(originalSubmission, keyInputs)
@ -136,7 +146,7 @@ private[sandbox] class ConflictCheckingLedgerBridge(
},
)
.map(_.map(_ => validated))
}
}(originalSubmission.loggingContext, logger)
case Right(validated) => Future.successful(Right(validated))
}
@ -154,10 +164,10 @@ private[sandbox] class ConflictCheckingLedgerBridge(
val update = in match {
case Left(rejection) => rejection
case Right((_, NoOpPreparedSubmission(other))) =>
successMapper(other, offsetIdx, participantId)
case Right((_, NoOpPreparedSubmission(submission))) =>
processNonTransactionSubmission(offsetIdx, submission)
case Right((noConflictUpTo, txSubmission: PreparedTransactionSubmission)) =>
val submitterInfo = txSubmission.originalSubmission.submitterInfo
val submitterInfo = txSubmission.submission.submitterInfo
withErrorLogger(submitterInfo.submissionId) { implicit errorLogger =>
conflictCheckWithInFlight(
@ -166,7 +176,7 @@ private[sandbox] class ConflictCheckingLedgerBridge(
keyInputs = txSubmission.keyInputs,
inputContracts = txSubmission.inputContracts,
)
}.fold(
}(txSubmission.submission.loggingContext, logger).fold(
toCommandRejectedUpdate(_, submitterInfo.toCompletionInfo()),
{ _ =>
// Update the sequencer state
@ -174,7 +184,7 @@ private[sandbox] class ConflictCheckingLedgerBridge(
.dequeue(noConflictUpTo)
.enqueue(newOffset, txSubmission.updatedKeys, txSubmission.consumedContracts)
successMapper(txSubmission.originalSubmission, offsetIdx, participantId)
successMapper(txSubmission.submission, offsetIdx, participantId)
},
)
}
@ -185,6 +195,20 @@ private[sandbox] class ConflictCheckingLedgerBridge(
}
}
private def processNonTransactionSubmission(offsetIndex: Long, submission: Submission): Update =
submission match {
case AllocateParty(hint, displayName, submissionId) =>
val party = Ref.Party.assertFromString(hint.getOrElse(UUID.randomUUID().toString))
if (allocatedParties(party))
logger.warn(
s"Found duplicate party submission with ID $party for submissionId ${Some(submissionId)}"
)(submission.loggingContext)
allocatedParties = allocatedParties + party
partyAllocationSuccessMapper(party, displayName, submissionId, participantId)
case other => successMapper(other, offsetIndex, participantId)
}
private def conflictCheckWithInFlight(
keysState: Map[Key, (Option[ContractId], LastUpdatedAt)],
consumedContractsState: Set[ContractId],
@ -220,15 +244,26 @@ private[sandbox] class ConflictCheckingLedgerBridge(
}
}
private def validatePartyAllocation(
transaction: Submission.Transaction
private def validateParties(
transaction: Submission.Transaction,
transactionInformees: Set[Ref.Party],
)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): AsyncValidation[Unit] = {
val _ = (transaction, contextualizedErrorLogger)
// TODO SoX: Implement
Future.successful(Right(()))
}
): AsyncValidation[Unit] =
// This check which is O(n) in the number of transaction informees does not warrant a separate async dispatch
Future.successful {
if (validatePartyAllocation) {
val unallocatedInformees = transactionInformees diff allocatedParties
Either.cond(
unallocatedInformees.isEmpty,
(),
toCommandRejectedUpdate(
UnallocatedParties(unallocatedInformees.toSet)(errorFactories),
transaction.submitterInfo.toCompletionInfo(),
),
)
} else Right(())
}
private def checkTimeModel(
transaction: Transaction
@ -251,7 +286,7 @@ private[sandbox] class ConflictCheckingLedgerBridge(
Future.successful(Right(()))
else
indexService
.lookupMaximumLedgerTime(referredContracts)
.lookupMaximumLedgerTime(referredContracts)(transaction.loggingContext)
.transform {
case Failure(MissingContracts(missingContractIds)) =>
Success(
@ -302,7 +337,7 @@ private[sandbox] class ConflictCheckingLedgerBridge(
case Right(_) =>
indexService
// TODO SoX: Perform lookup more efficiently and do not use a readers-based lookup
.lookupContractKey(transaction.transaction.informees, key)
.lookupContractKey(transaction.transaction.informees, key)(transaction.loggingContext)
.map { lookupResult =>
(inputState, lookupResult) match {
case (LfTransaction.NegativeKeyLookup, Some(actual)) =>

View File

@ -13,6 +13,7 @@ import com.daml.ledger.participant.state.v2.Update
import com.daml.ledger.resources.ResourceOwner
import com.daml.ledger.sandbox.BridgeConfig
import com.daml.ledger.sandbox.domain.Submission
import com.daml.lf.data.Ref.ParticipantId
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.transaction.{CommittedTransaction, TransactionNodeStatistics}
import com.daml.logging.LoggingContext
@ -41,15 +42,20 @@ object LedgerBridge {
if (config.extra.conflictCheckingEnabled)
for {
initialLedgerEnd <- ResourceOwner.forFuture(() => indexService.currentLedgerEnd())
allocatedPartiesAtInitialization <- ResourceOwner.forFuture(() =>
indexService.listKnownParties().map(_.map(_.party).toSet)
)
conflictCheckingLedgerBridge = new ConflictCheckingLedgerBridge(
participantId = participantConfig.participantId,
indexService = indexService,
initialLedgerEnd =
Offset.fromHexString(Ref.HexString.assertFromString(initialLedgerEnd.value)),
allocatedPartiesAtInitialization = allocatedPartiesAtInitialization,
bridgeMetrics = bridgeMetrics,
errorFactories = ErrorFactories(
new ErrorCodesVersionSwitcher(config.enableSelfServiceErrorCodes)
),
validatePartyAllocation = !config.extra.implicitPartyAllocation,
servicesThreadPoolSize = servicesThreadPoolSize,
)
} yield conflictCheckingLedgerBridge
@ -68,6 +74,20 @@ object LedgerBridge {
)
}
private[bridge] def partyAllocationSuccessMapper(
party: Ref.Party,
displayName: Option[String],
submissionId: Ref.SubmissionId,
participantId: ParticipantId,
): Update.PartyAddedToParticipant =
Update.PartyAddedToParticipant(
party = party,
displayName = displayName.getOrElse(party),
participantId = participantId,
recordTime = Time.Timestamp.now(),
submissionId = Some(submissionId),
)
def successMapper(submission: Submission, index: Long, participantId: Ref.ParticipantId): Update =
submission match {
case s: Submission.AllocateParty =>

View File

@ -4,13 +4,16 @@
package com.daml.ledger.sandbox.bridge
import com.daml.ledger.sandbox.domain.Submission
import com.daml.lf.data.Ref
import com.daml.lf.transaction.BlindingInfo
import com.daml.lf.transaction.Transaction.{KeyInput => TxKeyInput}
import com.daml.platform.store.appendonlydao.events
import com.daml.platform.store.appendonlydao.events.Key
// A submission that has been prepared for conflict checking
sealed trait PreparedSubmission extends Product with Serializable
sealed trait PreparedSubmission extends Product with Serializable {
def submission: Submission
}
// A transaction submission bundled with all its precomputed effects.
final case class PreparedTransactionSubmission(
@ -19,7 +22,8 @@ final case class PreparedTransactionSubmission(
updatedKeys: Map[Key, Option[events.ContractId]],
consumedContracts: Set[events.ContractId],
blindingInfo: BlindingInfo,
originalSubmission: Submission.Transaction,
transactionInformees: Set[Ref.Party],
submission: Submission.Transaction,
) extends PreparedSubmission
// A no-op prepared submission for update types that do not need

View File

@ -8,8 +8,11 @@ import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.v2.{SubmitterInfo, TransactionMeta}
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.LoggingContext
private[sandbox] sealed trait Submission extends Product with Serializable
private[sandbox] sealed trait Submission extends Product with Serializable {
def loggingContext: LoggingContext
}
private[sandbox] object Submission {
final case class Transaction(
@ -17,20 +20,27 @@ private[sandbox] object Submission {
transactionMeta: TransactionMeta,
transaction: SubmittedTransaction,
estimatedInterpretationCost: Long,
) extends Submission
)(implicit val loggingContext: LoggingContext)
extends Submission
final case class Config(
maxRecordTime: Time.Timestamp,
submissionId: Ref.SubmissionId,
config: Configuration,
) extends Submission
)(implicit val loggingContext: LoggingContext)
extends Submission
final case class AllocateParty(
hint: Option[Ref.Party],
displayName: Option[String],
submissionId: Ref.SubmissionId,
) extends Submission
)(implicit val loggingContext: LoggingContext)
extends Submission
final case class UploadPackages(
submissionId: Ref.SubmissionId,
archives: List[Archive],
sourceDescription: Option[String],
) extends Submission
)(implicit val loggingContext: LoggingContext)
extends Submission
}

View File

@ -13,6 +13,7 @@ import com.daml.lf.crypto
import com.daml.lf.data.{Bytes, ImmArray, Ref, Time}
import com.daml.lf.transaction._
import com.daml.lf.value.Value.{ContractId, ValueNone}
import com.daml.logging.LoggingContext
import org.mockito.MockitoSugar
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
@ -68,7 +69,7 @@ class BridgeWriteServiceTest extends AnyFlatSpec with MockitoSugar with Matchers
transactionMeta,
transaction = tx,
estimatedInterpretationCost = 0,
)
)(LoggingContext.ForTesting)
val expected = TransactionNodeStatistics(tx)