[DPP-1144] Retrieving a Daml packageId from Ledger API server for Benchtool (#15400)

changelog_begin
changelog_end
This commit is contained in:
pbatko-da 2022-11-03 11:46:51 +01:00 committed by GitHub
parent d8f907b928
commit 553674a850
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 178 additions and 24 deletions

View File

@ -47,6 +47,8 @@ da_scala_library(
],
visibility = ["//visibility:public"],
deps = [
"//daml-lf/archive:daml_lf_archive_reader",
"//daml-lf/language",
"//language-support/scala/bindings",
"//ledger-service/cli-opts",
"//ledger-service/jwt",

View File

@ -12,17 +12,20 @@ import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig.{
TransactionTreesStreamConfig,
TransactionsStreamConfig,
}
import com.daml.ledger.api.benchtool.submission.AllocatedParties
import com.daml.ledger.api.benchtool.submission.{AllocatedParties, BenchtoolTestsPackageInfo}
import com.daml.ledger.client.binding.Primitive.TemplateId
import com.daml.ledger.test.benchtool.Foo.{Foo1, Foo2, Foo3}
import com.daml.ledger.test.benchtool.InterfaceSubscription.{FooI1, FooI2, FooI3}
import scalaz.syntax.tag._
class ConfigEnricher(allocatedParties: AllocatedParties) {
class ConfigEnricher(
allocatedParties: AllocatedParties,
packageInfo: BenchtoolTestsPackageInfo,
) {
private def toTemplateId[T](templateId: TemplateId[T]): (String, String) = {
val id = templateId.unwrap
id.entityName -> s"${id.packageId}:${id.moduleName}:${id.entityName}"
id.entityName -> s"${packageInfo.packageId}:${id.moduleName}:${id.entityName}"
}
private val interfaceNameToFullyQualifiedNameMap: Map[String, String] = List(

View File

@ -117,36 +117,34 @@ class LedgerApiBenchTool(
)
for {
_ <- regularUserSetupStep(adminServices)
allocatedParties <- {
(allocatedParties, benchtoolTestsPackageInfo) <- {
config.workflow.submission match {
case None =>
logger.info(s"No submission defined. Skipping.")
logger.info("No submission config found; skipping the command submission step")
for {
existingParties <- partyAllocating.lookupExistingParties()
} yield AllocatedParties.forExistingParties(
parties = existingParties.toList,
partySetPrefixO = {
val partySetPrefixes =
config.workflow.streams.flatMap(_.partySetPrefix.iterator).distinct
require(
partySetPrefixes.size <= 1,
s"Found more than one observer party set! ${partySetPrefixes}",
allocatedParties <- SubmittedDataAnalyzing.determineAllocatedParties(
config.workflow,
partyAllocating,
)
partySetPrefixes.headOption
},
benchtoolDamlPackageInfo <- SubmittedDataAnalyzing.determineBenchtoolTestsPackageId(
regularUserServices.packageService
)
} yield {
(allocatedParties, benchtoolDamlPackageInfo)
}
case Some(submissionConfig) =>
logger.info("Submission config found; command submission will be performed")
submissionStep(
regularUserServices = regularUserServices,
adminServices = adminServices,
submissionConfig = submissionConfig,
metricRegistry = metricRegistry,
partyAllocating = partyAllocating,
)
).map(_ -> BenchtoolTestsPackageInfo.StaticDefault)
}
}
configEnricher = new ConfigEnricher(allocatedParties)
configEnricher = new ConfigEnricher(allocatedParties, benchtoolTestsPackageInfo)
updatedStreamConfigs = config.workflow.streams.map(streamsConfig =>
configEnricher.enrichStreamConfig(streamsConfig)
)

View File

@ -0,0 +1,96 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool
import com.daml.ledger.api.benchtool.config.WorkflowConfig
import com.daml.ledger.api.benchtool.services.PackageService
import com.daml.ledger.api.benchtool.submission.BenchtoolTestsPackageInfo.BenchtoolTestsPackageName
import com.daml.ledger.api.benchtool.submission.{
AllocatedParties,
BenchtoolTestsPackageInfo,
PartyAllocating,
}
import com.daml.ledger.api.v1.package_service.GetPackageResponse
import com.daml.lf.data.Ref
import com.daml.lf.language.Ast
import org.slf4j.{Logger, LoggerFactory}
import scala.concurrent.{ExecutionContext, Future}
/** Contains utilities for retrieving useful facts
* from data already submitted to a Ledger API server.
* (The motivating use case are the benchmarks that do not perform a submission step on their own
* and for that reason cannot statically determine these facts.)
*/
object SubmittedDataAnalyzing {
private[benchtool] val logger: Logger = LoggerFactory.getLogger(getClass)
def determineAllocatedParties(
workflowConfig: WorkflowConfig,
partyAllocating: PartyAllocating,
)(implicit ec: ExecutionContext): Future[AllocatedParties] = {
logger.info("Analyzing existing parties..")
for {
existingParties <- {
logger.info("Analyzing existing parties..")
partyAllocating.lookupExistingParties()
}
} yield {
AllocatedParties.forExistingParties(
parties = existingParties.toList,
partySetPrefixO = {
val partySetPrefixes =
workflowConfig.streams.flatMap(_.partySetPrefix.iterator).distinct
require(
partySetPrefixes.size <= 1,
s"Found more than one observer party set! ${partySetPrefixes}",
)
partySetPrefixes.headOption
},
)
}
}
def determineBenchtoolTestsPackageId(
packageService: PackageService
)(implicit ec: ExecutionContext): Future[BenchtoolTestsPackageInfo] = {
logger.info("Analyzing existing Daml packages..")
for {
packageIds: Seq[String] <- packageService.listPackages().map(_.packageIds)
getPackageResponses: Seq[GetPackageResponse] <- Future.sequence(
packageIds.map(packageId => packageService.getPackage(packageId = packageId))
)
} yield {
val packageNamesToPackageIds: Seq[(String, String)] = for {
getPackageResponse <- getPackageResponses
} yield {
val packageId = getPackageResponse.hash
val packageName = decodePackageName(
archivePayloadBytes = getPackageResponse.archivePayload.toByteArray,
pkgId = Ref.PackageId.assertFromString(packageId),
)
packageName -> packageId
}
val candidatesPackageIds =
packageNamesToPackageIds.collect { case (BenchtoolTestsPackageName, pkgId) => pkgId }
if (candidatesPackageIds.size > 1) {
logger.warn(s"Found more than one Daml package with name '$BenchtoolTestsPackageName'")
}
val packageId = candidatesPackageIds.headOption.getOrElse(
sys.error(s"Could not find a Daml package with name '$BenchtoolTestsPackageName'")
)
BenchtoolTestsPackageInfo(packageId = packageId)
}
}
private def decodePackageName(archivePayloadBytes: Array[Byte], pkgId: Ref.PackageId): String = {
val pkg: Ast.Package = com.daml.lf.archive
.archivePayloadDecoder(pkgId, onlySerializableDataDefs = false)
.assertFromByteArray(archivePayloadBytes)
._2
pkg.metadata.fold[String]("")(_.name)
}
}

View File

@ -32,6 +32,7 @@ class LedgerApiServices(
)
val packageManagementService =
new PackageManagementService(channel, authorizationToken = authorizationToken)
val packageService = new PackageService(channel, authorizationToken = authorizationToken)
val partyManagementService =
new PartyManagementService(channel, authorizationToken = authorizationToken)
val transactionService =

View File

@ -0,0 +1,23 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.services
import com.daml.ledger.api.benchtool.AuthorizationHelper
import com.daml.ledger.api.v1.package_service._
import io.grpc.Channel
import scala.concurrent.Future
class PackageService(channel: Channel, authorizationToken: Option[String]) {
private val service =
AuthorizationHelper.maybeAuthedService(authorizationToken)(PackageServiceGrpc.stub(channel))
def getPackage(packageId: String): Future[GetPackageResponse] = {
service.getPackage(GetPackageRequest(packageId = packageId))
}
def listPackages(): Future[ListPackagesResponse] = {
service.listPackages(ListPackagesRequest())
}
}

View File

@ -0,0 +1,20 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.benchtool.submission
import com.daml.ledger.test.benchtool.Foo.Foo1
import scalaz.syntax.tag._
case class BenchtoolTestsPackageInfo(
packageId: String
)
object BenchtoolTestsPackageInfo {
val BenchtoolTestsPackageName = "benchtool-tests"
// The packageId obtained from the compiled Scala bindings
val StaticDefault: BenchtoolTestsPackageInfo =
BenchtoolTestsPackageInfo(packageId = Foo1.id.unwrap.packageId)
}

View File

@ -8,7 +8,11 @@ import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig.{
PartyNamePrefixFilter,
TransactionsStreamConfig,
}
import com.daml.ledger.api.benchtool.submission.{AllocatedParties, AllocatedPartySet}
import com.daml.ledger.api.benchtool.submission.{
AllocatedParties,
AllocatedPartySet,
BenchtoolTestsPackageInfo,
}
import com.daml.ledger.client.binding.Primitive
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
@ -37,7 +41,8 @@ class ConfigEnricherSpec extends AnyFlatSpec with Matchers {
List("MyParty-0", "MyParty-1").map(makeParty),
)
),
)
),
BenchtoolTestsPackageInfo.StaticDefault,
)
val templates: List[String] = List("otherTemplate", "Foo1")
val foo1Id = com.daml.ledger.test.benchtool.Foo.Foo1.id.unwrap

View File

@ -58,7 +58,10 @@ class InterfaceSubscriptionITSpec
for {
(apiServices, names, submitter) <- benchtoolFixture()
allocatedParties <- submitter.prepare(config)
configDesugaring = new ConfigEnricher(allocatedParties)
configDesugaring = new ConfigEnricher(
allocatedParties,
BenchtoolTestsPackageInfo.StaticDefault,
)
tested = new FooSubmission(
submitter = submitter,
maxInFlightCommands = 1,

View File

@ -66,7 +66,10 @@ class PartySetsITSpec
for {
(apiServices, names, submitter) <- benchtoolFixture()
allocatedParties <- submitter.prepare(submissionConfig)
configDesugaring = new ConfigEnricher(allocatedParties)
configDesugaring = new ConfigEnricher(
allocatedParties,
BenchtoolTestsPackageInfo.StaticDefault,
)
tested = new FooSubmission(
submitter = submitter,
maxInFlightCommands = 1,