Connect LedgerAPI party and package management with KVUtils (#1985)

* connenct LedgerAPI party and package management with KVUtils

* formatting

* address review comments

* add participantId parameter to sandbox and indexer

* annotate TODO's with github issue numbers
This commit is contained in:
mziolekda 2019-07-05 17:31:17 +02:00 committed by mergify[bot]
parent f7657159e4
commit 7e3d580768
33 changed files with 479 additions and 97 deletions

View File

@ -10,7 +10,9 @@ import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import com.daml.ledger.participant.state.kvutils.InMemoryKVParticipantState
import com.daml.ledger.participant.state.kvutils.v2.ParticipantStateConversion
import com.daml.ledger.participant.state.v2.ParticipantId
import com.digitalasset.daml.lf.archive.DarReader
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml_lf.DamlLf.Archive
import com.digitalasset.platform.index.cli.Cli
import com.digitalasset.platform.index.{StandaloneIndexServer, StandaloneIndexerServer}
@ -25,6 +27,10 @@ object ReferenceServer extends App {
val config = Cli.parse(args).getOrElse(sys.exit(1))
// Name of this participant
// TODO: Pass this info in command-line (See issue #2025)
val participantId: ParticipantId = Ref.LedgerString.assertFromString("in-memory-participant")
implicit val system: ActorSystem = ActorSystem("indexed-kvutils")
implicit val materializer: ActorMaterializer = ActorMaterializer(
ActorMaterializerSettings(system)
@ -33,7 +39,7 @@ object ReferenceServer extends App {
Supervision.Stop
})
val ledger = new InMemoryKVParticipantState()
val ledger = new InMemoryKVParticipantState(participantId)
val readService = ParticipantStateConversion.V1ToV2Rread(ledger)
val writeService = ParticipantStateConversion.V1ToV2Write(ledger)

View File

@ -13,9 +13,9 @@ import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import com.daml.ledger.api.server.damlonx.Server
import com.daml.ledger.participant.state.index.v1.impl.reference.ReferenceIndexService
import com.daml.ledger.participant.state.kvutils.InMemoryKVParticipantState
import com.daml.ledger.participant.state.v1.{LedgerInitialConditions, Offset, ReadService, Update}
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.archive.DarReader
import com.digitalasset.daml.lf.data.ImmArray
import com.digitalasset.daml.lf.data.{ImmArray, Ref}
import com.digitalasset.daml.lf.transaction.GenTransaction
import com.digitalasset.daml_lf.DamlLf.Archive
import com.digitalasset.platform.common.util.DirectExecutionContext
@ -33,6 +33,10 @@ object ReferenceServer extends App {
val config = Cli.parse(args).getOrElse(sys.exit(1))
// Name of this participant
// TODO: Pass this info in command-line (See issue #2025)
val participantId: ParticipantId = Ref.LedgerString.assertFromString("in-memory-participant")
// Initialize Akka and log exceptions in flows.
implicit val system: ActorSystem = ActorSystem("ReferenceServer")
implicit val materializer: ActorMaterializer = ActorMaterializer(
@ -42,7 +46,7 @@ object ReferenceServer extends App {
Supervision.Stop
})
val ledger = new InMemoryKVParticipantState
val ledger = new InMemoryKVParticipantState(participantId)
//val ledger = new Ledger(timeModel, tsb)
def archivesFromDar(file: File): List[Archive] = {
@ -66,7 +70,8 @@ object ReferenceServer extends App {
.foreach { initialConditions =>
val indexService = ReferenceIndexService(
participantReadService = if (config.badServer) BadReadService(ledger) else ledger,
initialConditions = initialConditions
initialConditions = initialConditions,
participantId = participantId
)
val server = Server(

View File

@ -130,6 +130,8 @@ object Server {
)
val packageService = DamlOnXPackageService(indexService, ledgerId)
val packageManagementService = DamlOnXPackageManagementService(writeService, indexService)
val partyManagementService = DamlOnXPartyManagementService(writeService, indexService)
val timeService = new DamlOnXTimeService(indexService)
@ -147,6 +149,8 @@ object Server {
commandService,
identityService,
packageService,
packageManagementService,
partyManagementService,
timeService,
configurationService,
reflectionService

View File

@ -0,0 +1,89 @@
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.server.damlonx.services
import java.io.{File, FileOutputStream}
import java.util.zip.ZipFile
import com.daml.ledger.participant.state.index.v1.{PackagesService => IndexPackageService}
import com.daml.ledger.participant.state.v1.{UploadPackagesResult, WritePackagesService}
import com.digitalasset.daml.lf.archive.DarReader
import org.slf4j.{Logger, LoggerFactory}
import com.digitalasset.api.util.TimestampConversion.fromInstant
import com.digitalasset.daml_lf.DamlLf.Archive
import com.digitalasset.ledger.api.v1.admin.package_management_service.PackageManagementServiceGrpc.PackageManagementService
import com.digitalasset.ledger.api.v1.admin.package_management_service._
import com.digitalasset.platform.api.grpc.GrpcApiService
import com.digitalasset.platform.server.api.validation.ErrorFactories
import com.digitalasset.platform.common.util.DirectExecutionContext
import io.grpc.ServerServiceDefinition
import scala.compat.java8.FutureConverters
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
class DamlOnXPackageManagementService(
writeService: WritePackagesService,
indexService: IndexPackageService)
extends PackageManagementService
with GrpcApiService {
protected val logger: Logger = LoggerFactory.getLogger(PackageManagementService.getClass)
implicit val ec: ExecutionContext = DirectExecutionContext
override def close(): Unit = ()
override def bindService(): ServerServiceDefinition =
PackageManagementServiceGrpc.bindService(this, DirectExecutionContext)
override def listKnownPackages(
request: ListKnownPackagesRequest): Future[ListKnownPackagesResponse] = {
indexService.listPackageDetails().map { pkgs =>
ListKnownPackagesResponse(pkgs.toSeq.map {
case (id, details) =>
PackageDetails(
id,
details.size,
Some(fromInstant(details.knownSince.toInstant)),
details.sourceDescription.getOrElse(""))
})
}
}
override def uploadDarFile(request: UploadDarFileRequest): Future[UploadDarFileResponse] = {
val resultT = for {
file <- Try(File.createTempFile("uploadDarFile", ".dar"))
fos <- Try(new FileOutputStream(file))
_ <- Try(fos.write(request.darFile.toByteArray))
dar <- DarReader { case (_, x) => Try(Archive.parseFrom(x)) }.readArchive(new ZipFile(file))
} yield {
fos.close()
writeService.uploadPackages(dar.all, None)
}
resultT.fold(
err => Future.failed(ErrorFactories.invalidArgument(err.getMessage)),
res =>
FutureConverters
.toScala(res)
.flatMap {
case UploadPackagesResult.Ok =>
Future.successful(UploadDarFileResponse())
case r @ UploadPackagesResult.InvalidPackage(_) =>
Future.failed(ErrorFactories.invalidArgument(r.description))
case r @ UploadPackagesResult.ParticipantNotAuthorized =>
Future.failed(ErrorFactories.permissionDenied(r.description))
case r @ UploadPackagesResult.NotSupported =>
Future.failed(ErrorFactories.unimplemented(r.description))
}
)
}
}
object DamlOnXPackageManagementService {
def apply(writeService: WritePackagesService, indexService: IndexPackageService): GrpcApiService =
new DamlOnXPackageManagementService(writeService, indexService)
with PackageManagementServiceLogging
}

View File

@ -0,0 +1,82 @@
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.server.damlonx.services
import akka.stream.Materializer
import com.daml.ledger.participant.state.index.v1.{
PartyManagementService => IndexPartyManagementService
}
import com.daml.ledger.participant.state.v1.{PartyAllocationResult, WritePartyService}
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.ledger.api.v1.admin.party_management_service.PartyManagementServiceGrpc.PartyManagementService
import com.digitalasset.ledger.api.v1.admin.party_management_service._
import com.digitalasset.platform.api.grpc.GrpcApiService
import com.digitalasset.platform.common.util.DirectExecutionContext
import com.digitalasset.platform.server.api.validation.ErrorFactories
import io.grpc.ServerServiceDefinition
import org.slf4j.LoggerFactory
import scala.compat.java8.FutureConverters
import scala.concurrent.{ExecutionContext, Future}
class DamlOnXPartyManagementService private (
writeService: WritePartyService,
indexService: IndexPartyManagementService)
extends PartyManagementService
with GrpcApiService {
protected val logger = LoggerFactory.getLogger(this.getClass)
implicit val ec: ExecutionContext = DirectExecutionContext
override def close(): Unit = ()
override def bindService(): ServerServiceDefinition =
PartyManagementServiceGrpc.bindService(this, DirectExecutionContext)
override def getParticipantId(
request: GetParticipantIdRequest): Future[GetParticipantIdResponse] =
indexService.getParticipantId
.map(pid => GetParticipantIdResponse(pid.toString))
private[this] def mapPartyDetails(
details: com.digitalasset.ledger.api.domain.PartyDetails): PartyDetails =
PartyDetails(details.party, details.displayName.getOrElse(""), details.isLocal)
override def listKnownParties(
request: ListKnownPartiesRequest): Future[ListKnownPartiesResponse] =
indexService.listParties
.map(ps => ListKnownPartiesResponse(ps.map(mapPartyDetails)))
override def allocateParty(request: AllocatePartyRequest): Future[AllocatePartyResponse] = {
val party = if (request.partyIdHint.isEmpty) None else Some(request.partyIdHint)
val displayName = if (request.displayName.isEmpty) None else Some(request.displayName)
FutureConverters
.toScala(
writeService
.allocateParty(party, displayName))
.flatMap {
case PartyAllocationResult.Ok(details) =>
Future.successful(AllocatePartyResponse(Some(mapPartyDetails(details))))
case r @ PartyAllocationResult.AlreadyExists =>
Future.failed(ErrorFactories.invalidArgument(r.description))
case r @ PartyAllocationResult.InvalidName(_) =>
Future.failed(ErrorFactories.invalidArgument(r.description))
case r @ PartyAllocationResult.ParticipantNotAuthorized =>
Future.failed(ErrorFactories.permissionDenied(r.description))
case r @ PartyAllocationResult.NotSupported =>
Future.failed(ErrorFactories.unimplemented(r.description))
}
}
}
object DamlOnXPartyManagementService {
def apply(writeService: WritePartyService, indexService: IndexPartyManagementService)(
implicit ec: ExecutionContext,
esf: ExecutionSequencerFactory,
mat: Materializer): GrpcApiService =
new DamlOnXPartyManagementService(writeService, indexService) with PartyManagementServiceLogging
}

View File

@ -38,8 +38,7 @@ class GrpcTransactionService(
identifierResolver: IdentifierResolver)(
implicit protected val esf: ExecutionSequencerFactory,
protected val mat: Materializer)
extends ApiTransactionService
with TransactionServiceAkkaGrpc
extends TransactionServiceAkkaGrpc
with ProxyCloseable
with GrpcApiService
with ErrorFactories

View File

@ -3,6 +3,7 @@
package com.digitalasset.ledger.client.services.admin
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.data.Ref.Party
import com.digitalasset.ledger.api.domain.{ParticipantId, PartyDetails}
import com.digitalasset.ledger.api.v1.admin.party_management_service.{
@ -28,7 +29,7 @@ final class PartyManagementClient(partyManagementService: PartyManagementService
def getParticipantId(): Future[ParticipantId] =
partyManagementService
.getParticipantId(new GetParticipantIdRequest())
.map(r => ParticipantId(r.participantId))
.map(r => ParticipantId(Ref.LedgerString.assertFromString(r.participantId)))
def listKnownParties(): Future[List[PartyDetails]] =
partyManagementService

View File

@ -43,8 +43,7 @@ class GrpcTransactionService(
identifierResolver: IdentifierResolver)(
implicit protected val esf: ExecutionSequencerFactory,
protected val mat: Materializer)
extends ApiTransactionService
with TransactionServiceAkkaGrpc
extends TransactionServiceAkkaGrpc
with GrpcApiService
with ErrorFactories
with FieldValidations {

View File

@ -261,7 +261,7 @@ object domain {
sealed trait ParticipantIdTag
type ParticipantId = String @@ ParticipantIdTag
type ParticipantId = Ref.LedgerString @@ ParticipantIdTag
val ParticipantId: Tag.TagOf[ParticipantIdTag] = Tag.of[ParticipantIdTag]
sealed trait ApplicationIdTag

View File

@ -141,6 +141,7 @@ client_server_test(
server = "//ledger/api-server-damlonx/reference:reference",
server_args = [
"$(rlocation $TEST_WORKSPACE/$(rootpath //ledger/ledger-api-integration-tests:SemanticTests.dar))",
"$(rlocation $TEST_WORKSPACE/$(rootpath //ledger/sandbox:Test.dar))",
"--crt $(rlocation $TEST_WORKSPACE/$(rootpath testdata/server.crt))",
"--cacrt $(rlocation $TEST_WORKSPACE/$(rootpath testdata/ca.crt))",
"--pem $(rlocation $TEST_WORKSPACE/$(rootpath testdata/server.pem))",

View File

@ -16,7 +16,12 @@ import com.digitalasset.platform.tests.integration.ledger.api.commands.{
CommandTransactionChecksHighLevelIT,
CommandTransactionChecksLowLevelIT
}
import com.digitalasset.platform.tests.integration.ledger.api.{DivulgenceIT, TransactionServiceIT}
import com.digitalasset.platform.tests.integration.ledger.api.{
DivulgenceIT,
PackageManagementServiceIT,
PartyManagementServiceIT,
TransactionServiceIT
}
import com.digitalasset.platform.tests.integration.ledger.api.transaction.TransactionBackpressureIT
import org.scalatest.time.{Seconds, Span}
import org.scalatest.{Args, Suite}
@ -138,7 +143,10 @@ object LedgerApiTestTool {
commonConfig.withDarFile(resourceAsFile(semanticTestsResource))
}
)
Map(semanticTestsRunner)
Map(
semanticTestsRunner
)
}
private def optionalTests(
commonConfig: PlatformApplications.Config,
@ -209,12 +217,40 @@ object LedgerApiTestTool {
}
)
val packageManagementServiceIT = lazyInit(
"PackageManagementServiceIT",
name =>
new PackageManagementServiceIT {
override def suiteName: String = name
override def actorSystemName = s"${name}ToolActorSystem"
override def fixtureIdsEnabled: Set[LedgerBackend] = Set(LedgerBackend.RemoteApiProxy)
override def spanScaleFactor: Double = toolConfig.timeoutScaleFactor
override protected def config: Config =
commonConfig.withDarFile(resourceAsFile(integrationTestResource))
}
)
val partyManagementServiceIT = lazyInit(
"PartyManagementServiceIT",
name =>
new PartyManagementServiceIT {
override def suiteName: String = name
override def actorSystemName = s"${name}ToolActorSystem"
override def fixtureIdsEnabled: Set[LedgerBackend] = Set(LedgerBackend.RemoteApiProxy)
override def spanScaleFactor: Double = toolConfig.timeoutScaleFactor
override protected def config: Config =
commonConfig.withDarFile(resourceAsFile(integrationTestResource))
}
)
Map(
transactionServiceIT,
transactionBackpressureIT,
divulgenceIT,
commandTransactionChecksHighLevelIT,
commandTransactionChecksLowLevelIT
commandTransactionChecksLowLevelIT,
packageManagementServiceIT,
partyManagementServiceIT
)
}

View File

@ -37,5 +37,8 @@ da_scala_library(
"//visibility:public",
],
runtime_deps = [],
deps = compileDependencies + ["//3rdparty/jvm/org/scalaz:scalaz_core"],
deps = compileDependencies + [
"//3rdparty/jvm/org/scalaz:scalaz_core",
"//ledger/participant-state:participant-state",
],
)

View File

@ -3,16 +3,16 @@
package com.daml.ledger.participant.state.index.v1.impl.reference
import com.daml.ledger.participant.state.v1.{Offset, _}
import com.digitalasset.daml.lf.archive.Decode
import com.digitalasset.daml.lf.data.Ref.{PackageId, Party}
import com.digitalasset.daml.lf.data.Relation.Relation
import com.daml.ledger.participant.state.index.v1.PackageDetails
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref.PackageId
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.daml.lf.engine.Blinding
import com.digitalasset.daml.lf.transaction.Node.{NodeCreate, NodeExercises}
import com.digitalasset.daml.lf.transaction.{BlindingInfo, Transaction}
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml_lf.DamlLf.Archive
import com.digitalasset.ledger.api.domain.PartyDetails
import com.digitalasset.platform.sandbox.stores.InMemoryActiveContracts
import org.slf4j.{Logger, LoggerFactory}
@ -20,6 +20,7 @@ import scala.collection.immutable.TreeMap
final case class IndexState(
ledgerId: LedgerId,
participantId: ParticipantId,
recordTime: Timestamp,
configuration: Configuration,
private val updateId: Option[Offset],
@ -31,8 +32,8 @@ final case class IndexState(
commandRejections: TreeMap[Offset, Update.CommandRejected],
// Uploaded packages.
packages: Map[PackageId, Archive],
packageKnownTo: Relation[PackageId, Party],
hostedParties: Set[Party]) {
packageDetails: Map[PackageId, PackageDetails],
knownParties: Set[PartyDetails]) {
val logger: Logger = LoggerFactory.getLogger(this.getClass)
@ -70,22 +71,37 @@ final case class IndexState(
case u: Update.ConfigurationChanged =>
Right(state.copy(configuration = u.newConfiguration))
//party: Ref.Party, displayName: Option[String], isLocal: Boolean
case u: Update.PartyAddedToParticipant =>
Right(state.copy(hostedParties = state.hostedParties + u.party))
Right(
state.copy(
knownParties = state.knownParties + PartyDetails(
u.party,
Some(u.displayName),
u.participantId == state.participantId)))
case Update.PublicPackagesUploaded(
archives,
sourceDescription,
participantId,
recordTime) =>
uploadRecordTime) =>
val newPackages =
state.packages ++ archives.map(a => PackageId.assertFromString(a.getHash) -> a)
val decodedPackages = newPackages.mapValues(archive => Decode.decodeArchive(archive)._2)
val newPackageDetails =
state.packageDetails ++ archives.map(
a =>
PackageId.assertFromString(a.getHash) -> PackageDetails(
a.getPayload.size.toLong,
uploadRecordTime,
sourceDescription))
//val decodedPackages = newPackages.mapValues(archive => Decode.decodeArchive(archive)._2)
Right(
state
.copy(
packages = newPackages
packages = newPackages,
packageDetails = newPackageDetails
))
case u: Update.CommandRejected =>
@ -150,18 +166,20 @@ object IndexState {
case object NonMonotonicOffset extends InvariantViolation
case object SequencingError extends InvariantViolation
def initialState(lic: LedgerInitialConditions): IndexState = IndexState(
ledgerId = lic.ledgerId,
updateId = None,
beginning = None,
configuration = lic.config,
recordTime = lic.initialRecordTime,
txs = TreeMap.empty,
activeContracts = InMemoryActiveContracts.empty,
commandRejections = TreeMap.empty,
packages = Map.empty,
packageKnownTo = Map.empty,
hostedParties = Set.empty
)
def initialState(lic: LedgerInitialConditions, givenParticipantId: ParticipantId): IndexState =
IndexState(
ledgerId = lic.ledgerId,
participantId = givenParticipantId,
updateId = None,
beginning = None,
configuration = lic.config,
recordTime = lic.initialRecordTime,
txs = TreeMap.empty,
activeContracts = InMemoryActiveContracts.empty,
commandRejections = TreeMap.empty,
packages = Map.empty,
packageDetails = Map.empty,
knownParties = Set.empty
)
}

View File

@ -17,7 +17,7 @@ import com.digitalasset.daml.lf.data.{Ref, Time}
import com.digitalasset.daml.lf.transaction.Node.{NodeCreate, NodeExercises}
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml_lf.DamlLf
import com.digitalasset.ledger.api.domain.{LedgerOffset, TransactionFilter}
import com.digitalasset.ledger.api.domain.{LedgerOffset, PartyDetails, TransactionFilter}
import com.digitalasset.platform.akkastreams.dispatcher.SignalDispatcher
import com.digitalasset.platform.sandbox.stores.ActiveContracts
import org.slf4j.LoggerFactory
@ -27,7 +27,8 @@ import scala.concurrent.{ExecutionContext, Future, Promise}
final case class ReferenceIndexService(
participantReadService: participant.state.v1.ReadService,
initialConditions: LedgerInitialConditions)(implicit val mat: Materializer)
initialConditions: LedgerInitialConditions,
participantId: ParticipantId)(implicit val mat: Materializer)
extends participant.state.index.v1.IndexService
with AutoCloseable {
val logger = LoggerFactory.getLogger(this.getClass)
@ -37,7 +38,7 @@ final case class ReferenceIndexService(
object StateController {
private val stateChangeDispatcher = SignalDispatcher()
private val currentState: AtomicReference[IndexState] = new AtomicReference(
IndexState.initialState(initialConditions)
IndexState.initialState(initialConditions, participantId)
)
def updateState(f: IndexState => IndexState): Unit = {
@ -110,6 +111,11 @@ final case class ReferenceIndexService(
stateUpdateKillSwitch.shutdown()
}
override def listPackageDetails(): Future[Map[PackageId, PackageDetails]] =
futureWithState { state =>
Future.successful(state.packageDetails)
}
override def listPackages(): Future[Set[PackageId]] =
futureWithState { state =>
Future.successful(state.packages.keySet)
@ -122,6 +128,20 @@ final case class ReferenceIndexService(
)
}
def getParticipantId: Future[ParticipantId] =
futureWithState { state =>
Future.successful(
state.participantId
)
}
def listParties: Future[List[PartyDetails]] =
futureWithState { state =>
Future.successful(
state.knownParties.toList
)
}
override def getLedgerConfiguration(): Source[Configuration, NotUsed] =
Source
.fromFuture(futureWithState(state => Future.successful(state.configuration)))

View File

@ -12,3 +12,4 @@ trait IndexService
with ContractStore
with IdentityService
with TimeService
with PartyManagementService

View File

@ -13,6 +13,8 @@ import scala.concurrent.Future
* [[com.digitalasset.ledger.api.v1.package_service.PackageServiceGrpc.PackageService]]
*/
trait PackagesService {
def listPackageDetails(): Future[Map[PackageId, PackageDetails]]
def listPackages(): Future[Set[PackageId]]
def getPackage(packageId: PackageId): Future[Option[Archive]]

View File

@ -0,0 +1,19 @@
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.index.v1
import scala.concurrent.Future
import com.daml.ledger.participant.state.v1.ParticipantId
// TODO: This should go to participant-state.vX
import com.digitalasset.ledger.api.domain.PartyDetails
/**
* Serves as a backend to implement
* [[com.digitalasset.ledger.api.v1.admin.party_management_service.PartyManagementServiceGrpc]]
*/
trait PartyManagementService {
def getParticipantId: Future[ParticipantId]
def listParties: Future[List[PartyDetails]]
}

View File

@ -82,4 +82,20 @@ package object v1 {
takenAt: LedgerOffset.Absolute,
activeContracts: Source[(Option[WorkflowId], AcsUpdateEvent.Create), NotUsed])
/** Meta-data of a DAML-LF package
*
* @param size : The size of the archive payload, in bytes.
*
* @param knownSince : Indicates since when the package is known to
* the backing participant.
*
* @param sourceDescription : Optional description provided by the backing
* participant describing where it got the package from.
*
*/
final case class PackageDetails(
size: Long,
knownSince: Timestamp,
sourceDescription: Option[String])
}

View File

@ -3,7 +3,8 @@
package com.daml.ledger.participant.state.index.v2
import com.digitalasset.ledger.api.domain.{ParticipantId, PartyDetails}
import com.daml.ledger.participant.state.v2.ParticipantId
import com.digitalasset.ledger.api.domain.PartyDetails
import scala.concurrent.Future

View File

@ -17,7 +17,7 @@ import com.daml.ledger.participant.state.backport.TimeModel
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.v1.{UploadPackagesResult, _}
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.data.Ref.LedgerString
import com.digitalasset.daml.lf.data.Ref.{LedgerString, Party}
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.daml.lf.engine.Engine
import com.digitalasset.daml_lf.DamlLf.Archive
@ -98,6 +98,7 @@ object InMemoryKVParticipantState {
* https://doc.akka.io/docs/akka/current/index-actors.html.
*/
class InMemoryKVParticipantState(
val participantId: ParticipantId,
val ledgerId: LedgerString.T = Ref.LedgerString.assertFromString(UUID.randomUUID.toString),
file: Option[File] = None)(implicit system: ActorSystem, mat: Materializer)
extends ReadService
@ -133,9 +134,6 @@ class InMemoryKVParticipantState(
*/
private val HEARTBEAT_INTERVAL = 5.seconds
// Name of this participant, ultimately pass this info in command-line
val participantId = "in-memory-participant"
/** Reference to the latest state of the in-memory ledger.
* This state is only updated by the [[CommitActor]], which processes submissions
* sequentially and non-concurrently.
@ -418,16 +416,34 @@ class InMemoryKVParticipantState(
override def allocateParty(
hint: Option[String],
displayName: Option[String]): CompletionStage[PartyAllocationResult] = {
hint.map(p => Party.fromString(p)) match {
case None =>
allocatePartyOnLedger(generateRandomId(), displayName)
case Some(Right(party)) =>
allocatePartyOnLedger(party, displayName)
case Some(Left(error)) =>
CompletableFuture.completedFuture(PartyAllocationResult.InvalidName(error))
}
}
private def allocatePartyOnLedger(
party: String,
displayName: Option[String]): CompletionStage[PartyAllocationResult] = {
val sId = submissionId.getAndIncrement().toString
val cf = new CompletableFuture[PartyAllocationResult]
matcherActorRef ! AddPartyAllocationRequest(sId, cf)
commitActorRef ! CommitSubmission(
allocateEntryId,
KeyValueSubmission.partyToSubmission(sId, hint, displayName, participantId)
allocateEntryId(),
KeyValueSubmission.partyToSubmission(sId, Some(party), displayName, participantId)
)
cf
}
private def generateRandomId(): Ref.Party =
Ref.Party.assertFromString(s"party-${UUID.randomUUID().toString.take(8)}")
/** Upload DAML-LF packages to the ledger */
override def uploadPackages(
archives: List[Archive],

View File

@ -6,6 +6,7 @@ package com.daml.ledger.participant.state.kvutils
import com.daml.ledger.participant.state.kvutils.Conversions._
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.data.Ref.{LedgerString, Party}
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.ledger.api.domain.PartyDetails
@ -48,7 +49,7 @@ object KeyValueConsumption {
if (entry.getPackageUploadEntry.getSourceDescription.nonEmpty)
Some(entry.getPackageUploadEntry.getSourceDescription)
else None,
entry.getPackageUploadEntry.getParticipantId,
Ref.LedgerString.assertFromString(entry.getPackageUploadEntry.getParticipantId),
recordTime
)
)
@ -61,7 +62,7 @@ object KeyValueConsumption {
Update.PartyAddedToParticipant(
Party.assertFromString(entry.getPartyAllocationEntry.getParty),
entry.getPartyAllocationEntry.getDisplayName,
entry.getPartyAllocationEntry.getParticipantId,
Ref.LedgerString.assertFromString(entry.getPartyAllocationEntry.getParticipantId),
recordTime
)
)

View File

@ -21,6 +21,8 @@ class InMemoryKVParticipantStateIT extends AsyncWordSpec with AkkaBeforeAndAfter
private val emptyTransaction: SubmittedTransaction =
PartialTransaction.initial.finish.right.get
val participantId: ParticipantId = Ref.LedgerString.assertFromString("in-memory-participant")
def submitterInfo(rt: Timestamp) = SubmitterInfo(
submitter = Ref.Party.assertFromString("Alice"),
applicationId = Ref.LedgerString.assertFromString("tests"),
@ -38,7 +40,7 @@ class InMemoryKVParticipantStateIT extends AsyncWordSpec with AkkaBeforeAndAfter
// creation & teardown!
"return initial conditions" in {
val ps = new InMemoryKVParticipantState
val ps = new InMemoryKVParticipantState(participantId)
ps.getLedgerInitialConditions()
.runWith(Sink.head)
.map { _ =>
@ -48,7 +50,7 @@ class InMemoryKVParticipantStateIT extends AsyncWordSpec with AkkaBeforeAndAfter
}
"provide update after uploadPackages" in {
val ps = new InMemoryKVParticipantState
val ps = new InMemoryKVParticipantState(participantId)
val rt = ps.getNewRecordTime()
val sourceDescription = Some("provided by test")
@ -84,7 +86,7 @@ class InMemoryKVParticipantStateIT extends AsyncWordSpec with AkkaBeforeAndAfter
}
"duplicate package removed from update after uploadPackages" in {
val ps = new InMemoryKVParticipantState
val ps = new InMemoryKVParticipantState(participantId)
val rt = ps.getNewRecordTime()
val sourceDescription = Some("provided by test")
@ -123,7 +125,7 @@ class InMemoryKVParticipantStateIT extends AsyncWordSpec with AkkaBeforeAndAfter
}
"reject uploadPackages when archive is empty" in {
val ps = new InMemoryKVParticipantState
val ps = new InMemoryKVParticipantState(participantId)
val rt = ps.getNewRecordTime()
val sourceDescription = Some("provided by test")
@ -147,7 +149,7 @@ class InMemoryKVParticipantStateIT extends AsyncWordSpec with AkkaBeforeAndAfter
}
"provide update after allocateParty" in {
val ps = new InMemoryKVParticipantState
val ps = new InMemoryKVParticipantState(participantId)
val rt = ps.getNewRecordTime()
val hint = Some("Alice")
@ -180,8 +182,8 @@ class InMemoryKVParticipantStateIT extends AsyncWordSpec with AkkaBeforeAndAfter
}
}
"reject allocateParty when hint is empty" in {
val ps = new InMemoryKVParticipantState
"accept allocateParty when hint is empty" in {
val ps = new InMemoryKVParticipantState(participantId)
val rt = ps.getNewRecordTime()
val hint = None
@ -192,7 +194,7 @@ class InMemoryKVParticipantStateIT extends AsyncWordSpec with AkkaBeforeAndAfter
} yield {
ps.close()
result match {
case PartyAllocationResult.InvalidName(_) =>
case PartyAllocationResult.Ok(_) =>
succeed
case _ =>
fail("unexpected response to party allocation")
@ -208,8 +210,28 @@ class InMemoryKVParticipantStateIT extends AsyncWordSpec with AkkaBeforeAndAfter
// }).toScala
}
"reject allocateParty when hint contains invalid string for a party" in {
val ps = new InMemoryKVParticipantState(participantId)
val rt = ps.getNewRecordTime()
val hint = Some("Alice!@")
val displayName = Some("Alice Cooper")
for {
result <- ps.allocateParty(hint, displayName).toScala
} yield {
ps.close()
result match {
case PartyAllocationResult.InvalidName(_) =>
succeed
case _ =>
fail("unexpected response to party allocation")
}
}
}
"reject duplicate allocateParty" in {
val ps = new InMemoryKVParticipantState
val ps = new InMemoryKVParticipantState(participantId)
val rt = ps.getNewRecordTime()
val hint = Some("Alice")
@ -238,7 +260,7 @@ class InMemoryKVParticipantStateIT extends AsyncWordSpec with AkkaBeforeAndAfter
"provide update after transaction submission" in {
val ps = new InMemoryKVParticipantState
val ps = new InMemoryKVParticipantState(participantId)
val rt = ps.getNewRecordTime()
val waitForUpdateFuture =
@ -254,7 +276,7 @@ class InMemoryKVParticipantStateIT extends AsyncWordSpec with AkkaBeforeAndAfter
}
"reject duplicate commands" in {
val ps = new InMemoryKVParticipantState
val ps = new InMemoryKVParticipantState(participantId)
val rt = ps.getNewRecordTime()
val waitForUpdateFuture =
@ -281,7 +303,7 @@ class InMemoryKVParticipantStateIT extends AsyncWordSpec with AkkaBeforeAndAfter
}
"return second update with beginAfter=1" in {
val ps = new InMemoryKVParticipantState
val ps = new InMemoryKVParticipantState(participantId)
val rt = ps.getNewRecordTime()
val waitForUpdateFuture =

View File

@ -54,7 +54,7 @@ object Update {
final case class PartyAddedToParticipant(
party: Party,
displayName: String,
participantId: String,
participantId: ParticipantId,
recordTime: Timestamp)
extends Update {
override def description: String =
@ -90,7 +90,7 @@ object Update {
final case class PublicPackagesUploaded(
archives: List[DamlLf.Archive],
sourceDescription: Option[String],
participantId: String,
participantId: ParticipantId,
recordTime: Timestamp)
extends Update {
override def description: String =

View File

@ -60,6 +60,9 @@ package object v1 {
/** Identifier for the ledger, MUST match regexp [a-zA-Z0-9-]. */
type LedgerId = String
/** Identifier for the participant, MUST match regexp [a-zA-Z0-9-]. */
type ParticipantId = Ref.LedgerString
/** Identifiers for transactions.
* Currently unrestricted unicode (See issue #398). */
type TransactionId = Ref.TransactionIdString

View File

@ -50,6 +50,9 @@ package object v2 {
/** Identifier for the ledger, MUST match regexp [a-zA-Z0-9-]. */
type LedgerId = String
/** Identifier for the participant, MUST match regexp [a-zA-Z0-9-]. */
type ParticipantId = Ref.LedgerString
/** Identifiers for transactions.
* Currently unrestricted unicode (See issue #398). */
type TransactionId = Ref.TransactionIdString

View File

@ -8,8 +8,8 @@ import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.participant.state.v2.ReadService
import com.digitalasset.ledger.api.domain._
import com.daml.ledger.participant.state.v2.{ReadService, ParticipantId}
import com.digitalasset.ledger.api.domain.{ParticipantId => _, _}
import com.digitalasset.platform.common.util.{DirectExecutionContext => DEC}
import com.digitalasset.platform.sandbox.metrics.MetricsManager
import com.digitalasset.platform.sandbox.stores.LedgerBackedIndexService
@ -22,14 +22,18 @@ import com.digitalasset.platform.sandbox.stores.ledger.{
import scala.concurrent.Future
object PostgresIndex {
def apply(readService: ReadService, ledgerId: LedgerId, jdbcUrl: String)(
def apply(
readService: ReadService,
ledgerId: LedgerId,
participantId: ParticipantId,
jdbcUrl: String)(
implicit mat: Materializer,
mm: MetricsManager): Future[IndexService with AutoCloseable] =
Ledger
.postgresReadOnly(jdbcUrl, ledgerId)
.map { ledger =>
val contractStore = new SandboxContractStore(ledger)
new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger), contractStore) {
new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger), contractStore, participantId) {
override def getLedgerConfiguration(): Source[v2.LedgerConfiguration, NotUsed] =
readService.getLedgerInitialConditions().map { cond =>
v2.LedgerConfiguration(cond.config.timeModel.minTtl, cond.config.timeModel.maxTtl)

View File

@ -9,8 +9,9 @@ import java.time.Instant
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.daml.ledger.participant.state.v2.{ReadService, WriteService}
import com.daml.ledger.participant.state.v2.{ParticipantId, ReadService, WriteService}
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.engine.Engine
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.ledger.api.domain
@ -18,8 +19,8 @@ import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.ledger.server.apiserver.{ApiServer, ApiServices, LedgerApiServer}
import com.digitalasset.platform.index.StandaloneIndexServer.{
asyncTolerance,
preloadPackages,
logger
logger,
preloadPackages
}
import com.digitalasset.platform.index.config.Config
import com.digitalasset.platform.sandbox.BuildInfo
@ -79,6 +80,10 @@ class StandaloneIndexServer(
readService: ReadService,
writeService: WriteService) {
// Name of this participant,
// TODO: Pass this info in command-line (See issue #2025)
val participantId: ParticipantId = Ref.LedgerString.assertFromString("postgress-participant")
case class ApiServerState(
ledgerId: LedgerId,
apiServer: ApiServer,
@ -128,7 +133,11 @@ class StandaloneIndexServer(
val initF = for {
cond <- readService.getLedgerInitialConditions().runWith(Sink.head)
indexService <- PostgresIndex(readService, domain.LedgerId(cond.ledgerId), config.jdbcUrl)
indexService <- PostgresIndex(
readService,
domain.LedgerId(cond.ledgerId),
participantId,
config.jdbcUrl)
} yield (cond.ledgerId, cond.config.timeModel, indexService)
val (actualLedgerId, timeModel, indexService) = Try(Await.result(initF, asyncTolerance))

View File

@ -8,8 +8,9 @@ import java.time.Instant
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.daml.ledger.participant.state.v2.ParticipantId
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.ImmArray
import com.digitalasset.daml.lf.data.{ImmArray, Ref}
import com.digitalasset.daml.lf.engine.Engine
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.ledger.api.domain.LedgerId
@ -82,6 +83,10 @@ object SandboxServer {
class SandboxServer(actorSystemName: String, config: => SandboxConfig) extends AutoCloseable {
// Name of this participant
// TODO: Pass this info in command-line (See issue #2025)
val participantId: ParticipantId = Ref.LedgerString.assertFromString("sandbox-participant")
case class ApiServerState(
ledgerId: LedgerId,
apiServer: ApiServer,
@ -187,6 +192,7 @@ class SandboxServer(actorSystemName: String, config: => SandboxConfig) extends A
case Some(jdbcUrl) =>
"postgres" -> SandboxIndexAndWriteService.postgres(
ledgerId,
participantId,
jdbcUrl,
config.timeModel,
timeProvider,
@ -201,6 +207,7 @@ class SandboxServer(actorSystemName: String, config: => SandboxConfig) extends A
"in-memory" -> Future.successful(
SandboxIndexAndWriteService.inMemory(
ledgerId,
participantId,
config.timeModel,
timeProvider,
acs,

View File

@ -17,7 +17,8 @@ import com.digitalasset.platform.api.grpc.GrpcApiService
import com.google.protobuf.timestamp.Timestamp
import com.digitalasset.platform.common.util.{DirectExecutionContext => DE}
import com.digitalasset.platform.server.api.validation.ErrorFactories
import io.grpc.{BindableService, ServerServiceDefinition}
import io.grpc.ServerServiceDefinition
import org.slf4j.{Logger, LoggerFactory}
import scala.compat.java8.FutureConverters
import scala.concurrent.Future
@ -29,6 +30,8 @@ class ApiPackageManagementService(
extends PackageManagementService
with GrpcApiService {
protected val logger: Logger = LoggerFactory.getLogger(PackageManagementService.getClass)
override def close(): Unit = ()
override def bindService(): ServerServiceDefinition =
@ -81,7 +84,6 @@ class ApiPackageManagementService(
object ApiPackageManagementService {
def createApiService(
readBackend: IndexPackagesService,
writeBackend: WritePackagesService): GrpcApiService with BindableService = {
new ApiPackageManagementService(readBackend, writeBackend) with BindableService
}
writeBackend: WritePackagesService): GrpcApiService =
new ApiPackageManagementService(readBackend, writeBackend) with PackageManagementServiceLogging
}

View File

@ -12,9 +12,8 @@ import com.digitalasset.ledger.api.v1.admin.party_management_service._
import com.digitalasset.platform.api.grpc.GrpcApiService
import com.digitalasset.platform.common.util.{DirectExecutionContext => DE}
import com.digitalasset.platform.server.api.validation.ErrorFactories
import io.grpc.{BindableService, ServerServiceDefinition}
import io.grpc.ServerServiceDefinition
import org.slf4j.LoggerFactory
import scalaz.syntax.tag._
import scala.compat.java8.FutureConverters
import scala.concurrent.{ExecutionContext, Future}
@ -36,7 +35,7 @@ class ApiPartyManagementService private (
request: GetParticipantIdRequest): Future[GetParticipantIdResponse] =
partyManagementService
.getParticipantId()
.map(pid => GetParticipantIdResponse(pid.unwrap))(DE)
.map(pid => GetParticipantIdResponse(pid.toString))(DE)
private[this] def mapPartyDetails(
details: com.digitalasset.ledger.api.domain.PartyDetails): PartyDetails =
@ -74,7 +73,6 @@ object ApiPartyManagementService {
def createApiService(readBackend: IndexPartyManagementService, writeBackend: WritePartyService)(
implicit ec: ExecutionContext,
esf: ExecutionSequencerFactory,
mat: Materializer): GrpcApiService with BindableService with PartyManagementServiceLogging =
new ApiPartyManagementService(readBackend, writeBackend) with BindableService
with PartyManagementServiceLogging
mat: Materializer): GrpcApiService =
new ApiPartyManagementService(readBackend, writeBackend) with PartyManagementServiceLogging
}

View File

@ -9,8 +9,14 @@ import java.util.concurrent.{CompletableFuture, CompletionStage}
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.daml.ledger.participant.state.v2.{ApplicationId => _, TransactionId => _, _}
import com.daml.ledger.participant.state.v2.{
ApplicationId => _,
LedgerId => _,
TransactionId => _,
_
}
import com.daml.ledger.participant.state.{v2 => ParticipantState}
import com.daml.ledger.participant.state.index.v2._
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.Ref.{LedgerString, PackageId, Party, TransactionIdString}
import com.digitalasset.daml.lf.data.{ImmArray, Ref}
@ -25,7 +31,7 @@ import com.digitalasset.ledger.api.domain.CompletionEvent.{
CommandAccepted,
CommandRejected
}
import com.digitalasset.ledger.api.domain.{LedgerId, _}
import com.digitalasset.ledger.api.domain.{ParticipantId => _, _}
import com.digitalasset.platform.common.util.{DirectExecutionContext => DEC}
import com.digitalasset.platform.participant.util.EventFilter
import com.digitalasset.platform.sandbox.metrics.MetricsManager
@ -41,8 +47,6 @@ import scalaz.syntax.tag._
import scala.compat.java8.FutureConverters
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
import com.daml.ledger.participant.state.index.v2._
import com.daml.ledger.participant.state.v2.{ApplicationId => _, TransactionId => _, _}
import scala.util.Try
trait IndexAndWriteService extends AutoCloseable {
@ -59,6 +63,7 @@ object SandboxIndexAndWriteService {
def postgres(
ledgerId: LedgerId,
participantId: ParticipantId,
jdbcUrl: String,
timeModel: TimeModel,
timeProvider: TimeProvider,
@ -80,10 +85,12 @@ object SandboxIndexAndWriteService {
queueDepth,
startMode
)
.map(ledger => createInstance(Ledger.metered(ledger), timeModel, timeProvider))(DEC)
.map(ledger =>
createInstance(Ledger.metered(ledger), participantId, timeModel, timeProvider))(DEC)
def inMemory(
ledgerId: LedgerId,
participantId: ParticipantId,
timeModel: TimeModel,
timeProvider: TimeProvider,
acs: InMemoryActiveContracts,
@ -93,13 +100,16 @@ object SandboxIndexAndWriteService {
mm: MetricsManager): IndexAndWriteService = {
val ledger =
Ledger.metered(Ledger.inMemory(ledgerId, timeProvider, acs, templateStore, ledgerEntries))
createInstance(ledger, timeModel, timeProvider)
createInstance(ledger, participantId, timeModel, timeProvider)
}
private def createInstance(ledger: Ledger, timeModel: TimeModel, timeProvider: TimeProvider)(
implicit mat: Materializer) = {
private def createInstance(
ledger: Ledger,
participantId: ParticipantId,
timeModel: TimeModel,
timeProvider: TimeProvider)(implicit mat: Materializer) = {
val contractStore = new SandboxContractStore(ledger)
val indexSvc = new LedgerBackedIndexService(ledger, contractStore) {
val indexSvc = new LedgerBackedIndexService(ledger, contractStore, participantId) {
override def getLedgerConfiguration(): Source[LedgerConfiguration, NotUsed] =
Source
.single(LedgerConfiguration(timeModel.minTtl, timeModel.maxTtl))
@ -146,7 +156,8 @@ object SandboxIndexAndWriteService {
abstract class LedgerBackedIndexService(
ledger: ReadOnlyLedger,
contractStore: ContractStore
contractStore: ContractStore,
participantId: ParticipantId
)(implicit mat: Materializer)
extends IndexService
with AutoCloseable {
@ -383,9 +394,7 @@ abstract class LedgerBackedIndexService(
// PartyManagementService
override def getParticipantId(): Future[ParticipantId] =
// In the case of the sandbox, there is only one participant node
// TODO: Make the participant ID configurable
Future.successful(ParticipantId(ledger.ledgerId.unwrap))
Future.successful(participantId)
override def listParties(): Future[List[PartyDetails]] =
ledger.parties

View File

@ -963,6 +963,9 @@ private class PostgresLedgerDao(
dbDispatcher.executeSql { implicit conn =>
SQL_SELECT_PARTIES
.as(PartyDataParser.*)
// TODO: isLocal should be based on equality of participantId reported in an
// update and the id given to participant in a command-line argument
// (See issue #2026)
.map(d => PartyDetails(Party.assertFromString(d.party), d.displayName, true))
}

View File

@ -8,9 +8,10 @@ import java.time.Instant
import java.util.zip.ZipFile
import akka.stream.ActorMaterializer
import com.daml.ledger.participant.state.v2.ParticipantId
import com.digitalasset.api.util.{TimeProvider, ToleranceWindow}
import com.digitalasset.daml.lf.archive.DarReader
import com.digitalasset.daml.lf.data.ImmArray
import com.digitalasset.daml.lf.data.{ImmArray, Ref}
import com.digitalasset.daml.lf.engine.Engine
import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.platform.sandbox.metrics.MetricsManager
@ -49,10 +50,12 @@ trait TestHelpers {
implicit val mm: MetricsManager = MetricsManager()
val ledgerId = LedgerId("sandbox-ledger")
val participantId: ParticipantId = Ref.LedgerString.assertFromString("sandbox-participant")
val indexAndWriteService = SandboxIndexAndWriteService
.inMemory(
ledgerId,
participantId,
TimeModel.reasonableDefault,
TimeProvider.Constant(Instant.EPOCH),
InMemoryActiveContracts.empty,