mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 17:28:46 +03:00
Support interfaces for benchtool [DPP-1163] (#14864)
* Support interfaces for benchtool [DPP-1163] CHANGELOG_BEGIN CHANGELOG_END
This commit is contained in:
parent
0c66368073
commit
0b5a333dc2
@ -61,8 +61,8 @@ da_scala_library(
|
||||
"//libs-scala/resources-akka",
|
||||
"//libs-scala/resources-grpc",
|
||||
"//libs-scala/timer-utils",
|
||||
"//ledger/test-common:benchtool-tests-%s.scala" % "1.14", # TODO: make the LF version configurable
|
||||
"//ledger/test-common:dar-files-%s-lib" % "1.14", # TODO: make the LF version configurable
|
||||
"//ledger/test-common:benchtool-tests-%s.scala" % "1.15", # TODO: make the LF version configurable
|
||||
"//ledger/test-common:dar-files-%s-lib" % "1.15", # TODO: make the LF version configurable
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
"@maven//:io_grpc_grpc_api",
|
||||
"@maven//:io_grpc_grpc_core",
|
||||
@ -85,14 +85,18 @@ da_scala_library(
|
||||
deps = [
|
||||
":ledger-api-bench-tool-lib",
|
||||
"//bazel_tools/runfiles:scala_runfiles",
|
||||
"//daml-lf/engine",
|
||||
"//daml-lf/language",
|
||||
"//daml-lf/transaction",
|
||||
"//ledger-api/testing-utils",
|
||||
"//libs-scala/ports",
|
||||
"//ledger/test-common:dar-files-%s-lib" % "1.14",
|
||||
"//ledger/test-common:dar-files-%s-lib" % "1.15",
|
||||
"//language-support/scala/bindings",
|
||||
"//ledger-api/rs-grpc-bridge",
|
||||
"//ledger/ledger-runner-common",
|
||||
"//ledger/participant-integration-api",
|
||||
"//ledger/sandbox-on-x",
|
||||
"//ledger/sandbox-on-x:sandbox-on-x-test-lib",
|
||||
"//ledger/test-common:dar-files-default-lib",
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
"@maven//:org_slf4j_slf4j_api",
|
||||
],
|
||||
@ -106,7 +110,7 @@ da_scala_test_suite(
|
||||
),
|
||||
data = [
|
||||
"//daml-lf/encoder:testing-dars",
|
||||
"//ledger/test-common:benchtool-tests-default.dar",
|
||||
"//ledger/test-common:benchtool-tests-%s.dar" % "1.15",
|
||||
"//ledger/test-common/test-certificates",
|
||||
],
|
||||
scala_deps = [
|
||||
@ -130,8 +134,8 @@ da_scala_test_suite(
|
||||
"//ledger/test-common",
|
||||
"//libs-scala/postgresql-testing",
|
||||
"//libs-scala/timer-utils",
|
||||
"//ledger/test-common:benchtool-tests-%s.scala" % "1.14",
|
||||
"//ledger/test-common:dar-files-%s-lib" % "1.14",
|
||||
"//ledger/test-common:benchtool-tests-%s.scala" % "1.15",
|
||||
"//ledger/test-common:dar-files-%s-lib" % "1.15",
|
||||
"//daml-script/runner:script-runner-lib",
|
||||
"//language-support/scala/bindings",
|
||||
"//ledger-api/rs-grpc-bridge",
|
||||
|
@ -13,19 +13,29 @@ import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig.{
|
||||
TransactionsStreamConfig,
|
||||
}
|
||||
import com.daml.ledger.api.benchtool.submission.AllocatedParties
|
||||
import com.daml.ledger.client.binding.Primitive.TemplateId
|
||||
import com.daml.ledger.test.benchtool.Foo.{Foo1, Foo2, Foo3}
|
||||
import com.daml.ledger.test.benchtool.InterfaceSubscription.{FooI1, FooI2, FooI3}
|
||||
import scalaz.syntax.tag._
|
||||
|
||||
class ConfigEnricher(allocatedParties: AllocatedParties) {
|
||||
|
||||
private def toTemplateId[T](templateId: TemplateId[T]): (String, String) = {
|
||||
val id = templateId.unwrap
|
||||
id.entityName -> s"${id.packageId}:${id.moduleName}:${id.entityName}"
|
||||
}
|
||||
|
||||
private val interfaceNameToFullyQualifiedNameMap: Map[String, String] = List(
|
||||
FooI1.id,
|
||||
FooI2.id,
|
||||
FooI3.id,
|
||||
).map(toTemplateId).toMap
|
||||
|
||||
private val templateNameToFullyQualifiedNameMap: Map[String, String] = List(
|
||||
Foo1.id,
|
||||
Foo2.id,
|
||||
Foo3.id,
|
||||
).map { templateId =>
|
||||
val id = templateId.unwrap
|
||||
id.entityName -> s"${id.packageId}:${id.moduleName}:${id.entityName}"
|
||||
}.toMap
|
||||
).map(toTemplateId).toMap
|
||||
|
||||
def enrichStreamConfig(
|
||||
streamConfig: StreamConfig
|
||||
@ -77,8 +87,11 @@ class ConfigEnricher(allocatedParties: AllocatedParties) {
|
||||
filter: PartyNamePrefixFilter
|
||||
): List[PartyFilter] = {
|
||||
val convertedTemplates = filter.templates.map(convertTemplate)
|
||||
val convertedInterfaces = filter.interfaces.map(convertInterface)
|
||||
val convertedParties = convertPartySet(filter.partyNamePrefix)
|
||||
convertedParties.map(party => PartyFilter(party = party, templates = convertedTemplates))
|
||||
convertedParties.map(party =>
|
||||
PartyFilter(party = party, templates = convertedTemplates, interfaces = convertedInterfaces)
|
||||
)
|
||||
}
|
||||
|
||||
private def convertPartySet(partySetName: String): List[String] =
|
||||
@ -103,6 +116,7 @@ class ConfigEnricher(allocatedParties: AllocatedParties) {
|
||||
StreamConfig.PartyFilter(
|
||||
party = convertParty(filter.party),
|
||||
templates = filter.templates.map(convertTemplate),
|
||||
interfaces = filter.interfaces.map(convertInterface),
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -110,4 +124,7 @@ class ConfigEnricher(allocatedParties: AllocatedParties) {
|
||||
def convertTemplate(shortTemplateName: String): String =
|
||||
templateNameToFullyQualifiedNameMap.getOrElse(shortTemplateName, shortTemplateName)
|
||||
|
||||
def convertInterface(shortInterfaceName: String): String =
|
||||
interfaceNameToFullyQualifiedNameMap.getOrElse(shortInterfaceName, shortInterfaceName)
|
||||
|
||||
}
|
||||
|
@ -372,7 +372,9 @@ object Cli {
|
||||
.split('@')
|
||||
.toList match {
|
||||
case party :: templates =>
|
||||
Right(WorkflowConfig.StreamConfig.PartyFilter(party, templates))
|
||||
Right(
|
||||
WorkflowConfig.StreamConfig.PartyFilter(party, templates, List.empty)
|
||||
) // Interfaces are not supported via Cli
|
||||
case _ => Left("Filter cannot be empty")
|
||||
}
|
||||
}
|
||||
|
@ -105,11 +105,16 @@ object WorkflowConfig {
|
||||
|
||||
object StreamConfig {
|
||||
|
||||
final case class PartyFilter(party: String, templates: List[String] = List.empty)
|
||||
final case class PartyFilter(
|
||||
party: String,
|
||||
templates: List[String] = List.empty,
|
||||
interfaces: List[String] = List.empty,
|
||||
)
|
||||
|
||||
final case class PartyNamePrefixFilter(
|
||||
partyNamePrefix: String,
|
||||
templates: List[String] = List.empty,
|
||||
interfaces: List[String] = List.empty,
|
||||
)
|
||||
|
||||
final case class TransactionsStreamConfig(
|
||||
|
@ -49,16 +49,30 @@ object WorkflowConfigParser {
|
||||
}
|
||||
|
||||
implicit val partyFilterDecoder: Decoder[StreamConfig.PartyFilter] =
|
||||
Decoder.forProduct2(
|
||||
"party",
|
||||
"templates",
|
||||
)(StreamConfig.PartyFilter.apply)
|
||||
(c: HCursor) => {
|
||||
for {
|
||||
party <- c.downField("party").as[String]
|
||||
templates <- c.downField("templates").as[Option[List[String]]]
|
||||
interfaces <- c.downField("interfaces").as[Option[List[String]]]
|
||||
} yield StreamConfig.PartyFilter(
|
||||
party,
|
||||
templates.getOrElse(List.empty),
|
||||
interfaces.getOrElse(List.empty),
|
||||
)
|
||||
}
|
||||
|
||||
implicit val partySetTemplateFilterDecoder: Decoder[StreamConfig.PartyNamePrefixFilter] =
|
||||
Decoder.forProduct2(
|
||||
"party_name_prefix",
|
||||
"templates",
|
||||
)(StreamConfig.PartyNamePrefixFilter.apply)
|
||||
(c: HCursor) => {
|
||||
for {
|
||||
partyNamePrefix <- c.downField("party_name_prefix").as[String]
|
||||
templates <- c.downField("templates").as[Option[List[String]]]
|
||||
interfaces <- c.downField("interfaces").as[Option[List[String]]]
|
||||
} yield StreamConfig.PartyNamePrefixFilter(
|
||||
partyNamePrefix,
|
||||
templates.getOrElse(List.empty),
|
||||
interfaces.getOrElse(List.empty),
|
||||
)
|
||||
}
|
||||
|
||||
implicit val transactionStreamDecoder: Decoder[StreamConfig.TransactionsStreamConfig] =
|
||||
Decoder.forProduct8(
|
||||
|
@ -4,7 +4,12 @@
|
||||
package com.daml.ledger.api.benchtool.services
|
||||
|
||||
import com.daml.ledger.api.benchtool.config.WorkflowConfig
|
||||
import com.daml.ledger.api.v1.transaction_filter.{Filters, InclusiveFilters, TransactionFilter}
|
||||
import com.daml.ledger.api.v1.transaction_filter.{
|
||||
Filters,
|
||||
InclusiveFilters,
|
||||
InterfaceFilter,
|
||||
TransactionFilter,
|
||||
}
|
||||
import com.daml.ledger.api.v1.value.Identifier
|
||||
|
||||
object StreamFilters {
|
||||
@ -20,13 +25,20 @@ object StreamFilters {
|
||||
private def toTransactionFilter(
|
||||
filter: WorkflowConfig.StreamConfig.PartyFilter
|
||||
): Either[String, (String, Filters)] =
|
||||
(filter.templates match {
|
||||
case Nil =>
|
||||
((filter.templates, filter.interfaces) match {
|
||||
case (Nil, Nil) =>
|
||||
Right(Filters.defaultInstance)
|
||||
case templateIds =>
|
||||
templateIdentifiers(templateIds).map { identifiers =>
|
||||
case (templateIds, interfaceIds) =>
|
||||
for {
|
||||
tplIds <- templateIdentifiers(templateIds)
|
||||
ifaceIds <- templateIdentifiers(interfaceIds)
|
||||
} yield {
|
||||
val interfaceFilters =
|
||||
ifaceIds.map(interfaceId => InterfaceFilter(Some(interfaceId), true))
|
||||
Filters.defaultInstance.withInclusive(
|
||||
InclusiveFilters.defaultInstance.addAllTemplateIds(identifiers)
|
||||
InclusiveFilters.defaultInstance
|
||||
.addAllTemplateIds(tplIds)
|
||||
.addAllInterfaceFilters(interfaceFilters)
|
||||
)
|
||||
}
|
||||
}).map(templateFilters => filter.party -> templateFilters)
|
||||
|
@ -4,13 +4,13 @@
|
||||
package com.daml.ledger.api.benchtool
|
||||
|
||||
import java.io.File
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.bazeltools.BazelRunfiles.rlocation
|
||||
import com.daml.ledger.api.benchtool.metrics.MetricsManager.NoOpMetricsManager
|
||||
import com.daml.ledger.api.benchtool.services.LedgerApiServices
|
||||
import com.daml.ledger.api.benchtool.submission.{CommandSubmitter, Names, PartyAllocating}
|
||||
import com.daml.ledger.test.BenchtoolTestDar
|
||||
import com.daml.lf.language.LanguageVersion
|
||||
import com.daml.platform.sandbox.fixture.SandboxFixture
|
||||
import org.scalatest.Suite
|
||||
|
||||
@ -23,6 +23,10 @@ trait BenchtoolSandboxFixture extends SandboxFixture {
|
||||
new File(rlocation(BenchtoolTestDar.path))
|
||||
)
|
||||
|
||||
override def config = super.config.copy(
|
||||
engine = super.config.engine.copy(allowedLanguageVersions = LanguageVersion.EarlyAccessVersions)
|
||||
)
|
||||
|
||||
def benchtoolFixture()(implicit
|
||||
ec: ExecutionContext
|
||||
): Future[(LedgerApiServices, Names, CommandSubmitter)] = {
|
||||
|
@ -0,0 +1,43 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.benchtool.submission
|
||||
|
||||
import com.daml.ledger.api.benchtool.util.ObserverWithResult
|
||||
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
object ActiveContractsObserver {
|
||||
def apply(expectedTemplateNames: Set[String]): ActiveContractsObserver =
|
||||
new ActiveContractsObserver(
|
||||
logger = LoggerFactory.getLogger(getClass),
|
||||
expectedTemplateNames = expectedTemplateNames,
|
||||
)
|
||||
}
|
||||
|
||||
/** Collects information about create events from ACS.
|
||||
*/
|
||||
class ActiveContractsObserver(logger: Logger, expectedTemplateNames: Set[String])
|
||||
extends ObserverWithResult[GetActiveContractsResponse, ObservedEvents](logger) {
|
||||
|
||||
private val createEvents = collection.mutable.ArrayBuffer[ObservedCreateEvent]()
|
||||
|
||||
override def streamName: String = "dummy-stream-name"
|
||||
|
||||
override def onNext(value: GetActiveContractsResponse): Unit =
|
||||
for {
|
||||
created <- value.activeContracts
|
||||
} {
|
||||
createEvents.addOne(ObservedCreateEvent(created))
|
||||
}
|
||||
|
||||
override def completeWith(): Future[ObservedEvents] = Future.successful(
|
||||
ObservedEvents(
|
||||
expectedTemplateNames = expectedTemplateNames,
|
||||
createEvents = createEvents.toList,
|
||||
exerciseEvents = List.empty,
|
||||
)
|
||||
)
|
||||
}
|
@ -3,15 +3,22 @@
|
||||
|
||||
package com.daml.ledger.api.benchtool.submission
|
||||
|
||||
case class ObservedCreateEvent(templateName: String, createArgumentsSerializedSize: Int)
|
||||
case class ObservedCreateEvent(
|
||||
templateName: String,
|
||||
createArgumentsSerializedSize: Int,
|
||||
interfaceViews: Seq[ObservedInterfaceView],
|
||||
)
|
||||
|
||||
object ObservedCreateEvent {
|
||||
def apply(created: com.daml.ledger.api.v1.event.CreatedEvent): ObservedCreateEvent = {
|
||||
val argsSize = created.createArguments.fold(0)(_.serializedSize)
|
||||
val templateName =
|
||||
created.templateId.getOrElse(sys.error(s"Expected templateId in $created")).entityName
|
||||
|
||||
ObservedCreateEvent(
|
||||
templateName = templateName,
|
||||
createArgumentsSerializedSize = argsSize,
|
||||
interfaceViews = created.interfaceViews.map(ObservedInterfaceView.apply),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,16 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.benchtool.submission
|
||||
|
||||
case class ObservedInterfaceView(interfaceName: String, serializedSize: Int)
|
||||
object ObservedInterfaceView {
|
||||
def apply(interfaceView: com.daml.ledger.api.v1.event.InterfaceView): ObservedInterfaceView = {
|
||||
val interfaceName =
|
||||
interfaceView.interfaceId
|
||||
.getOrElse(sys.error(s"Expected interfaceId in $interfaceView"))
|
||||
.entityName
|
||||
val serializedSize = interfaceView.serializedSize
|
||||
ObservedInterfaceView(interfaceName, serializedSize)
|
||||
}
|
||||
}
|
@ -48,14 +48,17 @@ class ConfigEnricherSpec extends AnyFlatSpec with Matchers {
|
||||
PartyFilter(
|
||||
party = "Obs-0",
|
||||
templates = templates,
|
||||
interfaces = List.empty,
|
||||
),
|
||||
PartyFilter(
|
||||
party = "Sig-0",
|
||||
templates = templates,
|
||||
interfaces = List.empty,
|
||||
),
|
||||
PartyFilter(
|
||||
party = "UnknownParty-0",
|
||||
templates = templates,
|
||||
interfaces = List.empty,
|
||||
),
|
||||
),
|
||||
partyNamePrefixFilterO = Some(
|
||||
@ -71,22 +74,27 @@ class ConfigEnricherSpec extends AnyFlatSpec with Matchers {
|
||||
PartyFilter(
|
||||
party = "Obs-0-foo-123",
|
||||
templates = enrichedTemplates,
|
||||
interfaces = List.empty,
|
||||
),
|
||||
PartyFilter(
|
||||
party = "Sig-0-foo-123",
|
||||
templates = enrichedTemplates,
|
||||
interfaces = List.empty,
|
||||
),
|
||||
PartyFilter(
|
||||
party = "UnknownParty-0",
|
||||
templates = enrichedTemplates,
|
||||
interfaces = List.empty,
|
||||
),
|
||||
PartyFilter(
|
||||
party = "MyParty-0-foo-123",
|
||||
templates = enrichedTemplates,
|
||||
interfaces = List.empty,
|
||||
),
|
||||
PartyFilter(
|
||||
party = "MyParty-1-foo-123",
|
||||
templates = enrichedTemplates,
|
||||
interfaces = List.empty,
|
||||
),
|
||||
),
|
||||
partyNamePrefixFilterO = None,
|
||||
|
@ -88,7 +88,7 @@ class CliSpec extends AnyWordSpec with Matchers with OptionValues with TableDriv
|
||||
"cli argument" -> "stream config",
|
||||
s"stream-type=transactions,name=$name,filters=$party1" -> TransactionsStreamConfig(
|
||||
name = name,
|
||||
filters = List(PartyFilter(party1, Nil)),
|
||||
filters = List(PartyFilter(party1, Nil, Nil)),
|
||||
beginOffset = None,
|
||||
endOffset = None,
|
||||
objectives = None,
|
||||
@ -97,7 +97,7 @@ class CliSpec extends AnyWordSpec with Matchers with OptionValues with TableDriv
|
||||
),
|
||||
s"stream-type=transaction-trees,name=$name,filters=$party1" -> TransactionTreesStreamConfig(
|
||||
name = name,
|
||||
filters = List(PartyFilter(party1, Nil)),
|
||||
filters = List(PartyFilter(party1, Nil, Nil)),
|
||||
beginOffset = None,
|
||||
endOffset = None,
|
||||
objectives = None,
|
||||
@ -106,7 +106,7 @@ class CliSpec extends AnyWordSpec with Matchers with OptionValues with TableDriv
|
||||
),
|
||||
s"stream-type=active-contracts,name=$name,filters=$party1" -> ActiveContractsStreamConfig(
|
||||
name = name,
|
||||
filters = List(PartyFilter(party1, Nil)),
|
||||
filters = List(PartyFilter(party1, Nil, Nil)),
|
||||
objectives = None,
|
||||
maxItemCount = None,
|
||||
timeoutInSecondsO = None,
|
||||
@ -140,9 +140,9 @@ class CliSpec extends AnyWordSpec with Matchers with OptionValues with TableDriv
|
||||
// each party filter separated by '+' and each template in a filter separated by '@'
|
||||
val filters = s"$party1+$party2@$template1@$template2+$party3@$template2"
|
||||
val filtersList = List(
|
||||
PartyFilter(party1, List()),
|
||||
PartyFilter(party2, List(template1, template2)),
|
||||
PartyFilter(party3, List(template2)),
|
||||
PartyFilter(party1, List(), List()),
|
||||
PartyFilter(party2, List(template1, template2), List()),
|
||||
PartyFilter(party3, List(template2), List()),
|
||||
)
|
||||
val cases = Table(
|
||||
"cli argument" -> "stream config",
|
||||
@ -197,7 +197,7 @@ class CliSpec extends AnyWordSpec with Matchers with OptionValues with TableDriv
|
||||
forAll(cases) { (argument, offset) =>
|
||||
val streamConfig = TransactionsStreamConfig(
|
||||
name = name,
|
||||
filters = List(PartyFilter(party, Nil)),
|
||||
filters = List(PartyFilter(party, Nil, Nil)),
|
||||
beginOffset = Some(offset),
|
||||
endOffset = None,
|
||||
objectives = None,
|
||||
@ -231,7 +231,7 @@ class CliSpec extends AnyWordSpec with Matchers with OptionValues with TableDriv
|
||||
forAll(cases) { (argument, offset) =>
|
||||
val streamConfig = TransactionsStreamConfig(
|
||||
name = name,
|
||||
filters = List(PartyFilter(party, Nil)),
|
||||
filters = List(PartyFilter(party, Nil, Nil)),
|
||||
beginOffset = None,
|
||||
endOffset = Some(offset),
|
||||
objectives = None,
|
||||
@ -277,7 +277,7 @@ class CliSpec extends AnyWordSpec with Matchers with OptionValues with TableDriv
|
||||
forAll(cases) { (argument, objectives) =>
|
||||
val streamConfig = TransactionsStreamConfig(
|
||||
name = name,
|
||||
filters = List(PartyFilter(party, Nil)),
|
||||
filters = List(PartyFilter(party, Nil, Nil)),
|
||||
beginOffset = None,
|
||||
endOffset = None,
|
||||
objectives = Some(objectives),
|
||||
@ -306,7 +306,7 @@ class CliSpec extends AnyWordSpec with Matchers with OptionValues with TableDriv
|
||||
forAll(cases) { (argument, objectives) =>
|
||||
val streamConfig = ActiveContractsStreamConfig(
|
||||
name = name,
|
||||
filters = List(PartyFilter(party, Nil)),
|
||||
filters = List(PartyFilter(party, Nil, Nil)),
|
||||
objectives = Some(objectives),
|
||||
maxItemCount = None,
|
||||
timeoutInSecondsO = None,
|
||||
|
@ -219,25 +219,25 @@ class WorkflowConfigParserSpec extends AnyWordSpec with Matchers {
|
||||
"parse transactions stream configuration" in {
|
||||
val yaml =
|
||||
"""streams:
|
||||
| - type: transactions
|
||||
| name: stream-1
|
||||
| filters:
|
||||
| - party: Obs-2
|
||||
| templates:
|
||||
| - Foo1
|
||||
| - Foo3
|
||||
| filter_by_party_set:
|
||||
| party_name_prefix: My-Party
|
||||
| templates: [Foo1, Foo2]
|
||||
| begin_offset: foo
|
||||
| end_offset: bar
|
||||
| objectives:
|
||||
| max_delay_seconds: 123
|
||||
| min_consumption_speed: 2.34
|
||||
| min_item_rate: 12
|
||||
| max_item_rate: 34
|
||||
| max_stream_duration: 56
|
||||
|""".stripMargin
|
||||
| - type: transactions
|
||||
| name: stream-1
|
||||
| filters:
|
||||
| - party: Obs-2
|
||||
| templates:
|
||||
| - Foo1
|
||||
| - Foo3
|
||||
| filter_by_party_set:
|
||||
| party_name_prefix: My-Party
|
||||
| templates: [Foo1, Foo2]
|
||||
| begin_offset: foo
|
||||
| end_offset: bar
|
||||
| objectives:
|
||||
| max_delay_seconds: 123
|
||||
| min_consumption_speed: 2.34
|
||||
| min_item_rate: 12
|
||||
| max_item_rate: 34
|
||||
| max_stream_duration: 56
|
||||
|""".stripMargin
|
||||
parseYaml(yaml) shouldBe Right(
|
||||
WorkflowConfig(
|
||||
submission = None,
|
||||
@ -513,6 +513,109 @@ class WorkflowConfigParserSpec extends AnyWordSpec with Matchers {
|
||||
}
|
||||
}
|
||||
|
||||
"parse stream configuration with interface filters" in {
|
||||
val yaml =
|
||||
"""streams:
|
||||
| - type: transactions
|
||||
| name: stream-1
|
||||
| filters:
|
||||
| - party: Obs-2
|
||||
| interfaces:
|
||||
| - FooInterface
|
||||
| begin_offset: foo
|
||||
| end_offset: bar
|
||||
| objectives:
|
||||
| min_consumption_speed: 2.34
|
||||
| min_item_rate: 12""".stripMargin
|
||||
parseYaml(yaml) shouldBe Right(
|
||||
WorkflowConfig(
|
||||
submission = None,
|
||||
streams = List(
|
||||
WorkflowConfig.StreamConfig.TransactionsStreamConfig(
|
||||
name = "stream-1",
|
||||
filters = List(
|
||||
WorkflowConfig.StreamConfig.PartyFilter(
|
||||
party = "Obs-2",
|
||||
interfaces = List("FooInterface"),
|
||||
)
|
||||
),
|
||||
beginOffset = Some(offset("foo")),
|
||||
endOffset = Some(offset("bar")),
|
||||
objectives = Some(
|
||||
WorkflowConfig.StreamConfig.TransactionObjectives(
|
||||
maxDelaySeconds = None,
|
||||
minConsumptionSpeed = Some(2.34),
|
||||
minItemRate = Some(12),
|
||||
maxItemRate = None,
|
||||
)
|
||||
),
|
||||
maxItemCount = None,
|
||||
timeoutInSecondsO = None,
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
"parse filter_by_party_set interfaces" in {
|
||||
val yaml =
|
||||
"""streams:
|
||||
| - type: transactions
|
||||
| name: stream-1
|
||||
| filters:
|
||||
| - party: Obs-2
|
||||
| templates:
|
||||
| - Foo1
|
||||
| - Foo3
|
||||
| filter_by_party_set:
|
||||
| party_name_prefix: My-Party
|
||||
| interfaces: [FooInterface]
|
||||
| begin_offset: foo
|
||||
| end_offset: bar
|
||||
| objectives:
|
||||
| max_delay_seconds: 123
|
||||
| min_consumption_speed: 2.34
|
||||
| min_item_rate: 12
|
||||
| max_item_rate: 34
|
||||
| max_stream_duration: 56
|
||||
|""".stripMargin
|
||||
parseYaml(yaml) shouldBe Right(
|
||||
WorkflowConfig(
|
||||
submission = None,
|
||||
streams = List(
|
||||
WorkflowConfig.StreamConfig.TransactionsStreamConfig(
|
||||
name = "stream-1",
|
||||
filters = List(
|
||||
WorkflowConfig.StreamConfig.PartyFilter(
|
||||
party = "Obs-2",
|
||||
templates = List("Foo1", "Foo3"),
|
||||
)
|
||||
),
|
||||
partyNamePrefixFilterO = Some(
|
||||
PartyNamePrefixFilter(
|
||||
partyNamePrefix = "My-Party",
|
||||
interfaces = List("FooInterface"),
|
||||
)
|
||||
),
|
||||
beginOffset = Some(offset("foo")),
|
||||
endOffset = Some(offset("bar")),
|
||||
objectives = Some(
|
||||
WorkflowConfig.StreamConfig.TransactionObjectives(
|
||||
maxDelaySeconds = Some(123),
|
||||
minConsumptionSpeed = Some(2.34),
|
||||
minItemRate = Some(12),
|
||||
maxItemRate = Some(34),
|
||||
maxTotalStreamRuntimeDurationInMs = Some(56),
|
||||
)
|
||||
),
|
||||
maxItemCount = None,
|
||||
timeoutInSecondsO = None,
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
def parseYaml(yaml: String): Either[WorkflowConfigParser.ParserError, WorkflowConfig] =
|
||||
WorkflowConfigParser.parse(new StringReader(yaml))
|
||||
|
||||
|
@ -56,6 +56,7 @@ class FibonacciCommandSubmitterITSpec
|
||||
WorkflowConfig.StreamConfig.PartyFilter(
|
||||
party = allocatedParties.signatory.toString,
|
||||
templates = List.empty,
|
||||
interfaces = List.empty,
|
||||
)
|
||||
),
|
||||
beginOffset = None,
|
||||
|
@ -121,6 +121,7 @@ class FooCommandSubmitterITSpec
|
||||
WorkflowConfig.StreamConfig.PartyFilter(
|
||||
party = party.toString,
|
||||
templates = List.empty,
|
||||
interfaces = List.empty,
|
||||
)
|
||||
),
|
||||
beginOffset = None,
|
||||
|
@ -0,0 +1,114 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.api.benchtool.submission
|
||||
|
||||
import com.daml.ledger.api.benchtool.{BenchtoolSandboxFixture, ConfigEnricher}
|
||||
import com.daml.ledger.api.benchtool.config.WorkflowConfig
|
||||
import com.daml.ledger.api.benchtool.services.LedgerApiServices
|
||||
import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll
|
||||
import com.daml.ledger.client.binding
|
||||
import org.scalatest.AppendedClues
|
||||
import org.scalatest.flatspec.AsyncFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
class InterfaceSubscriptionITSpec
|
||||
extends AsyncFlatSpec
|
||||
with BenchtoolSandboxFixture
|
||||
with SuiteResourceManagementAroundAll
|
||||
with Matchers
|
||||
with AppendedClues {
|
||||
|
||||
it should "make interface subscriptions exposed to the benchtool" in {
|
||||
|
||||
val foo1Config = WorkflowConfig.FooSubmissionConfig.ContractDescription(
|
||||
template = "Foo1",
|
||||
weight = 1,
|
||||
payloadSizeBytes = 100,
|
||||
)
|
||||
val foo2Config = WorkflowConfig.FooSubmissionConfig.ContractDescription(
|
||||
template = "Foo2",
|
||||
weight = 1,
|
||||
payloadSizeBytes = 100,
|
||||
)
|
||||
val foo3Config = WorkflowConfig.FooSubmissionConfig.ContractDescription(
|
||||
template = "Foo3",
|
||||
weight = 1,
|
||||
payloadSizeBytes = 100,
|
||||
)
|
||||
|
||||
val config = WorkflowConfig.FooSubmissionConfig(
|
||||
numberOfInstances = 100,
|
||||
numberOfObservers = 2,
|
||||
numberOfDivulgees = 0,
|
||||
numberOfExtraSubmitters = 0,
|
||||
uniqueParties = false,
|
||||
instanceDistribution = List(
|
||||
foo1Config,
|
||||
foo2Config,
|
||||
foo3Config,
|
||||
),
|
||||
nonConsumingExercises = None,
|
||||
consumingExercises = None,
|
||||
applicationIds = List.empty,
|
||||
)
|
||||
|
||||
for {
|
||||
(apiServices, names, submitter) <- benchtoolFixture()
|
||||
allocatedParties <- submitter.prepare(config)
|
||||
configDesugaring = new ConfigEnricher(allocatedParties)
|
||||
tested = new FooSubmission(
|
||||
submitter = submitter,
|
||||
maxInFlightCommands = 1,
|
||||
submissionBatchSize = 5,
|
||||
submissionConfig = config,
|
||||
allocatedParties = allocatedParties,
|
||||
names = names,
|
||||
)
|
||||
_ <- tested.performSubmission()
|
||||
observedEvents <- observer(
|
||||
configDesugaring = configDesugaring,
|
||||
apiServices = apiServices,
|
||||
party = allocatedParties.signatory,
|
||||
)
|
||||
} yield {
|
||||
observedEvents.createEvents.forall(_.interfaceViews.nonEmpty) shouldBe true
|
||||
observedEvents.createEvents
|
||||
.flatMap(_.interfaceViews)
|
||||
.forall(_.serializedSize > 0) shouldBe true
|
||||
observedEvents.createEvents
|
||||
.flatMap(_.interfaceViews)
|
||||
.map(_.interfaceName)
|
||||
.toSet shouldBe Set("FooI2", "FooI1", "FooI3")
|
||||
}
|
||||
}
|
||||
|
||||
private def observer(
|
||||
configDesugaring: ConfigEnricher,
|
||||
apiServices: LedgerApiServices,
|
||||
party: binding.Primitive.Party,
|
||||
): Future[ObservedEvents] = {
|
||||
val config = WorkflowConfig.StreamConfig.ActiveContractsStreamConfig(
|
||||
name = "dummy-name",
|
||||
filters = List(
|
||||
WorkflowConfig.StreamConfig.PartyFilter(
|
||||
party = party.toString,
|
||||
templates = List.empty,
|
||||
interfaces = List("FooI2", "FooI1", "FooI3"),
|
||||
)
|
||||
),
|
||||
objectives = None,
|
||||
maxItemCount = None,
|
||||
timeoutInSecondsO = None,
|
||||
)
|
||||
apiServices.activeContractsService.getActiveContracts(
|
||||
config = configDesugaring
|
||||
.enrichStreamConfig(config)
|
||||
.asInstanceOf[WorkflowConfig.StreamConfig.ActiveContractsStreamConfig],
|
||||
observer = ActiveContractsObserver(Set("Foo1", "Foo2", "Foo3")),
|
||||
)
|
||||
}
|
||||
|
||||
}
|
@ -147,6 +147,7 @@ class NonStakeholderInformeesITSpec
|
||||
WorkflowConfig.StreamConfig.PartyFilter(
|
||||
party = party.toString,
|
||||
templates = List.empty,
|
||||
interfaces = List.empty,
|
||||
)
|
||||
),
|
||||
beginOffset = None,
|
||||
@ -164,6 +165,7 @@ class NonStakeholderInformeesITSpec
|
||||
WorkflowConfig.StreamConfig.PartyFilter(
|
||||
party = party.toString,
|
||||
templates = List.empty,
|
||||
interfaces = List.empty,
|
||||
)
|
||||
),
|
||||
beginOffset = None,
|
||||
|
@ -123,6 +123,7 @@ class PartySetsITSpec
|
||||
WorkflowConfig.StreamConfig.PartyFilter(
|
||||
party = party,
|
||||
templates = filterByTemplates,
|
||||
interfaces = List.empty,
|
||||
)
|
||||
),
|
||||
partyNamePrefixFilterO = filterByPartyNamePrefixO.map(partyNamePrefix =>
|
||||
|
@ -0,0 +1,63 @@
|
||||
-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
-- SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
module InterfaceSubscription where
|
||||
|
||||
import Foo
|
||||
|
||||
data FooData = FooData
|
||||
with
|
||||
signatory : Party
|
||||
observers : [Party]
|
||||
payload : Text
|
||||
keyId: Text
|
||||
templateName: Text
|
||||
deriving (Eq, Show)
|
||||
|
||||
-- FooI1 is exposing the most simple case of the interface views - just copying the data from within the template
|
||||
interface FooI1 where
|
||||
viewtype FooData
|
||||
|
||||
interface instance FooI1 for Foo.Foo1 where
|
||||
view = FooData with templateName = "Foo1", ..
|
||||
|
||||
|
||||
foo2ToFooData : Foo.Foo2 -> FooData
|
||||
foo2ToFooData Foo.Foo2{..} = FooData with templateName = "Foo2", ..
|
||||
|
||||
fooDataToFoo2 : FooData -> Foo.Foo2
|
||||
fooDataToFoo2 FooData{..}
|
||||
| templateName == "Foo2" = Foo.Foo2 {..}
|
||||
| otherwise = error "fooDataToFoo2 called non non-foo2"
|
||||
|
||||
foo2Roundtrip : Int -> Foo.Foo2 -> Foo.Foo2
|
||||
foo2Roundtrip n x
|
||||
| n <= 0 = x
|
||||
| otherwise = foo2Roundtrip (n - 1) (fooDataToFoo2 $ foo2ToFooData x)
|
||||
|
||||
-- FooI2 is exposing a FooData view through 10 round-trips in the recursion calls
|
||||
interface FooI2 where
|
||||
viewtype FooData
|
||||
|
||||
interface instance FooI2 for Foo.Foo2 where
|
||||
view = foo2ToFooData $ foo2Roundtrip 10 this
|
||||
|
||||
foo3ToFooData : Foo.Foo3 -> FooData
|
||||
foo3ToFooData Foo.Foo3{..} = FooData with templateName = "Foo3", ..
|
||||
|
||||
fooDataToFoo3 : FooData -> Foo.Foo3
|
||||
fooDataToFoo3 FooData{..}
|
||||
| templateName == "Foo3" = Foo.Foo3 {..}
|
||||
| otherwise = error "fooDataToFoo3 called non non-foo3"
|
||||
|
||||
foo3Roundtrip : Int -> Foo.Foo3 -> Foo.Foo3
|
||||
foo3Roundtrip n x
|
||||
| n <= 0 = x
|
||||
| otherwise = foo3Roundtrip (n - 1) (fooDataToFoo3 $ foo3ToFooData x)
|
||||
|
||||
-- FooI3 is exposing a FooData view through 100 round-trips in the recursion calls
|
||||
interface FooI3 where
|
||||
viewtype FooData
|
||||
|
||||
interface instance FooI3 for Foo.Foo3 where
|
||||
view = foo3ToFooData $ foo3Roundtrip 100 this
|
Loading…
Reference in New Issue
Block a user