mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-19 16:57:40 +03:00
Make interpretation cost available for LedgerWriter/WriteService (#6515)
* Measure time of interpretation and store it in CommandExecutionResult. * Added parameters for passing in interpretation time for WriteService and LedgerWriter. * Code tidying. * Added CommitMetadata parameter to LedgerWriter. * Thread through interpretation time. Aggregate interpretation times for a batch. * Take max of interpretation times. * Calculate interpretation time in nanos. * Moved CommitMetadata into separate file. CHANGELOG_BEGIN CHANGELOG_END * Apply suggestions from code review Co-authored-by: Samir Talwar <samir.talwar@digitalasset.com> * Delegate deprecated methods to method with new signature. * Code tidying. * Suppress deprecation warnings. * Made interpretation cost optional in CommitMetadata. * Test that we populate interpretation time. * Code tidying. * Apply suggestions from code review Co-authored-by: Samir Talwar <samir.talwar@digitalasset.com> * Code tidying. * Some more tests. Do not return interpretation cost for a batch if it only contains non-transaction submissions. * Reformatted. * Reformatted. * Named arbitrary interpretation cost. * Reverted changes for BatchingLedgerWriter. * Always drop commit metadata for batches and don't report interpretation cost. * More specific expectations. * Include authorization check and blinding in interpretation time. Co-authored-by: Samir Talwar <samir.talwar@digitalasset.com>
This commit is contained in:
parent
99a9662781
commit
ccfb0ac94e
@ -46,7 +46,7 @@ import java.nio.file.{Files, Path, Paths}
|
||||
*
|
||||
* This class is thread safe as long `nextRandomInt` is.
|
||||
*/
|
||||
final class Engine(config: Engine.Config) {
|
||||
class Engine(config: Engine.Config) {
|
||||
private[this] val compiledPackages = ConcurrentCompiledPackages()
|
||||
private[this] val preprocessor = new preprocessing.Preprocessor(compiledPackages)
|
||||
private[this] var profileDir: Option[Path] = None
|
||||
@ -73,7 +73,7 @@ final class Engine(config: Engine.Config) {
|
||||
*
|
||||
*
|
||||
* [[transactionSeed]] is the master hash used to derive node and contractId discriminator.
|
||||
* If let undefined, no discriminator will be generated.
|
||||
* If left undefined, no discriminator will be generated.
|
||||
*
|
||||
* This method does NOT perform authorization checks; ResultDone can contain a transaction that's not well-authorized.
|
||||
*
|
||||
@ -308,10 +308,9 @@ final class Engine(config: Engine.Config) {
|
||||
return Result.needPackage(
|
||||
pkgId,
|
||||
pkg => {
|
||||
compiledPackages.addPackage(pkgId, pkg).flatMap {
|
||||
case _ =>
|
||||
callback(compiledPackages)
|
||||
interpretLoop(machine, time)
|
||||
compiledPackages.addPackage(pkgId, pkg).flatMap { _ =>
|
||||
callback(compiledPackages)
|
||||
interpretLoop(machine, time)
|
||||
}
|
||||
}
|
||||
)
|
||||
@ -331,13 +330,11 @@ final class Engine(config: Engine.Config) {
|
||||
case SResultNeedKey(gk, _, cb) =>
|
||||
return ResultNeedKey(
|
||||
gk,
|
||||
(
|
||||
result =>
|
||||
if (cb(SKeyLookupResult(result)))
|
||||
interpretLoop(machine, time)
|
||||
else
|
||||
ResultError(Error(s"dependency error: couldn't find key ${gk.key}"))
|
||||
)
|
||||
result =>
|
||||
if (cb(SKeyLookupResult(result)))
|
||||
interpretLoop(machine, time)
|
||||
else
|
||||
ResultError(Error(s"dependency error: couldn't find key ${gk.key}"))
|
||||
)
|
||||
|
||||
case _: SResultScenarioCommit =>
|
||||
@ -371,9 +368,9 @@ final class Engine(config: Engine.Config) {
|
||||
case Some(profileDir) =>
|
||||
val hash = meta.nodeSeeds(0)._2.toHexString
|
||||
val desc = Engine.profileDesc(tx)
|
||||
machine.profile.name = s"${desc}-${hash.substring(0, 6)}"
|
||||
machine.profile.name = s"$desc-${hash.substring(0, 6)}"
|
||||
val profileFile =
|
||||
profileDir.resolve(Paths.get(s"${meta.submissionTime}-${desc}-${hash}.json"))
|
||||
profileDir.resolve(Paths.get(s"${meta.submissionTime}-$desc-$hash.json"))
|
||||
machine.profile.writeSpeedscopeJson(profileFile)
|
||||
}
|
||||
ResultDone((tx, meta))
|
||||
@ -398,7 +395,7 @@ final class Engine(config: Engine.Config) {
|
||||
def preloadPackage(pkgId: PackageId, pkg: Package): Result[Unit] =
|
||||
compiledPackages.addPackage(pkgId, pkg)
|
||||
|
||||
def setProfileDir(optProfileDir: Option[Path]) = {
|
||||
def setProfileDir(optProfileDir: Option[Path]): Unit = {
|
||||
optProfileDir match {
|
||||
case None =>
|
||||
compiledPackages.profilingMode = speedy.Compiler.NoProfile
|
||||
@ -424,13 +421,13 @@ object Engine {
|
||||
allowedOutputTransactionVersions: VersionRange[transaction.TransactionVersion],
|
||||
) extends NoCopy
|
||||
|
||||
val StableConfig =
|
||||
val StableConfig: Config =
|
||||
Config.assertBuild(
|
||||
allowedOutputValueVersions = ValueVersions.SupportedStableVersions,
|
||||
allowedOutputTransactionVersions = transaction.TransactionVersions.SupportedStableVersions
|
||||
)
|
||||
|
||||
val DevConfig =
|
||||
val DevConfig: Config =
|
||||
new Config(
|
||||
allowedOutputValueVersions = ValueVersions.SupportedDevVersions,
|
||||
allowedOutputTransactionVersions = TransactionVersions.SupportedDevVersions,
|
||||
@ -473,7 +470,7 @@ object Engine {
|
||||
private def profileDesc(tx: Tx.Transaction): String = {
|
||||
if (tx.roots.length == 1) {
|
||||
val makeDesc = (kind: String, tmpl: Ref.Identifier, extra: Option[String]) =>
|
||||
s"${kind}:${tmpl.qualifiedName.name}${extra.map(extra => s":${extra}").getOrElse("")}"
|
||||
s"$kind:${tmpl.qualifiedName.name}${extra.map(extra => s":$extra").getOrElse("")}"
|
||||
tx.nodes.get(tx.roots(0)).toList.head match {
|
||||
case create: NodeCreate[_, _] => makeDesc("create", create.coinst.template, None)
|
||||
case exercise: NodeExercises[_, _, _] =>
|
||||
@ -486,6 +483,6 @@ object Engine {
|
||||
}
|
||||
}
|
||||
|
||||
def DevEngine() = new Engine(Engine.DevConfig)
|
||||
def DevEngine(): Engine = new Engine(Engine.DevConfig)
|
||||
|
||||
}
|
||||
|
@ -40,7 +40,11 @@ final class InMemoryLedgerReaderWriter(
|
||||
metrics: Metrics)(implicit materializer: Materializer, executionContext: ExecutionContext)
|
||||
extends LedgerReader
|
||||
with LedgerWriter {
|
||||
override def commit(correlationId: String, envelope: Bytes): Future[SubmissionResult] =
|
||||
override def commit(
|
||||
correlationId: String,
|
||||
envelope: Bytes,
|
||||
metadata: CommitMetadata,
|
||||
): Future[SubmissionResult] =
|
||||
ledgerStateAccess
|
||||
.inTransaction { ledgerStateOperations =>
|
||||
committer
|
||||
|
@ -5,6 +5,7 @@ package com.daml.ledger.on.memory
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
|
||||
import com.daml.ledger.participant.state.kvutils.api.CommitMetadata
|
||||
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
|
||||
import com.daml.ledger.validator.{BatchedValidatingCommitter, LedgerStateOperations}
|
||||
import com.daml.lf.data.Ref
|
||||
@ -44,10 +45,12 @@ class InMemoryLedgerReaderWriterSpec
|
||||
new Metrics(new MetricRegistry)
|
||||
)
|
||||
|
||||
instance.commit("correlation ID", ByteString.copyFromUtf8("some bytes")).map { actual =>
|
||||
verify(mockDispatcher, times(0)).signalNewHead(anyInt())
|
||||
actual should be(a[SubmissionResult.InternalError])
|
||||
}
|
||||
instance
|
||||
.commit("correlation ID", ByteString.copyFromUtf8("some bytes"), CommitMetadata.Empty)
|
||||
.map { actual =>
|
||||
verify(mockDispatcher, times(0)).signalNewHead(anyInt())
|
||||
actual should be(a[SubmissionResult.InternalError])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,7 +15,12 @@ import com.daml.ledger.api.health.{HealthStatus, Healthy}
|
||||
import com.daml.ledger.on.sql.SqlLedgerReaderWriter._
|
||||
import com.daml.ledger.on.sql.queries.Queries
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlLogEntryId, DamlStateValue}
|
||||
import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord, LedgerWriter}
|
||||
import com.daml.ledger.participant.state.kvutils.api.{
|
||||
CommitMetadata,
|
||||
LedgerReader,
|
||||
LedgerRecord,
|
||||
LedgerWriter
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, KVOffset}
|
||||
import com.daml.ledger.participant.state.v1._
|
||||
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
|
||||
@ -90,7 +95,11 @@ final class SqlLedgerReaderWriter(
|
||||
)
|
||||
.map { case (_, entry) => entry }
|
||||
|
||||
override def commit(correlationId: String, envelope: Bytes): Future[SubmissionResult] =
|
||||
override def commit(
|
||||
correlationId: String,
|
||||
envelope: Bytes,
|
||||
metadata: CommitMetadata,
|
||||
): Future[SubmissionResult] =
|
||||
committer.commit(correlationId, envelope, participantId)
|
||||
|
||||
private object SqlLedgerStateAccess extends LedgerStateAccess[Index] {
|
||||
|
@ -16,11 +16,18 @@ final class TimedWriteService(delegate: WriteService, metrics: Metrics) extends
|
||||
override def submitTransaction(
|
||||
submitterInfo: SubmitterInfo,
|
||||
transactionMeta: TransactionMeta,
|
||||
transaction: SubmittedTransaction
|
||||
transaction: SubmittedTransaction,
|
||||
estimatedInterpretationCost: Long
|
||||
): CompletionStage[SubmissionResult] =
|
||||
Timed.completionStage(
|
||||
metrics.daml.services.write.submitTransaction,
|
||||
delegate.submitTransaction(submitterInfo, transactionMeta, transaction))
|
||||
delegate.submitTransaction(
|
||||
submitterInfo,
|
||||
transactionMeta,
|
||||
transaction,
|
||||
estimatedInterpretationCost,
|
||||
),
|
||||
)
|
||||
|
||||
override def uploadPackages(
|
||||
submissionId: SubmissionId,
|
||||
|
@ -10,6 +10,9 @@ load(
|
||||
da_scala_library(
|
||||
name = "participant-state",
|
||||
srcs = glob(["src/main/scala/com/daml/ledger/participant/state/v1/**/*.scala"]),
|
||||
plugins = [
|
||||
"@maven//:com_github_ghik_silencer_plugin_2_12_11",
|
||||
],
|
||||
resources = glob(["src/main/resources/**/*"]),
|
||||
tags = ["maven_coordinates=com.daml:participant-state:__VERSION__"],
|
||||
visibility = [
|
||||
@ -26,6 +29,7 @@ da_scala_library(
|
||||
"//language-support/scala/bindings",
|
||||
"//ledger/ledger-api-health",
|
||||
"//ledger/participant-state/protobuf:ledger_configuration_java_proto",
|
||||
"@maven//:com_github_ghik_silencer_lib_2_12_11",
|
||||
"@maven//:com_google_protobuf_protobuf_java",
|
||||
"@maven//:com_typesafe_akka_akka_actor_2_12",
|
||||
"@maven//:com_typesafe_akka_akka_stream_2_12",
|
||||
|
@ -20,6 +20,9 @@ load("@os_info//:os_info.bzl", "is_windows")
|
||||
da_scala_library(
|
||||
name = "kvutils",
|
||||
srcs = glob(["src/main/scala/**/*.scala"]),
|
||||
plugins = [
|
||||
"@maven//:com_github_ghik_silencer_plugin_2_12_11",
|
||||
],
|
||||
tags = ["maven_coordinates=com.daml:participant-state-kvutils:__VERSION__"],
|
||||
visibility = [
|
||||
"//visibility:public",
|
||||
@ -43,6 +46,7 @@ da_scala_library(
|
||||
"//ledger/participant-state",
|
||||
"//ledger/participant-state/protobuf:ledger_configuration_java_proto",
|
||||
"//libs-scala/contextualized-logging",
|
||||
"@maven//:com_github_ghik_silencer_lib_2_12_11",
|
||||
"@maven//:com_google_guava_guava",
|
||||
"@maven//:com_google_protobuf_protobuf_java",
|
||||
"@maven//:com_typesafe_akka_akka_actor_2_12",
|
||||
|
@ -37,7 +37,10 @@ class BatchingLedgerWriter(val queue: BatchingQueue, val writer: LedgerWriter)(
|
||||
private val logger = ContextualizedLogger.get(getClass)
|
||||
private val queueHandle = queue.run(commitBatch)
|
||||
|
||||
override def commit(correlationId: String, envelope: kvutils.Bytes): Future[SubmissionResult] =
|
||||
override def commit(
|
||||
correlationId: String,
|
||||
envelope: kvutils.Bytes,
|
||||
metadata: CommitMetadata): Future[SubmissionResult] =
|
||||
queueHandle
|
||||
.offer(
|
||||
DamlSubmissionBatch.CorrelatedSubmission.newBuilder
|
||||
@ -69,11 +72,11 @@ class BatchingLedgerWriter(val queue: BatchingQueue, val writer: LedgerWriter)(
|
||||
.build
|
||||
val envelope = Envelope.enclose(batch)
|
||||
writer
|
||||
.commit(correlationId, envelope)
|
||||
.commit(correlationId, envelope, CommitMetadata.Empty)
|
||||
.map {
|
||||
case SubmissionResult.Acknowledged => ()
|
||||
case err =>
|
||||
logger.error(s"Batch dropped as commit failed: $err")
|
||||
case error =>
|
||||
logger.error(s"Batch dropped as commit failed: $error")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,25 @@
|
||||
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils.api
|
||||
|
||||
/**
|
||||
* Exposes metadata about the commit.
|
||||
* The methods may lazily evaluate.
|
||||
*/
|
||||
sealed trait CommitMetadata {
|
||||
|
||||
/**
|
||||
* @return estimated interpretation cost for a transaction; None in case of non-transaction
|
||||
* submissions
|
||||
*/
|
||||
def estimatedInterpretationCost: Option[Long]
|
||||
}
|
||||
|
||||
object CommitMetadata {
|
||||
val Empty: CommitMetadata =
|
||||
SimpleCommitMetadata(estimatedInterpretationCost = None)
|
||||
}
|
||||
|
||||
final case class SimpleCommitMetadata(override val estimatedInterpretationCost: Option[Long])
|
||||
extends CommitMetadata
|
@ -47,8 +47,15 @@ class KeyValueParticipantState(
|
||||
override def submitTransaction(
|
||||
submitterInfo: SubmitterInfo,
|
||||
transactionMeta: TransactionMeta,
|
||||
transaction: SubmittedTransaction): CompletionStage[SubmissionResult] =
|
||||
writerAdapter.submitTransaction(submitterInfo, transactionMeta, transaction)
|
||||
transaction: SubmittedTransaction,
|
||||
estimatedInterpretationCost: Long,
|
||||
): CompletionStage[SubmissionResult] =
|
||||
writerAdapter.submitTransaction(
|
||||
submitterInfo,
|
||||
transactionMeta,
|
||||
transaction,
|
||||
estimatedInterpretationCost,
|
||||
)
|
||||
|
||||
override def submitConfiguration(
|
||||
maxRecordTime: Time.Timestamp,
|
||||
|
@ -24,6 +24,7 @@ class KeyValueParticipantStateWriter(writer: LedgerWriter, metrics: Metrics) ext
|
||||
submitterInfo: SubmitterInfo,
|
||||
transactionMeta: TransactionMeta,
|
||||
transaction: SubmittedTransaction,
|
||||
estimatedInterpretationCost: Long,
|
||||
): CompletionStage[SubmissionResult] = {
|
||||
val submission =
|
||||
keyValueSubmission.transactionToSubmission(
|
||||
@ -31,7 +32,9 @@ class KeyValueParticipantStateWriter(writer: LedgerWriter, metrics: Metrics) ext
|
||||
transactionMeta,
|
||||
transaction,
|
||||
)
|
||||
commit(correlationId = submitterInfo.commandId, submission = submission)
|
||||
val metadata = SimpleCommitMetadata(
|
||||
estimatedInterpretationCost = Some(estimatedInterpretationCost))
|
||||
commit(correlationId = submitterInfo.commandId, submission = submission, metadata = metadata)
|
||||
}
|
||||
|
||||
override def uploadPackages(
|
||||
@ -78,6 +81,8 @@ class KeyValueParticipantStateWriter(writer: LedgerWriter, metrics: Metrics) ext
|
||||
|
||||
private def commit(
|
||||
correlationId: String,
|
||||
submission: DamlSubmission): CompletionStage[SubmissionResult] =
|
||||
FutureConverters.toJava(writer.commit(correlationId, Envelope.enclose(submission)))
|
||||
submission: DamlSubmission,
|
||||
metadata: CommitMetadata = CommitMetadata.Empty,
|
||||
): CompletionStage[SubmissionResult] =
|
||||
FutureConverters.toJava(writer.commit(correlationId, Envelope.enclose(submission), metadata))
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ package com.daml.ledger.participant.state.kvutils.api
|
||||
import com.daml.ledger.api.health.ReportsHealth
|
||||
import com.daml.ledger.participant.state.kvutils.Bytes
|
||||
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
|
||||
import com.github.ghik.silencer.silent
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
@ -27,8 +28,21 @@ trait LedgerWriter extends ReportsHealth {
|
||||
*
|
||||
* @param correlationId correlation ID to be used for logging purposes
|
||||
* @param envelope opaque submission; may be compressed
|
||||
* @param metadata metadata associated to this particular commit
|
||||
* @return future for sending the submission; for possible results see
|
||||
* [[com.daml.ledger.participant.state.v1.SubmissionResult]]
|
||||
*/
|
||||
def commit(correlationId: String, envelope: Bytes): Future[SubmissionResult]
|
||||
@silent("deprecated")
|
||||
def commit(
|
||||
correlationId: String,
|
||||
envelope: Bytes,
|
||||
metadata: CommitMetadata,
|
||||
): Future[SubmissionResult] =
|
||||
commit(correlationId, envelope)
|
||||
|
||||
@deprecated("Will be removed in 1.4.0", "1.3.0")
|
||||
def commit(
|
||||
correlationId: String,
|
||||
envelope: Bytes,
|
||||
): Future[SubmissionResult] = commit(correlationId, envelope, CommitMetadata.Empty)
|
||||
}
|
||||
|
@ -15,8 +15,15 @@ class TimedLedgerWriter(delegate: LedgerWriter, metrics: Metrics) extends Ledger
|
||||
override def participantId: ParticipantId =
|
||||
delegate.participantId
|
||||
|
||||
override def commit(correlationId: String, envelope: Bytes): Future[SubmissionResult] =
|
||||
Timed.future(metrics.daml.kvutils.writer.commit, delegate.commit(correlationId, envelope))
|
||||
override def commit(
|
||||
correlationId: String,
|
||||
envelope: Bytes,
|
||||
metadata: CommitMetadata,
|
||||
): Future[SubmissionResult] =
|
||||
Timed.future(
|
||||
metrics.daml.kvutils.writer.commit,
|
||||
delegate.commit(correlationId, envelope, metadata),
|
||||
)
|
||||
|
||||
override def currentHealth(): HealthStatus =
|
||||
delegate.currentHealth()
|
||||
|
@ -65,7 +65,11 @@ package object api {
|
||||
|
||||
override def participantId: ParticipantId = writer.participantId
|
||||
|
||||
override def commit(correlationId: String, envelope: Bytes): Future[SubmissionResult] =
|
||||
writer.commit(correlationId, envelope)
|
||||
override def commit(
|
||||
correlationId: String,
|
||||
envelope: Bytes,
|
||||
metadata: CommitMetadata,
|
||||
): Future[SubmissionResult] =
|
||||
writer.commit(correlationId, envelope, metadata)
|
||||
}
|
||||
}
|
||||
|
@ -298,7 +298,11 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
|
||||
_ <- ps.allocateParty(hint = Some(alice), None, newSubmissionId()).toScala
|
||||
(offset1, _) <- waitForNextUpdate(ps, None)
|
||||
_ <- ps
|
||||
.submitTransaction(submitterInfo(rt, alice), transactionMeta(rt), emptyTransaction)
|
||||
.submitTransaction(
|
||||
submitterInfo(rt, alice),
|
||||
transactionMeta(rt),
|
||||
emptyTransaction,
|
||||
DefaultInterpretationCost)
|
||||
.toScala
|
||||
(offset2, _) <- waitForNextUpdate(ps, Some(offset1))
|
||||
} yield {
|
||||
@ -317,6 +321,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
|
||||
submitterInfo(rt, alice, commandIds._1),
|
||||
transactionMeta(rt),
|
||||
emptyTransaction,
|
||||
DefaultInterpretationCost,
|
||||
)
|
||||
.toScala
|
||||
(offset2, update2) <- waitForNextUpdate(ps, Some(offset1))
|
||||
@ -326,6 +331,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
|
||||
submitterInfo(rt, alice, commandIds._1),
|
||||
transactionMeta(rt),
|
||||
emptyTransaction,
|
||||
DefaultInterpretationCost,
|
||||
)
|
||||
.toScala
|
||||
result4 <- ps
|
||||
@ -333,6 +339,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
|
||||
submitterInfo(rt, alice, commandIds._2),
|
||||
transactionMeta(rt),
|
||||
emptyTransaction,
|
||||
DefaultInterpretationCost,
|
||||
)
|
||||
.toScala
|
||||
(offset3, update3) <- waitForNextUpdate(ps, Some(offset2))
|
||||
@ -363,14 +370,16 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
|
||||
.submitTransaction(
|
||||
submitterInfo(rt, alice, "X1"),
|
||||
transactionMeta(rt),
|
||||
emptyTransaction)
|
||||
emptyTransaction,
|
||||
DefaultInterpretationCost)
|
||||
.toScala
|
||||
(offset2, _) <- waitForNextUpdate(ps, Some(offset1))
|
||||
result3 <- ps
|
||||
.submitTransaction(
|
||||
submitterInfo(rt, alice, "X2"),
|
||||
transactionMeta(rt),
|
||||
emptyTransaction)
|
||||
emptyTransaction,
|
||||
DefaultInterpretationCost)
|
||||
.toScala
|
||||
(offset3, update3) <- waitForNextUpdate(ps, Some(offset2))
|
||||
results = Seq(result1, result2, result3)
|
||||
@ -403,6 +412,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
|
||||
submitterInfo(rt, unallocatedParty),
|
||||
transactionMeta(rt),
|
||||
emptyTransaction,
|
||||
DefaultInterpretationCost,
|
||||
)
|
||||
.toScala
|
||||
(offset2, update2) <- waitForNextUpdate(ps, Some(offset1))
|
||||
@ -429,6 +439,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
|
||||
submitterInfo(rt, party = newParty),
|
||||
transactionMeta(rt),
|
||||
emptyTransaction,
|
||||
DefaultInterpretationCost,
|
||||
)
|
||||
.toScala
|
||||
(offset4, update4) <- waitForNextUpdate(ps, Some(offset3))
|
||||
@ -670,6 +681,7 @@ object ParticipantStateIntegrationSpecBase {
|
||||
type ParticipantState = ReadService with WriteService
|
||||
|
||||
private val IdleTimeout: FiniteDuration = 5.seconds
|
||||
private val DefaultInterpretationCost = 0L
|
||||
|
||||
private val emptyTransaction: SubmittedTransaction =
|
||||
Tx.SubmittedTransaction(TransactionBuilder.Empty)
|
||||
|
@ -36,7 +36,7 @@ class BatchingLedgerWriterSpec
|
||||
val handle = mock[RunningBatchingQueueHandle]
|
||||
when(handle.alive).thenReturn(false)
|
||||
val queue = mock[BatchingQueue]
|
||||
when(queue.run(any[BatchingQueue.CommitBatchFunction]())(any[Materializer]()))
|
||||
when(queue.run(any[BatchingQueue.CommitBatchFunction]())(any[Materializer]))
|
||||
.thenReturn(handle)
|
||||
val writer = mock[LedgerWriter]
|
||||
val batchingWriter =
|
||||
@ -54,11 +54,15 @@ class BatchingLedgerWriterSpec
|
||||
LoggingContext.newLoggingContext { implicit logCtx =>
|
||||
new BatchingLedgerWriter(immediateBatchingQueue, mockWriter)
|
||||
}
|
||||
val expected = createExpectedBatch(aCorrelationId -> aSubmission)
|
||||
val expectedBatch = createExpectedBatch(aCorrelationId -> aSubmission)
|
||||
for {
|
||||
submissionResult <- batchingWriter.commit(aCorrelationId, aSubmission)
|
||||
submissionResult <- batchingWriter.commit(aCorrelationId, aSubmission, someCommitMetadata)
|
||||
} yield {
|
||||
verify(mockWriter).commit(anyString(), ArgumentMatchers.eq(expected))
|
||||
val expectedCommitMetadata = SimpleCommitMetadata(estimatedInterpretationCost = None)
|
||||
verify(mockWriter).commit(
|
||||
anyString(),
|
||||
ArgumentMatchers.eq(expectedBatch),
|
||||
ArgumentMatchers.eq(expectedCommitMetadata))
|
||||
submissionResult should be(SubmissionResult.Acknowledged)
|
||||
}
|
||||
}
|
||||
@ -71,12 +75,12 @@ class BatchingLedgerWriterSpec
|
||||
new BatchingLedgerWriter(immediateBatchingQueue, mockWriter)
|
||||
}
|
||||
for {
|
||||
result1 <- batchingWriter.commit("test1", aSubmission)
|
||||
result2 <- batchingWriter.commit("test2", aSubmission)
|
||||
result3 <- batchingWriter.commit("test3", aSubmission)
|
||||
result1 <- batchingWriter.commit("test1", aSubmission, someCommitMetadata)
|
||||
result2 <- batchingWriter.commit("test2", aSubmission, someCommitMetadata)
|
||||
result3 <- batchingWriter.commit("test3", aSubmission, someCommitMetadata)
|
||||
} yield {
|
||||
verify(mockWriter, times(3))
|
||||
.commit(anyString(), any[kvutils.Bytes])
|
||||
.commit(anyString(), any[kvutils.Bytes], any[CommitMetadata])
|
||||
all(Seq(result1, result2, result3)) should be(SubmissionResult.Acknowledged)
|
||||
batchingWriter.currentHealth should be(HealthStatus.healthy)
|
||||
}
|
||||
@ -87,8 +91,9 @@ class BatchingLedgerWriterSpec
|
||||
}
|
||||
|
||||
object BatchingLedgerWriterSpec {
|
||||
val aCorrelationId = "aCorrelationId"
|
||||
val aSubmission = ByteString.copyFromUtf8("a submission")
|
||||
private val aCorrelationId = "aCorrelationId"
|
||||
private val aSubmission = ByteString.copyFromUtf8("a submission")
|
||||
private val someCommitMetadata = SimpleCommitMetadata(estimatedInterpretationCost = Some(123L))
|
||||
|
||||
def immediateBatchingQueue()(implicit executionContext: ExecutionContext): BatchingQueue =
|
||||
new BatchingQueue {
|
||||
@ -111,10 +116,14 @@ object BatchingLedgerWriterSpec {
|
||||
captor: Option[ArgumentCaptor[kvutils.Bytes]] = None,
|
||||
submissionResult: SubmissionResult = SubmissionResult.Acknowledged): LedgerWriter = {
|
||||
val writer = mock[LedgerWriter]
|
||||
when(writer.commit(anyString(), captor.map(_.capture()).getOrElse(any[kvutils.Bytes]())))
|
||||
when(
|
||||
writer.commit(
|
||||
anyString(),
|
||||
captor.map(_.capture()).getOrElse(any[kvutils.Bytes]),
|
||||
any[CommitMetadata]))
|
||||
.thenReturn(Future.successful(SubmissionResult.Acknowledged))
|
||||
when(writer.participantId).thenReturn(v1.ParticipantId.assertFromString("test-participant"))
|
||||
when(writer.currentHealth).thenReturn(HealthStatus.healthy)
|
||||
when(writer.currentHealth()).thenReturn(HealthStatus.healthy)
|
||||
writer
|
||||
}
|
||||
|
||||
|
@ -43,9 +43,10 @@ class KeyValueParticipantStateWriterSpec extends WordSpec with Matchers {
|
||||
instance.submitTransaction(
|
||||
submitterInfo(recordTime, aParty, expectedCorrelationId),
|
||||
transactionMeta(recordTime),
|
||||
anEmptyTransaction)
|
||||
anEmptyTransaction,
|
||||
anInterpretationCost)
|
||||
|
||||
verify(writer, times(1)).commit(anyString(), any[Bytes]())
|
||||
verify(writer, times(1)).commit(anyString(), any[Bytes], any[CommitMetadata])
|
||||
verifyEnvelope(transactionCaptor.getValue)(_.hasTransactionEntry)
|
||||
correlationIdCaptor.getValue should be(expectedCorrelationId)
|
||||
}
|
||||
@ -57,7 +58,7 @@ class KeyValueParticipantStateWriterSpec extends WordSpec with Matchers {
|
||||
|
||||
instance.uploadPackages(aSubmissionId, List.empty, sourceDescription = None)
|
||||
|
||||
verify(writer, times(1)).commit(anyString(), any[Bytes]())
|
||||
verify(writer, times(1)).commit(anyString(), any[Bytes], any[CommitMetadata])
|
||||
verifyEnvelope(packageUploadCaptor.getValue)(_.hasPackageUploadEntry)
|
||||
}
|
||||
|
||||
@ -68,7 +69,7 @@ class KeyValueParticipantStateWriterSpec extends WordSpec with Matchers {
|
||||
|
||||
instance.submitConfiguration(newRecordTime().addMicros(10000), aSubmissionId, aConfiguration)
|
||||
|
||||
verify(writer, times(1)).commit(anyString(), any[Bytes]())
|
||||
verify(writer, times(1)).commit(anyString(), any[Bytes], any[CommitMetadata])
|
||||
verifyEnvelope(configurationCaptor.getValue)(_.hasConfigurationSubmission)
|
||||
}
|
||||
|
||||
@ -79,7 +80,7 @@ class KeyValueParticipantStateWriterSpec extends WordSpec with Matchers {
|
||||
|
||||
instance.allocateParty(hint = None, displayName = None, aSubmissionId)
|
||||
|
||||
verify(writer, times(1)).commit(anyString(), any[Bytes]())
|
||||
verify(writer, times(1)).commit(anyString(), any[Bytes], any[CommitMetadata])
|
||||
verifyEnvelope(partyAllocationCaptor.getValue)(_.hasPartyAllocationEntry)
|
||||
}
|
||||
}
|
||||
@ -107,11 +108,14 @@ object KeyValueParticipantStateWriterSpec {
|
||||
maxDeduplicationTime = Duration.ofDays(1),
|
||||
)
|
||||
|
||||
private val anInterpretationCost = 123L
|
||||
|
||||
private def createWriter(
|
||||
envelopeCaptor: ArgumentCaptor[Bytes],
|
||||
correlationIdCaptor: ArgumentCaptor[String] = captor[String]): LedgerWriter = {
|
||||
val writer = mock[LedgerWriter]
|
||||
when(writer.commit(correlationIdCaptor.capture(), envelopeCaptor.capture()))
|
||||
when(
|
||||
writer.commit(correlationIdCaptor.capture(), envelopeCaptor.capture(), any[CommitMetadata]))
|
||||
.thenReturn(Future.successful(SubmissionResult.Acknowledged))
|
||||
when(writer.participantId).thenReturn(v1.ParticipantId.assertFromString("test-participant"))
|
||||
writer
|
||||
|
@ -6,6 +6,7 @@ package com.daml.ledger.participant.state.v1
|
||||
import java.util.concurrent.CompletionStage
|
||||
|
||||
import com.daml.ledger.api.health.ReportsHealth
|
||||
import com.github.ghik.silencer.silent
|
||||
|
||||
/** An interface to change a ledger via a participant.
|
||||
*
|
||||
@ -80,21 +81,34 @@ trait WriteService
|
||||
* time for submitting and validating large transactions before they are
|
||||
* timestamped with their record time.
|
||||
*
|
||||
* @param submitterInfo : the information provided by the submitter for
|
||||
* correlating this submission with its acceptance or rejection on the
|
||||
* associated [[ReadService]].
|
||||
* @param transactionMeta : the meta-data accessible to all consumers of the
|
||||
* transaction. See [[TransactionMeta]] for more information.
|
||||
* @param transaction : the submitted transaction. This transaction can contain local
|
||||
* contract-ids that need suffixing. The participant state may have to
|
||||
* suffix those contract-ids in order to guaranteed their global
|
||||
* uniqueness. See the Contract Id specification for more detail
|
||||
* daml-lf/spec/contract-id.rst.
|
||||
* @param submitterInfo the information provided by the submitter for
|
||||
* correlating this submission with its acceptance or rejection on the
|
||||
* associated [[ReadService]].
|
||||
* @param transactionMeta the meta-data accessible to all consumers of the transaction.
|
||||
* See [[TransactionMeta]] for more information.
|
||||
* @param transaction the submitted transaction. This transaction can contain local
|
||||
* contract-ids that need suffixing. The participant state may have to
|
||||
* suffix those contract-ids in order to guaranteed their global
|
||||
* uniqueness. See the Contract Id specification for more detail
|
||||
* daml-lf/spec/contract-id.rst.
|
||||
* @param estimatedInterpretationCost Estimated cost of interpretation that may be used for
|
||||
* handling submitted transactions differently.
|
||||
* @return an async result of a SubmissionResult
|
||||
*/
|
||||
@silent("deprecated")
|
||||
def submitTransaction(
|
||||
submitterInfo: SubmitterInfo,
|
||||
transactionMeta: TransactionMeta,
|
||||
transaction: SubmittedTransaction,
|
||||
): CompletionStage[SubmissionResult]
|
||||
estimatedInterpretationCost: Long,
|
||||
): CompletionStage[SubmissionResult] =
|
||||
submitTransaction(submitterInfo, transactionMeta, transaction)
|
||||
|
||||
@deprecated("Will be removed in 1.4.0", since = "1.3.0")
|
||||
def submitTransaction(
|
||||
submitterInfo: SubmitterInfo,
|
||||
transactionMeta: TransactionMeta,
|
||||
transaction: SubmittedTransaction,
|
||||
): CompletionStage[SubmissionResult] =
|
||||
submitTransaction(submitterInfo, transactionMeta, transaction, 0)
|
||||
}
|
||||
|
@ -9,19 +9,21 @@ import com.daml.lf.transaction.{Transaction => Tx}
|
||||
/**
|
||||
* The result of command execution.
|
||||
*
|
||||
* @param submitterInfo The submitter info
|
||||
* @param transactionMeta The transaction meta-data
|
||||
* @param dependsOnLedgerTime True if the output of command execution depends in any way
|
||||
* on the ledger time, as specified through
|
||||
* [[com.daml.lf.command.Commands.ledgerEffectiveTime]].
|
||||
* If this value is false, then the ledger time of the resulting
|
||||
* transaction ([[TransactionMeta.ledgerEffectiveTime]]) can safely be
|
||||
* changed after command interpretation.
|
||||
* @param transaction The transaction
|
||||
* @param submitterInfo The submitter info
|
||||
* @param transactionMeta The transaction meta-data
|
||||
* @param transaction The transaction
|
||||
* @param dependsOnLedgerTime True if the output of command execution depends in any way
|
||||
* on the ledger time, as specified through
|
||||
* [[com.daml.lf.command.Commands.ledgerEffectiveTime]].
|
||||
* If this value is false, then the ledger time of the resulting
|
||||
* transaction ([[TransactionMeta.ledgerEffectiveTime]]) can safely be
|
||||
* changed after command interpretation.
|
||||
* @param interpretationTimeNanos Wall-clock time that interpretation took for the engine.
|
||||
*/
|
||||
final case class CommandExecutionResult(
|
||||
submitterInfo: SubmitterInfo,
|
||||
transactionMeta: TransactionMeta,
|
||||
transaction: Tx.SubmittedTransaction,
|
||||
dependsOnLedgerTime: Boolean,
|
||||
interpretationTimeNanos: Long,
|
||||
)
|
||||
|
@ -45,15 +45,18 @@ final class StoreBackedCommandExecutor(
|
||||
)(
|
||||
implicit ec: ExecutionContext,
|
||||
logCtx: LoggingContext,
|
||||
): Future[Either[ErrorCause, CommandExecutionResult]] =
|
||||
consume(commands.submitter, engine.submit(commands.commands, participant, submissionSeed))
|
||||
): Future[Either[ErrorCause, CommandExecutionResult]] = {
|
||||
val start = System.nanoTime()
|
||||
val submissionResult = engine.submit(commands.commands, participant, submissionSeed)
|
||||
consume(commands.submitter, submissionResult)
|
||||
.map { submission =>
|
||||
(for {
|
||||
result <- submission
|
||||
(updateTx, meta) = result
|
||||
_ <- Blinding
|
||||
.checkAuthorizationAndBlind(updateTx, Set(commands.submitter))
|
||||
} yield
|
||||
} yield {
|
||||
val interpretationTimeNanos = System.nanoTime() - start
|
||||
CommandExecutionResult(
|
||||
submitterInfo = SubmitterInfo(
|
||||
commands.submitter,
|
||||
@ -72,8 +75,11 @@ final class StoreBackedCommandExecutor(
|
||||
),
|
||||
transaction = updateTx,
|
||||
dependsOnLedgerTime = meta.dependsOnTime,
|
||||
)).left.map(ErrorCause.DamlLf)
|
||||
interpretationTimeNanos = interpretationTimeNanos
|
||||
)
|
||||
}).left.map(ErrorCause.DamlLf)
|
||||
}
|
||||
}
|
||||
|
||||
// Concurrent map of promises to request each package only once.
|
||||
private val packagePromises: ConcurrentHashMap[Ref.PackageId, Promise[Option[Package]]] =
|
||||
|
@ -256,7 +256,12 @@ final class ApiSubmissionService private (
|
||||
): Future[SubmissionResult] = {
|
||||
metrics.daml.commands.validSubmissions.mark()
|
||||
writeService
|
||||
.submitTransaction(result.submitterInfo, result.transactionMeta, result.transaction)
|
||||
.submitTransaction(
|
||||
result.submitterInfo,
|
||||
result.transactionMeta,
|
||||
result.transaction,
|
||||
result.interpretationTimeNanos,
|
||||
)
|
||||
.toScala
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,63 @@
|
||||
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.platform.apiserver.execution
|
||||
|
||||
import com.daml.ledger.api.domain.Commands
|
||||
import com.daml.ledger.participant.state.index.v2.{ContractStore, IndexPackagesService}
|
||||
import com.daml.lf.crypto.Hash
|
||||
import com.daml.lf.data.Ref.ParticipantId
|
||||
import com.daml.lf.data.{ImmArray, Ref, Time}
|
||||
import com.daml.lf.engine.{Engine, ResultDone}
|
||||
import com.daml.lf.transaction.Transaction
|
||||
import com.daml.lf.transaction.test.TransactionBuilder
|
||||
import com.daml.logging.LoggingContext
|
||||
import com.daml.metrics.Metrics
|
||||
import org.mockito.ArgumentMatchers._
|
||||
import org.mockito.Mockito.when
|
||||
import org.scalatest.mockito.MockitoSugar
|
||||
import org.scalatest.{AsyncWordSpec, Matchers}
|
||||
|
||||
class StoreBackedCommandExecutorSpec extends AsyncWordSpec with MockitoSugar with Matchers {
|
||||
private val emptyTransaction =
|
||||
Transaction.SubmittedTransaction(TransactionBuilder.Empty)
|
||||
private val emptyTransactionMetadata = Transaction.Metadata(
|
||||
submissionSeed = None,
|
||||
submissionTime = Time.Timestamp.now(),
|
||||
usedPackages = Set.empty,
|
||||
dependsOnTime = false,
|
||||
nodeSeeds = ImmArray.empty,
|
||||
byKeyNodes = ImmArray.empty)
|
||||
|
||||
"execute" should {
|
||||
"add interpretation time to result" in {
|
||||
val mockEngine = mock[Engine]
|
||||
when(mockEngine.submit(any[com.daml.lf.command.Commands], any[ParticipantId], any[Hash]))
|
||||
.thenReturn(
|
||||
ResultDone[(Transaction.SubmittedTransaction, Transaction.Metadata)](
|
||||
(emptyTransaction, emptyTransactionMetadata)
|
||||
)
|
||||
)
|
||||
val instance = new StoreBackedCommandExecutor(
|
||||
mockEngine,
|
||||
Ref.ParticipantId.assertFromString("anId"),
|
||||
mock[IndexPackagesService],
|
||||
mock[ContractStore],
|
||||
mock[Metrics])
|
||||
val mockDomainCommands = mock[Commands]
|
||||
val mockLfCommands = mock[com.daml.lf.command.Commands]
|
||||
when(mockLfCommands.ledgerEffectiveTime).thenReturn(Time.Timestamp.now())
|
||||
when(mockDomainCommands.workflowId).thenReturn(None)
|
||||
when(mockDomainCommands.commands).thenReturn(mockLfCommands)
|
||||
|
||||
LoggingContext.newLoggingContext { implicit context =>
|
||||
instance.execute(mockDomainCommands, Hash.hashPrivateKey("a key")).map { actual =>
|
||||
actual.right.foreach { actualResult =>
|
||||
actualResult.interpretationTimeNanos should be > 0L
|
||||
}
|
||||
succeed
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user