Ledger configuration indexing changes (rebased to master) (#3553)

* Add participant-state configuration protobuf

- Move the "DamlConfiguration" from kvutils into participant-state/protobuf/ledger_configuration.proto.
- Add version number and spec (à la transaction.proto)

This is a preparation for indexing the configuration and having one canonical serialization for it.
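A rough sketch of the intended round trip, assuming the Configuration.encode/Configuration.decode helpers that appear in the diffs below (encode produces the versioned LedgerConfiguration message, decode returns an Either):

import com.daml.ledger.participant.state.v1.{Configuration, TimeModel}

// Illustrative only: one canonical serialization means every component
// (kvutils, the index database, ...) reads and writes the same bytes.
val config = Configuration(generation = 1, timeModel = TimeModel.reasonableDefault)
val proto = Configuration.encode(config) // LedgerConfiguration protobuf message
assert(Configuration.decode(proto) == Right(config))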

* Initial thoughts on indexing the configuration

* Implement indexing of ledger configuration changes

* Add record time to all Updates. Wire through participant id.

and rename V7__Add_configuration to V8.
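A hedged sketch of what a stateUpdates consumer sees after this change; the ConfigurationChanged field order follows its construction in the diffs below, and logConfigurationChange is a hypothetical consumer:

import com.daml.ledger.participant.state.v1.Update

// Hypothetical consumer: every Update now carries the record time, and
// configuration changes also carry the submitting participant id.
def logConfigurationChange(update: Update): Unit = update match {
  case Update.ConfigurationChanged(recordTime, submissionId, participantId, config) =>
    println(s"[$recordTime] generation ${config.generation} set by $participantId ($submissionId)")
  case _ => ()
}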

* Add ledger_configuration_java_proto to release artifacts

* Fix up release of ledger_configuration_java_proto

* Suggestions from review

Co-Authored-By: Gerolf Seitz <gerolf.seitz@digitalasset.com>

* Apply suggestions from code review

Co-Authored-By: Gerolf Seitz <gerolf.seitz@digitalasset.com>

* address rebase issues

* Resolve compilation errors after rebase/merge

* happy formatting for scala and bazel and friends

* Drop "openWorld" setting from configuration

And refactor the tests to allocate parties explicitly.

* Fix up migration and tests

* Drop authorizedParticipantIds from configuration

Implement configuration authorization in kvutils by validating the submission
against the participant id of the previous configuration.
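Condensed from the ProcessConfigSubmission changes below into a standalone predicate (parameter names are illustrative, not the actual signature):

def isAuthorizedConfigChange(
    participantId: String,                      // the committing participant
    submittingParticipantId: String,            // id carried inside the submission
    previousConfigParticipantId: Option[String] // from the last accepted configuration, if any
): Boolean =
  // Well-formed: the id in the submission matches the committing participant, and
  // either no configuration exists yet or the participant that submitted the
  // previous configuration is the one changing it now.
  participantId == submittingParticipantId &&
    previousConfigParticipantId.forall(_ == participantId)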

* Post-rebase fixes

* Add missing migrations

* Apply suggestions from code review

Co-Authored-By: Gerolf Seitz <gerolf.seitz@digitalasset.com>

* Add missing mavenUpload to ledger-api-server

* Remove stateUpdateRecordTime

* Address code review

- Address PR review
- Merge TimeModelImpl and the traits. Remove TimeModel from ledger-api-common.
- Throw `Err` from KeyValueConsumption on parse errors instead of assert/sys.error
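After the merge there is a single TimeModel in participant-state v1 carrying both the parameters and the checks; an outline, with the check logic taken from the removed TimeModelChecker (the real class also keeps reasonableDefault and the validating apply shown in the deleted file below):

import java.time.{Duration, Instant}

// Outline of the merged TimeModel: parameters and checks in one place.
case class TimeModel(
    minTransactionLatency: Duration,
    maxClockSkew: Duration,
    maxTtl: Duration) {

  // Derived bounds, exactly as in the removed checker.
  val minTtl: Duration = minTransactionLatency.plus(maxClockSkew)
  val futureAcceptanceWindow: Duration = maxClockSkew

  def checkTtl(givenLET: Instant, givenMRT: Instant): Boolean = {
    val givenTtl = Duration.between(givenLET, givenMRT)
    !givenTtl.minus(minTtl).isNegative && !maxTtl.minus(givenTtl).isNegative
  }

  def checkLet(currentTime: Instant, givenLET: Instant, givenMRT: Instant): Boolean = {
    val lowerBound = givenLET.minus(futureAcceptanceWindow)
    !currentTime.isBefore(lowerBound) && !currentTime.isAfter(givenMRT)
  }
}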

* Reformat

* Add missing protobuf file

* Fix compilation after TimeModel changes. Add version logs to participant-state{,-kvutils}.

* Fix TestUtil.scala build

* Apply suggestions from code review

Co-Authored-By: Gerolf Seitz <gerolf.seitz@digitalasset.com>

* Address review

- synchronized access to ledgerConfiguration in InMemoryLedger
- store rejection if configuration generation is wrong
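A minimal sketch of the locking pattern described above, written as class members (names and the storeRejection helper are illustrative, not the actual InMemoryLedger code):

// Guard the mutable configuration with the ledger's lock so readers never
// observe a torn update, and reject stale generations while holding it.
private var ledgerConfiguration: Option[Configuration] = None

def currentConfiguration: Option[Configuration] = this.synchronized(ledgerConfiguration)

def changeConfiguration(newConfig: Configuration): Unit = this.synchronized {
  val expected = ledgerConfiguration.fold(0L)(_.generation + 1)
  if (newConfig.generation == expected)
    ledgerConfiguration = Some(newConfig)
  else
    storeRejection(newConfig, s"expected generation $expected") // hypothetical helper
}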

* Update ledger/participant-state/protobuf/ledger_configuration.rst

Co-Authored-By: Gerolf Seitz <gerolf.seitz@digitalasset.com>
Authored by Brian Healey on 2019-11-27 11:41:23 -05:00; committed by Jussi Mäki
parent 677b30687f
commit 02186ef068
57 changed files with 1471 additions and 760 deletions

View File

@ -22,7 +22,8 @@ import com.digitalasset.ledger.api.v1.TransactionServiceOuterClass.{
import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.services.SandboxServerResource
import com.digitalasset.platform.services.time.{TimeModel, TimeProviderType}
import com.digitalasset.platform.services.time.TimeProviderType
import com.daml.ledger.participant.state.v1.TimeModel
import io.grpc.Channel
import org.scalatest.Assertion

View File

@ -7,7 +7,6 @@ import java.time.{Duration, Instant}
import com.daml.ledger.participant.state.v1.TimeModel
import com.digitalasset.platform.server.api.validation.ErrorFactories
import com.digitalasset.platform.services.time.TimeModelChecker
import io.grpc.Status
import scala.util.{Failure, Success, Try}
@ -17,13 +16,11 @@ import scala.util.{Failure, Success, Try}
*/
final case class TimeModelValidator(model: TimeModel) extends ErrorFactories {
private val timeModelChecker = TimeModelChecker(model)
/**
* Wraps [[model.checkTtl]], converting a failed check into a StatusRuntimeException.
*/
def checkTtl(givenLedgerEffectiveTime: Instant, givenMaximumRecordTime: Instant): Try[Unit] = {
if (timeModelChecker.checkTtl(givenLedgerEffectiveTime, givenMaximumRecordTime))
if (model.checkTtl(givenLedgerEffectiveTime, givenMaximumRecordTime))
Success(())
else {
val givenTtl = Duration.between(givenLedgerEffectiveTime, givenMaximumRecordTime)
@ -46,10 +43,7 @@ final case class TimeModelValidator(model: TimeModel) extends ErrorFactories {
applicationId: String): Try[Unit] =
for {
_ <- checkTtl(givenLedgerEffectiveTime, givenMaximumRecordTime)
_ <- if (timeModelChecker.checkLet(
currentTime,
givenLedgerEffectiveTime,
givenMaximumRecordTime))
_ <- if (model.checkLet(currentTime, givenLedgerEffectiveTime, givenMaximumRecordTime))
Success(())
else
Failure(

View File

@ -1,85 +0,0 @@
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.services.time
import com.daml.ledger.participant.state.v1.{TimeModel => ITimeModel}
import com.daml.ledger.participant.state.v1.{TimeModelChecker => ITimeModelChecker}
import java.time.{Duration, Instant}
import scala.util.Try
/**
* The ledger time model and associated validations. Some values are given by constructor args; others are derived.
*
* @param minTransactionLatency The expected minimum latency of a transaction.
* @param maxClockSkew The maximum allowed clock skew between the ledger and clients.
* @param maxTtl The maximum allowed time to live for a transaction.
* Must be greater than the derived minimum time to live.
* @throws IllegalArgumentException if the parameters aren't valid
*/
case class TimeModel private (
val minTransactionLatency: Duration,
val maxClockSkew: Duration,
val maxTtl: Duration)
extends ITimeModel {
/**
* The minimum time to live for a transaction. Equal to the minimum transaction latency plus the maximum clock skew.
*/
val minTtl: Duration = minTransactionLatency.plus(maxClockSkew)
/**
* The maximum window after the current time when transaction ledger effective times will be accepted.
* Currently equal to the max clock skew.
* <p/>
* The corresponding past acceptance window is given by the command's TTL, and thus bounded inclusive by [[maxTtl]].
*/
val futureAcceptanceWindow: Duration = maxClockSkew
}
object TimeModel {
/**
* A default TimeModel that's reasonable for a test or sandbox ledger application.
* Serious applications (viz. ledger) should probably specify their own TimeModel.
*/
val reasonableDefault: TimeModel =
TimeModel(Duration.ofSeconds(1L), Duration.ofSeconds(1L), Duration.ofSeconds(30L)).get
def apply(
minTransactionLatency: Duration,
maxClockSkew: Duration,
maxTtl: Duration): Try[TimeModel] = Try {
require(!minTransactionLatency.isNegative, "Negative min transaction latency")
require(!maxTtl.isNegative, "Negative max TTL")
require(!maxClockSkew.isNegative, "Negative max clock skew")
require(!maxTtl.minus(maxClockSkew).isNegative, "Max TTL must be greater than max clock skew")
new TimeModel(minTransactionLatency, maxClockSkew, maxTtl)
}
}
case class TimeModelChecker(timeModel: ITimeModel) extends ITimeModelChecker {
import timeModel._
override def checkTtl(
givenLedgerEffectiveTime: Instant,
givenMaximumRecordTime: Instant): Boolean = {
val givenTtl = Duration.between(givenLedgerEffectiveTime, givenMaximumRecordTime)
!givenTtl.minus(minTtl).isNegative && !maxTtl.minus(givenTtl).isNegative
}
override def checkLet(
currentTime: Instant,
givenLedgerEffectiveTime: Instant,
givenMaximumRecordTime: Instant): Boolean = {
// Note that, contrary to the documented spec, the record time of a transaction is when it's sequenced.
// It turns out this isn't a problem for the participant or the sandbox,
// and MRT seems to be going away in Sirius anyway, so I've left it as is.
val lowerBound = givenLedgerEffectiveTime.minus(futureAcceptanceWindow)
!currentTime.isBefore(lowerBound) && !currentTime.isAfter(givenMaximumRecordTime)
}
}

View File

@ -6,15 +6,14 @@ package com.digitalasset.platform
import java.io.File
import java.nio.file.Path
import java.time.Duration
import ch.qos.logback.classic.Level
import com.digitalasset.daml.bazeltools.BazelRunfiles._
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.ledger.api.auth.AuthService
import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.sandbox.config.{CommandConfiguration, SandboxConfig}
import com.digitalasset.platform.services.time.{TimeModel, TimeProviderType}
import com.digitalasset.platform.services.time.TimeProviderType
import com.daml.ledger.participant.state.v1.TimeModel
import scala.concurrent.duration.{FiniteDuration, _}
import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.platform.apitesting.TestParties

View File

@ -4,7 +4,7 @@
load(
"//bazel_tools:scala.bzl",
"da_scala_library",
"da_scala_test_suite",
"da_scala_test",
)
da_scala_library(
@ -15,6 +15,9 @@ da_scala_library(
visibility = [
"//visibility:public",
],
exports = [
"//ledger/participant-state/protobuf:ledger_configuration_java_proto",
],
runtime_deps = [],
deps = [
"//daml-lf/archive:daml_lf_dev_archive_java_proto",
@ -23,6 +26,7 @@ da_scala_library(
"//daml-lf/transaction:transaction_java_proto",
"//daml-lf/transaction:value_java_proto",
"//ledger/ledger-api-domain",
"//ledger/participant-state/protobuf:ledger_configuration_java_proto",
"@maven//:com_google_guava_guava",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:com_typesafe_akka_akka_actor_2_12",
@ -32,3 +36,15 @@ da_scala_library(
"@maven//:io_grpc_grpc_services",
],
)
da_scala_test(
name = "participant-state-tests",
size = "small",
srcs = glob(["src/test/suite/**/*.scala"]),
resources = glob(["src/test/resources/*"]),
deps = [
":participant-state",
"//bazel_tools/runfiles:scala_runfiles",
"@maven//:org_scalatest_scalatest_2_12",
],
)

View File

@ -31,6 +31,7 @@ da_scala_library(
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/participant-state",
"//ledger/participant-state/protobuf:ledger_configuration_java_proto",
"@maven//:com_google_guava_guava",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:com_typesafe_akka_akka_actor_2_12",
@ -67,6 +68,7 @@ da_scala_test(
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/participant-state",
"//ledger/participant-state/protobuf:ledger_configuration_java_proto",
"@maven//:ch_qos_logback_logback_classic",
"@maven//:ch_qos_logback_logback_core",
"@maven//:com_google_protobuf_protobuf_java",
@ -83,10 +85,12 @@ da_scala_test(
proto_library(
name = "daml_kvutils_proto",
srcs = ["src/main/protobuf/daml_kvutils.proto"],
strip_import_prefix = "src/main/protobuf/",
deps = [
"//daml-lf/archive:daml_lf_dev_archive_proto",
"//daml-lf/transaction:transaction_proto",
"//daml-lf/transaction:value_proto",
"//ledger/participant-state/protobuf:ledger_configuration_proto",
"@com_google_protobuf//:duration_proto",
"@com_google_protobuf//:empty_proto",
"@com_google_protobuf//:timestamp_proto",

View File

@ -20,6 +20,7 @@ import "google/protobuf/duration.proto";
import "com/digitalasset/daml_lf_dev/daml_lf.proto";
import "com/digitalasset/daml/lf/transaction.proto";
import "com/digitalasset/daml/lf/value.proto";
import "com/daml/ledger/participant/state/ledger_configuration.proto";
// Envelope with which we wrap all kvutils messages that are sent over the network
@ -249,12 +250,15 @@ message DamlConfigurationSubmission {
// Implementers are free to select adequate mechanism e.g. UUID or similar.
string submission_id = 1;
// Submitting participant's id.
string participant_id = 2;
// The maximum record time after which the submission will be rejected.
// Allows submitter to control when the request times out and to retry.
google.protobuf.Timestamp maximum_record_time = 2;
google.protobuf.Timestamp maximum_record_time = 3;
// The new configuration that replaces the current configuration.
DamlConfiguration configuration = 3;
com.daml.ledger.participant.state.LedgerConfiguration configuration = 4;
}
// A log entry describing a rejected configuration change.
@ -263,8 +267,11 @@ message DamlConfigurationRejectionEntry {
// request with the result.
string submission_id = 1;
// Submitting participant's id.
string participant_id = 2;
// The new proposed configuration that was rejected.
DamlConfiguration configuration = 2;
com.daml.ledger.participant.state.LedgerConfiguration configuration = 3;
// A mismatch in the configuration generation, that is, the
// new configuration did not carry a generation that was one
@ -291,53 +298,26 @@ message DamlConfigurationRejectionEntry {
}
oneof reason {
ParticipantNotAuthorized participant_not_authorized = 3;
GenerationMismatch generation_mismatch = 4;
InvalidConfiguration invalid_configuration = 5;
TimedOut timed_out = 6;
ParticipantNotAuthorized participant_not_authorized = 4;
GenerationMismatch generation_mismatch = 5;
InvalidConfiguration invalid_configuration = 6;
TimedOut timed_out = 7;
}
}
// Configuration entry that records a configuration change.
// Also used in state to look up latest configuration.
// When a configuration exists, only the participant that
// submitted the previous configuration may change it.
message DamlConfigurationEntry {
// The submission from which this configuration originated.
string submission_id = 1;
// Submitting participant's id.
string participant_id = 2;
// The ledger configuration.
DamlConfiguration configuration = 2;
}
message DamlConfiguration {
// The configuration generation. If submitting a configuration the new generation
// must be one larger than previous configuration. This safe-guards against
// configuration changes that are based upon stale data.
int64 generation = 1;
// The ledger time model, specifying the bounds for
// ledger effective time and maximum record time of transactions.
DamlTimeModel time_model = 2;
// The identity of the participant that is allowed to change
// the ledger configuration.
// If unset the configuration can be changed by anyone.
string authorized_participant_id = 3;
// If the "open world" flag is true, then party allocations are not
// explicitly required. That is, a submission from a party that is
// not allocated is allowed. This setting is useful when testing.
bool open_world = 4;
}
message DamlTimeModel {
// The expected minimum latency of a transaction.
google.protobuf.Duration min_transaction_latency = 1;
// The maximum allowed clock skew between the ledger and clients.
google.protobuf.Duration max_clock_skew = 2;
// The maximum allowed time to live for a transaction.
// Must be greater than the derived minimum time to live.
google.protobuf.Duration max_ttl = 3;
com.daml.ledger.participant.state.LedgerConfiguration configuration = 3;
}
// An allocation of party name and assignment of a party to a given
@ -414,7 +394,7 @@ message DamlStateValue {
DamlCommandDedupValue command_dedup = 3;
DamlPartyAllocation party = 4;
DamlContractKeyState contract_key_state = 5;
DamlConfiguration configuration = 6;
DamlConfigurationEntry configuration_entry = 6;
}
}

View File

@ -1,87 +0,0 @@
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.backport
import java.time.{Duration, Instant}
import com.daml.ledger.participant.state.v1.{
TimeModel => ITimeModel,
TimeModelChecker => ITimeModelChecker
}
import scala.util.Try
/**
* The ledger time model and associated validations. Some values are given by constructor args; others are derived.
*
* @param minTransactionLatency The expected minimum latency of a transaction.
* @param maxClockSkew The maximum allowed clock skew between the ledger and clients.
* @param maxTtl The maximum allowed time to live for a transaction.
* Must be greater than the derived minimum time to live.
* @throws IllegalArgumentException if the parameters aren't valid
*/
case class TimeModel private (
minTransactionLatency: Duration,
maxClockSkew: Duration,
maxTtl: Duration)
extends ITimeModel {
/**
* The minimum time to live for a transaction. Equal to the minimum transaction latency plus the maximum clock skew.
*/
val minTtl: Duration = minTransactionLatency.plus(maxClockSkew)
/**
* The maximum window after the current time when transaction ledger effective times will be accepted.
* Currently equal to the max clock skew.
* <p/>
* The corresponding past acceptance window is given by the command's TTL, and thus bounded inclusive by [[maxTtl]].
*/
val futureAcceptanceWindow: Duration = maxClockSkew
}
object TimeModel {
/**
* A default TimeModel that's reasonable for a test or sandbox ledger application.
* Serious applications (viz. ledger) should probably specify their own TimeModel.
*/
val reasonableDefault: TimeModel =
TimeModel(Duration.ofSeconds(1L), Duration.ofSeconds(1L), Duration.ofSeconds(30L)).get
def apply(
minTransactionLatency: Duration,
maxClockSkew: Duration,
maxTtl: Duration): Try[TimeModel] = Try {
require(!minTransactionLatency.isNegative, "Negative min transaction latency")
require(!maxTtl.isNegative, "Negative max TTL")
require(!maxClockSkew.isNegative, "Negative max clock skew")
require(!maxTtl.minus(maxClockSkew).isNegative, "Max TTL must be greater than max clock skew")
new TimeModel(minTransactionLatency, maxClockSkew, maxTtl)
}
}
case class TimeModelChecker(timeModel: ITimeModel) extends ITimeModelChecker {
import timeModel._
override def checkTtl(
givenLedgerEffectiveTime: Instant,
givenMaximumRecordTime: Instant): Boolean = {
val givenTtl = Duration.between(givenLedgerEffectiveTime, givenMaximumRecordTime)
!givenTtl.minus(minTtl).isNegative && !maxTtl.minus(givenTtl).isNegative
}
override def checkLet(
currentTime: Instant,
givenLedgerEffectiveTime: Instant,
givenMaximumRecordTime: Instant): Boolean = {
// Note that, contrary to the documented spec, the record time of a transaction is when it's sequenced.
// It turns out this isn't a problem for the participant or the sandbox,
// and MRT seems to be going away in Sirius anyway, so I've left it as is.
val lowerBound = givenLedgerEffectiveTime.minus(futureAcceptanceWindow)
!currentTime.isBefore(lowerBound) && !currentTime.isAfter(givenMaximumRecordTime)
}
}

View File

@ -5,14 +5,8 @@ package com.daml.ledger.participant.state.kvutils
import java.time.{Duration, Instant}
import com.daml.ledger.participant.state.backport.TimeModel
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.v1.{
Configuration,
PackageId,
SubmittedTransaction,
SubmitterInfo
}
import com.daml.ledger.participant.state.v1.{PackageId, SubmittedTransaction, SubmitterInfo}
import com.digitalasset.daml.lf.data.Ref.{ContractIdString, LedgerString, Party}
import com.digitalasset.daml.lf.data.Time
import com.digitalasset.daml.lf.transaction.Node.GlobalKey
@ -29,7 +23,7 @@ import com.digitalasset.daml.lf.value.{Value, ValueCoder, ValueOuterClass}
import com.google.common.io.BaseEncoding
import com.google.protobuf.{ByteString, Empty}
import scala.util.{Failure, Success, Try}
import scala.util.Try
/** Internal utilities for converting between protobuf messages and our scala
* data structures.
@ -172,51 +166,6 @@ private[kvutils] object Conversions {
maxRecordTime = parseTimestamp(subInfo.getMaximumRecordTime)
)
def buildDamlConfiguration(config: Configuration): DamlConfiguration = {
val tm = config.timeModel
DamlConfiguration.newBuilder
.setGeneration(config.generation)
.setAuthorizedParticipantId(config.authorizedParticipantId.fold("")(identity))
.setOpenWorld(config.openWorld)
.setTimeModel(
DamlTimeModel.newBuilder
.setMaxClockSkew(buildDuration(tm.maxClockSkew))
.setMinTransactionLatency(buildDuration(tm.minTransactionLatency))
.setMaxTtl(buildDuration(tm.maxTtl))
)
.build
}
def parseDamlConfiguration(config: DamlConfiguration): Try[Configuration] =
for {
tm <- if (config.hasTimeModel)
Success(config.getTimeModel)
else
Failure(Err.DecodeError("Configuration", "No time model"))
parsedTM <- TimeModel(
maxClockSkew = parseDuration(tm.getMaxClockSkew),
minTransactionLatency = parseDuration(tm.getMinTransactionLatency),
maxTtl = parseDuration(tm.getMaxTtl)
)
authPidString = config.getAuthorizedParticipantId
authPid <- if (authPidString.isEmpty)
Success(None)
else
LedgerString
.fromString(config.getAuthorizedParticipantId)
.fold(
err => Failure(Err.DecodeError("Configuration", err)),
ls => Success(Some(ls))
)
parsedConfig = Configuration(
generation = config.getGeneration,
timeModel = parsedTM,
authorizedParticipantId = authPid,
openWorld = config.getOpenWorld
)
} yield parsedConfig
def buildTimestamp(ts: Time.Timestamp): com.google.protobuf.Timestamp = {
val instant = ts.toInstant
com.google.protobuf.Timestamp.newBuilder

View File

@ -14,7 +14,6 @@ import akka.actor.{Actor, ActorSystem, PoisonPill, Props}
import akka.pattern.gracefulStop
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.daml.ledger.participant.state.backport.TimeModel
import com.daml.ledger.participant.state.kvutils.{DamlKvutils => Proto}
import com.daml.ledger.participant.state.v1.{UploadPackagesResult, _}
import com.digitalasset.daml.lf.data.Ref
@ -96,8 +95,7 @@ object InMemoryKVParticipantState {
class InMemoryKVParticipantState(
val participantId: ParticipantId,
val ledgerId: LedgerString = Ref.LedgerString.assertFromString(UUID.randomUUID.toString),
file: Option[File] = None,
openWorld: Boolean = true)(implicit system: ActorSystem, mat: Materializer)
file: Option[File] = None)(implicit system: ActorSystem, mat: Materializer)
extends ReadService
with WriteService
with AutoCloseable {
@ -111,9 +109,7 @@ class InMemoryKVParticipantState(
// The initial ledger configuration
private val initialLedgerConfig = Configuration(
generation = 0,
timeModel = TimeModel.reasonableDefault,
authorizedParticipantId = Some(participantId),
openWorld = openWorld
timeModel = TimeModel.reasonableDefault
)
// DAML Engine for transaction validation.
@ -252,7 +248,7 @@ class InMemoryKVParticipantState(
case Right(_) => sys.error("Unexpected message in envelope")
}
val state = stateRef
val newRecordTime = getNewRecordTime()
val newRecordTime = getNewRecordTime
if (state.store.contains(entryId.getEntryId)) {
// The entry identifier already in use, drop the message and let the
@ -320,7 +316,7 @@ class InMemoryKVParticipantState(
// This source stops when the actor dies.
val _ = Source
.tick(HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, ())
.map(_ => CommitHeartbeat(getNewRecordTime()))
.map(_ => CommitHeartbeat(getNewRecordTime))
.to(Sink.actorRef(actorRef, onCompleteMessage = ()))
.run()
@ -548,7 +544,7 @@ class InMemoryKVParticipantState(
* at which this class has been instantiated.
*/
private val initialConditions =
LedgerInitialConditions(ledgerId, initialLedgerConfig, getNewRecordTime())
LedgerInitialConditions(ledgerId, initialLedgerConfig, getNewRecordTime)
/** Get a new record time for the ledger from the system clock.
* Public for use from integration tests.
@ -563,7 +559,8 @@ class InMemoryKVParticipantState(
config: Configuration): CompletionStage[SubmissionResult] =
CompletableFuture.completedFuture({
val submission =
KeyValueSubmission.configurationToSubmission(maxRecordTime, submissionId, config)
KeyValueSubmission
.configurationToSubmission(maxRecordTime, submissionId, participantId, config)
commitActorRef ! CommitSubmission(
allocateEntryId,
Envelope.enclose(submission)

View File

@ -55,6 +55,10 @@ object KeyValueCommitting {
* Resolved input state specified in submission. Optional to mark that input state was resolved
* but not present. Specifically, we require the command de-duplication input to be resolved, but don't
* expect it to be present.
* We also do not trust the submitter to provide the correct list of input keys, so we need
* to verify that an input truly does not exist rather than merely not having been declared as an input.
* For example when committing a configuration we need the current configuration to authorize
* the submission.
* @return Log entry to be committed and the DAML state updates to be applied.
*/
@throws(classOf[Err])
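The three input-state cases can be sketched as follows (the map shape matches getCurrentConfiguration further down; useDefault and decode are hypothetical stand-ins for the real handling):

// inputState: Map[DamlStateKey, Option[DamlStateValue]]
inputState.get(configurationStateKey) match {
  case None =>
    // Never declared as an input: we cannot trust its absence.
    throw Err.MissingInputState(configurationStateKey)
  case Some(None) =>
    useDefault()  // declared and resolved, but no value exists yet (hypothetical)
  case Some(Some(value)) =>
    decode(value) // declared, resolved, and present (hypothetical)
}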

View File

@ -7,7 +7,6 @@ import com.daml.ledger.participant.state.kvutils.Conversions._
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.data.Ref.{LedgerString, Party}
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.ledger.api.domain.PartyDetails
import com.google.common.io.BaseEncoding
@ -30,16 +29,17 @@ object KeyValueConsumption {
def packDamlLogEntry(entry: DamlStateKey): ByteString = entry.toByteString
def unpackDamlLogEntry(bytes: ByteString): DamlLogEntry = DamlLogEntry.parseFrom(bytes)
/** Construct a participant-state [[Update]] from a [[DamlLogEntry]].
/** Construct participant-state [[Update]]s from a [[DamlLogEntry]].
* Throws [[Err]] exception on badly formed data.
*
* This method is expected to be used to implement [[com.daml.ledger.participant.state.v1.ReadService.stateUpdates]].
*
* @param entryId: The log entry identifier.
* @param entry: The log entry.
* @return [[Update]] constructed from log entry.
* @return [[Update]]s constructed from log entry.
*/
@throws(classOf[Err])
def logEntryToUpdate(entryId: DamlLogEntryId, entry: DamlLogEntry): List[Update] = {
val recordTime = parseTimestamp(entry.getRecordTime)
entry.getPayloadCase match {
@ -50,7 +50,7 @@ object KeyValueConsumption {
if (entry.getPackageUploadEntry.getSourceDescription.nonEmpty)
Some(entry.getPackageUploadEntry.getSourceDescription)
else None,
Ref.LedgerString.assertFromString(entry.getPackageUploadEntry.getParticipantId),
parseLedgerString("ParticipantId")(entry.getPackageUploadEntry.getParticipantId),
recordTime
)
}(breakOut)
@ -59,13 +59,11 @@ object KeyValueConsumption {
List.empty
case DamlLogEntry.PayloadCase.PARTY_ALLOCATION_ENTRY =>
val pae = entry.getPartyAllocationEntry
val party = parseParty(pae.getParty)
val participantId = parseLedgerString("ParticipantId")(pae.getParticipantId)
List(
Update.PartyAddedToParticipant(
Party.assertFromString(entry.getPartyAllocationEntry.getParty),
entry.getPartyAllocationEntry.getDisplayName,
Ref.LedgerString.assertFromString(entry.getPartyAllocationEntry.getParticipantId),
recordTime
)
Update.PartyAddedToParticipant(party, pae.getDisplayName, participantId, recordTime)
)
case DamlLogEntry.PayloadCase.PARTY_ALLOCATION_REJECTION_ENTRY =>
@ -76,18 +74,36 @@ object KeyValueConsumption {
case DamlLogEntry.PayloadCase.CONFIGURATION_ENTRY =>
val configEntry = entry.getConfigurationEntry
val newConfig = parseDamlConfiguration(configEntry.getConfiguration).get
List(Update.ConfigurationChanged(configEntry.getSubmissionId, newConfig))
val newConfig = Configuration
.decode(configEntry.getConfiguration)
.fold(err => throw Err.DecodeError("Configuration", err), identity)
val participantId =
parseLedgerString("ParticipantId")(configEntry.getParticipantId)
List(
Update.ConfigurationChanged(
recordTime,
configEntry.getSubmissionId,
participantId,
newConfig
)
)
case DamlLogEntry.PayloadCase.CONFIGURATION_REJECTION_ENTRY =>
val rejection = entry.getConfigurationRejectionEntry
val proposedConfig = rejection.getConfiguration
val proposedConfig = Configuration
.decode(rejection.getConfiguration)
.fold(err => throw Err.DecodeError("Configuration", err), identity)
val participantId =
parseLedgerString("ParticipantId")(rejection.getParticipantId)
List(
Update.ConfigurationChangeRejected(
recordTime = recordTime,
submissionId = rejection.getSubmissionId,
reason = rejection.getReasonCase match {
participantId = participantId,
proposedConfiguration = proposedConfig,
rejectionReason = rejection.getReasonCase match {
case DamlConfigurationRejectionEntry.ReasonCase.GENERATION_MISMATCH =>
s"Generation mismatch: ${proposedConfig.getGeneration} != ${rejection.getGenerationMismatch.getExpectedGeneration}"
s"Generation mismatch: ${proposedConfig.generation} != ${rejection.getGenerationMismatch.getExpectedGeneration}"
case DamlConfigurationRejectionEntry.ReasonCase.INVALID_CONFIGURATION =>
s"Invalid configuration: ${rejection.getInvalidConfiguration.getError}"
case DamlConfigurationRejectionEntry.ReasonCase.PARTICIPANT_NOT_AUTHORIZED =>
@ -103,10 +119,12 @@ object KeyValueConsumption {
))
case DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY =>
List(transactionRejectionEntryToUpdate(entry.getTransactionRejectionEntry))
List(
transactionRejectionEntryToUpdate(recordTime, entry.getTransactionRejectionEntry)
)
case DamlLogEntry.PayloadCase.PAYLOAD_NOT_SET =>
sys.error("entryToUpdate: PAYLOAD_NOT_SET!")
throw Err.InternalError("logEntryToUpdate: PAYLOAD_NOT_SET!")
}
}
@ -148,7 +166,7 @@ object KeyValueConsumption {
entry.getPartyAllocationEntry.getSubmissionId,
PartyAllocationResult.Ok(
PartyDetails(
Party.assertFromString(entry.getPartyAllocationEntry.getParty),
parseParty(entry.getPartyAllocationEntry.getParty),
if (entry.getPartyAllocationEntry.getDisplayName.isEmpty)
None
else
@ -178,14 +196,15 @@ object KeyValueConsumption {
None
case DamlLogEntry.PayloadCase.PAYLOAD_NOT_SET =>
sys.error("logEntryToAsyncResponse: PAYLOAD_NOT_SET!")
throw Err.InternalError("logEntryToAsyncResponse: PAYLOAD_NOT_SET!")
}
}
private def transactionRejectionEntryToUpdate(
rejEntry: DamlTransactionRejectionEntry): Update.CommandRejected = {
recordTime: Timestamp,
rejEntry: DamlTransactionRejectionEntry): Update.CommandRejected =
Update.CommandRejected(
recordTime = recordTime,
submitterInfo = parseSubmitterInfo(rejEntry.getSubmitterInfo),
reason = rejEntry.getReasonCase match {
case DamlTransactionRejectionEntry.ReasonCase.DISPUTED =>
@ -205,10 +224,9 @@ object KeyValueConsumption {
rejEntry.getSubmitterCannotActViaParticipant.getDetails
)
case DamlTransactionRejectionEntry.ReasonCase.REASON_NOT_SET =>
sys.error("transactionRejectionEntryToUpdate: REASON_NOT_SET!")
throw Err.InternalError("transactionRejectionEntryToUpdate: REASON_NOT_SET!")
}
)
}
private def partyRejectionEntryToAsyncResponse(
rejEntry: DamlPartyAllocationRejectionEntry): PartyAllocationResponse = {
@ -223,7 +241,7 @@ object KeyValueConsumption {
case DamlPartyAllocationRejectionEntry.ReasonCase.PARTICIPANT_NOT_AUTHORIZED =>
PartyAllocationResult.ParticipantNotAuthorized
case DamlPartyAllocationRejectionEntry.ReasonCase.REASON_NOT_SET =>
sys.error("partyRejectionEntryToUpdate: REASON_NOT_SET!")
throw Err.InternalError("partyRejectionEntryToUpdate: REASON_NOT_SET!")
}
)
}
@ -239,7 +257,7 @@ object KeyValueConsumption {
case DamlPackageUploadRejectionEntry.ReasonCase.PARTICIPANT_NOT_AUTHORIZED =>
UploadPackagesResult.ParticipantNotAuthorized
case DamlPackageUploadRejectionEntry.ReasonCase.REASON_NOT_SET =>
sys.error("rejectionEntryToUpdate: REASON_NOT_SET!")
throw Err.InternalError("rejectionEntryToUpdate: REASON_NOT_SET!")
}
)
}
@ -250,14 +268,16 @@ object KeyValueConsumption {
txEntry: DamlTransactionEntry,
recordTime: Timestamp): Update.TransactionAccepted = {
val relTx = Conversions.decodeTransaction(txEntry.getTransaction)
val hexTxId = LedgerString.assertFromString(BaseEncoding.base16.encode(entryId.toByteArray))
val hexTxId = parseLedgerString("TransactionId")(
BaseEncoding.base16.encode(entryId.toByteArray)
)
Update.TransactionAccepted(
optSubmitterInfo = Some(parseSubmitterInfo(txEntry.getSubmitterInfo)),
transactionMeta = TransactionMeta(
ledgerEffectiveTime = parseTimestamp(txEntry.getLedgerEffectiveTime),
workflowId =
Some(txEntry.getWorkflowId).filter(_.nonEmpty).map(LedgerString.assertFromString),
workflowId = Some(txEntry.getWorkflowId)
.filter(_.nonEmpty)
.map(parseLedgerString("WorkflowId")),
),
transaction = makeCommittedTransaction(entryId, relTx),
transactionId = hexTxId,
@ -277,4 +297,16 @@ object KeyValueConsumption {
)
}
@throws(classOf[Err])
private def parseLedgerString(what: String)(s: String): Ref.LedgerString =
Ref.LedgerString
.fromString(s)
.fold(err => throw Err.DecodeError(what, s"Cannot parse '$s': $err"), identity)
@throws(classOf[Err])
private def parseParty(s: String): Ref.Party =
Ref.Party
.fromString(s)
.fold(err => throw Err.DecodeError("Party", "Cannot parse '$s': $err"), identity)
}

View File

@ -112,6 +112,7 @@ object KeyValueSubmission {
def configurationToSubmission(
maxRecordTime: Timestamp,
submissionId: String,
participantId: String,
config: Configuration): DamlSubmission = {
val tm = config.timeModel
DamlSubmission.newBuilder
@ -119,8 +120,9 @@ object KeyValueSubmission {
.setConfigurationSubmission(
DamlConfigurationSubmission.newBuilder
.setSubmissionId(submissionId)
.setParticipantId(participantId)
.setMaximumRecordTime(buildTimestamp(maxRecordTime))
.setConfiguration(buildDamlConfiguration(config))
.setConfiguration(Configuration.encode(config))
)
.build
}

View File

@ -7,16 +7,21 @@ package com.daml.ledger.participant.state.kvutils
* and the changelog of kvutils.
*
* Changes:
* [since 100.13.29]:
* [after 100.13.37]:
* - Removed DamlConfiguration in favour of participant-state's LedgerConfiguration.
* - Authorization of configuration changes is now based on validating against the participant id
* of the previously submitted configuration.
*
* [after 100.13.29]:
* - Add support for ledger dumps via environment variable: "KVUTILS_LEDGER_DUMP=/tmp/ledger.dump".
* - Add integrity checker tool to verify ledger dumps for validating compatibility of new versions.
*
* [since 100.13.26]:
* [after 100.13.26]:
* - Added metrics to track submission processing.
* - Use InsertOrdMap to store resulting state in kvutils for deterministic ordering of state key-values.
* - Fix bug with transient contract keys, e.g. keys created and archived in same transaction.
*
* [since 100.13.21]:
* [after 100.13.21]:
* - Added 'Envelope' for compressing and versioning kvutils messages that are transmitted
* or stored on disk. [[Envelope.enclose]] and [[Envelope.open]] should be now used for
* submissions and for results from processing them.
@ -24,7 +29,7 @@ package com.daml.ledger.participant.state.kvutils
* time model is being redesigned and the checks will be reimplemented once we have
* the new design.
*
* [since 100.13.16]: *BACKWARDS INCOMPATIBLE*
* [after 100.13.16]: *BACKWARDS INCOMPATIBLE*
* - Log entries are no longer used as inputs to submission processing. The
* contract instance is now stored within DamlContractStatus.
* - Configuration extended with "Open World" flag that defines whether
@ -36,5 +41,11 @@ package com.daml.ledger.participant.state.kvutils
* - Bug in command deduplication fixed: rejected commands are now deduplicated correctly.
*/
object Version {
/** The kvutils version number. The envelope in which kvutils messages are packed carries this version number.
* Version should be incremented when semantics of fields change and migration of data is required or
* when the protobuf default value for a field is insufficient and must be filled in during decoding.
* Older versions are handled by [[Envelope.open]], which migrates messages to the latest version.
*/
val version: Long = 0
}
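The changelog above points at [[Envelope.enclose]] and [[Envelope.open]]; a hedged usage sketch (the Either-shaped result of open is inferred from its use elsewhere in this commit, and process is a placeholder):

val bytes = Envelope.enclose(submission) // versioned wrapper around the message

Envelope.open(bytes) match {
  case Left(err)      => sys.error(s"could not open envelope: $err")
  case Right(message) => process(message) // placeholder for the real handler
}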

View File

@ -130,7 +130,7 @@ private[kvutils] case class PackageCommitter(engine: Engine)
*/
private def preload(submissionId: String, archives: Iterable[Archive]): Runnable = { () =>
val ctx = Metrics.preloadTimer.time()
def trace(msg: String): Unit = logger.trace("[submissionId=$submissionId]: " + msg)
def trace(msg: String): Unit = logger.trace(s"[submissionId=$submissionId]: " + msg)
try {
val loadedPackages = engine.compiledPackages().packageIds
val packages: Map[Ref.PackageId, Ast.Package] = Metrics.decodeTimer.time { () =>

View File

@ -9,7 +9,8 @@ import com.codahale.metrics
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlLogEntry,
DamlStateKey,
DamlStateValue
DamlStateValue,
DamlConfigurationEntry
}
import com.daml.ledger.participant.state.kvutils.{Conversions, Err}
import com.daml.ledger.participant.state.v1.Configuration
@ -173,17 +174,23 @@ private[kvutils] object Common {
def getCurrentConfiguration(
defaultConfig: Configuration,
inputState: Map[DamlStateKey, Option[DamlStateValue]],
logger: Logger): Configuration =
logger: Logger): (Option[DamlConfigurationEntry], Configuration) =
inputState
.get(Conversions.configurationStateKey)
.flatten
.getOrElse(
Conversions.configurationStateKey,
/* If we're retrieving configuration, we require it to at least
* have been declared as an input by the submitter as it is used
* to authorize configuration changes. */
throw Err.MissingInputState(Conversions.configurationStateKey)
)
.flatMap { v =>
Conversions
.parseDamlConfiguration(v.getConfiguration)
val entry = v.getConfigurationEntry
Configuration
.decode(entry.getConfiguration)
.fold({ err =>
logger.error(s"Failed to parse configuration: $err, using default configuration.")
None
}, Some(_))
}, conf => Some(Some(entry) -> conf))
}
.getOrElse(defaultConfig)
.getOrElse(None -> defaultConfig)
}

View File

@ -26,7 +26,7 @@ private[kvutils] case class ProcessConfigSubmission(
private implicit val logger =
LoggerFactory.getLogger(
s"ProcessConfigSubmission[entryId=${Pretty.prettyEntryId(entryId)}, submId=${configSubmission.getSubmissionId}]")
private val currentConfig =
private val (currentConfigEntry, currentConfig) =
Common.getCurrentConfiguration(defaultConfig, inputState, logger)
private val newConfig = configSubmission.getConfiguration
@ -41,7 +41,7 @@ private[kvutils] case class ProcessConfigSubmission(
)
}
private def checkTtl(): Commit[Unit] = delay {
private val checkTtl: Commit[Unit] = delay {
// Check the maximum record time against the record time of the commit.
// This mechanism allows the submitter to detect lost submissions and retry
// with a submitter controlled rate.
@ -49,128 +49,134 @@ private[kvutils] case class ProcessConfigSubmission(
if (recordTime > maxRecordTime) {
logger.warn(
s"Rejected configuration submission. The submission timed out ($recordTime > $maxRecordTime)")
rejectTimedOut
reject(
_.setTimedOut(
DamlConfigurationRejectionEntry.TimedOut.newBuilder
.setMaximumRecordTime(configSubmission.getMaximumRecordTime)
))
} else {
pass
}
}
private def authorizeSubmission(): Commit[Unit] = delay {
private val authorizeSubmission: Commit[Unit] = delay {
// Submission is authorized when:
// 1) The authorized participant is unset
// 2) The authorized participant matches the submitting participant.
val authorized =
currentConfig.authorizedParticipantId
.fold(true)(authPid => authPid == participantId)
// the provided participant id matches source participant id
// AND (
// there exists no current configuration
// OR the current configuration's participant matches the submitting participant.
// )
val submittingParticipantId = configSubmission.getParticipantId
val wellFormed = participantId == submittingParticipantId
if (!authorized) {
val authorized =
currentConfigEntry.forall(_.getParticipantId == participantId)
if (!wellFormed) {
logger.warn(
s"Rejected configuration submission. Authorized participant (${currentConfig.authorizedParticipantId}) does not match submitting participant $participantId.")
rejectParticipantNotAuthorized
s"Rejected configuration submission. Submitting participant $submittingParticipantId does not match request participant $participantId")
reject(
_.setParticipantNotAuthorized(
DamlConfigurationRejectionEntry.ParticipantNotAuthorized.newBuilder
.setDetails(
s"Participant $participantId in request is not the submitting participant $submittingParticipantId"
)
))
} else if (!authorized) {
logger.warn(s"Rejected configuration submission. $participantId is not authorized.")
reject(
_.setParticipantNotAuthorized(
DamlConfigurationRejectionEntry.ParticipantNotAuthorized.newBuilder
.setDetails(
"Participant $participantId is not an authorized participant"
)
))
} else {
pass
}
}
private def validateSubmission(): Commit[Unit] =
parseDamlConfiguration(newConfig)
.fold(exc => rejectInvalidConfiguration(exc.getMessage), pure)
private val validateSubmission: Commit[Unit] =
Configuration
.decode(newConfig)
.fold(
err =>
reject(
_.setInvalidConfiguration(
DamlConfigurationRejectionEntry.InvalidConfiguration.newBuilder
.setError(err))),
pure)
.flatMap { config =>
if (config.generation != (1 + currentConfig.generation))
rejectGenerationMismatch(1 + currentConfig.generation)
reject(
_.setGenerationMismatch(DamlConfigurationRejectionEntry.GenerationMismatch.newBuilder
.setExpectedGeneration(1 + currentConfig.generation)))
else
pass
}
private def buildLogEntry(): Commit[Unit] = sequence2(
private def buildLogEntry(): Commit[Unit] = {
val configEntry = DamlConfigurationEntry.newBuilder
.setSubmissionId(configSubmission.getSubmissionId)
.setParticipantId(participantId)
.setConfiguration(configSubmission.getConfiguration)
sequence2(
delay {
Metrics.accepts.inc()
logger.trace(s"New configuration with generation ${newConfig.getGeneration} accepted.")
set(
configurationStateKey ->
DamlStateValue.newBuilder
.setConfiguration(newConfig)
.setConfigurationEntry(configEntry)
.build)
},
done(
DamlLogEntry.newBuilder
.setRecordTime(buildTimestamp(recordTime))
.setConfigurationEntry(DamlConfigurationEntry.newBuilder
.setSubmissionId(configSubmission.getSubmissionId)
.setConfiguration(configSubmission.getConfiguration))
.build)
.setConfigurationEntry(configEntry)
.build
)
)
}
private def rejectGenerationMismatch(expected: Long): Commit[Unit] = {
Metrics.rejections.inc()
done(
DamlLogEntry.newBuilder
.setConfigurationRejectionEntry(
DamlConfigurationRejectionEntry.newBuilder
.setSubmissionId(configSubmission.getSubmissionId)
.setConfiguration(configSubmission.getConfiguration)
.setGenerationMismatch(
private def rejectGenerationMismatch(expected: Long): Commit[Unit] =
reject {
_.setGenerationMismatch(
DamlConfigurationRejectionEntry.GenerationMismatch.newBuilder
.setExpectedGeneration(expected)
)
)
.build
)
}
private def rejectInvalidConfiguration(error: String): Commit[Configuration] = {
Metrics.rejections.inc()
done(
DamlLogEntry.newBuilder
.setConfigurationRejectionEntry(
DamlConfigurationRejectionEntry.newBuilder
.setSubmissionId(configSubmission.getSubmissionId)
.setConfiguration(configSubmission.getConfiguration)
.setInvalidConfiguration(
DamlConfigurationRejectionEntry.InvalidConfiguration.newBuilder
.setError(error)
)
)
.build
)
}
private def rejectParticipantNotAuthorized[A]: Commit[A] = {
Metrics.rejections.inc()
done(
DamlLogEntry.newBuilder
.setConfigurationRejectionEntry(
DamlConfigurationRejectionEntry.newBuilder
.setSubmissionId(configSubmission.getSubmissionId)
.setConfiguration(configSubmission.getConfiguration)
.setParticipantNotAuthorized(
DamlConfigurationRejectionEntry.ParticipantNotAuthorized.newBuilder
.setDetails(
s"Participant $participantId is not the authorized participant " +
currentConfig.authorizedParticipantId.getOrElse("<missing>")
)
)
)
.build
)
}
private def rejectTimedOut[A]: Commit[A] = {
Metrics.rejections.inc()
done(
DamlLogEntry.newBuilder
.setConfigurationRejectionEntry(
DamlConfigurationRejectionEntry.newBuilder
.setSubmissionId(configSubmission.getSubmissionId)
.setConfiguration(configSubmission.getConfiguration)
.setTimedOut(
private def rejectTimedOut[A]: Commit[A] =
reject {
_.setTimedOut(
DamlConfigurationRejectionEntry.TimedOut.newBuilder
.setMaximumRecordTime(configSubmission.getMaximumRecordTime)
)
}
private def reject[A](
addReason: DamlConfigurationRejectionEntry.Builder => DamlConfigurationRejectionEntry.Builder)
: Commit[A] = {
Metrics.rejections.inc()
done(
DamlLogEntry.newBuilder
.setConfigurationRejectionEntry(
addReason(
DamlConfigurationRejectionEntry.newBuilder
.setSubmissionId(configSubmission.getSubmissionId)
.setParticipantId(participantId)
.setConfiguration(configSubmission.getConfiguration)
)
)
.build
)
}
}
private[kvutils] object ProcessConfigSubmission {

View File

@ -5,7 +5,6 @@ package com.daml.ledger.participant.state.kvutils.committing
import com.codahale.metrics
import com.codahale.metrics.{Counter, Timer}
import com.daml.ledger.participant.state.backport.TimeModelChecker
import com.daml.ledger.participant.state.kvutils.Conversions.{buildTimestamp, commandDedupKey, _}
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.{Conversions, Err, InputsAndEffects, Pretty}
@ -55,7 +54,7 @@ private[kvutils] case class ProcessTransactionSubmission(
// -------------------------------------------------------------------------------
private val config: Configuration =
private val (_, config) =
Common.getCurrentConfiguration(defaultConfig, inputState, logger)
private val txLet = parseTimestamp(txEntry.getLedgerEffectiveTime)
@ -120,19 +119,16 @@ private[kvutils] case class ProcessTransactionSubmission(
RejectionReason.SubmitterCannotActViaParticipant(
s"Party '$submitter' not hosted by participant $participantId"))
case None =>
if (config.openWorld)
pass
else
reject(RejectionReason.PartyNotKnownOnLedger)
}
/** Validate ledger effective time and the command's time-to-live. */
private def validateLetAndTtl: Commit[Unit] = delay {
val timeModelChecker = TimeModelChecker(config.timeModel)
val timeModel = config.timeModel
val givenLET = txLet.toInstant
val givenMRT = parseTimestamp(txEntry.getSubmitterInfo.getMaximumRecordTime).toInstant
if (timeModelChecker.checkLet(
if (timeModel.checkLet(
currentTime = recordTime.toInstant,
givenLedgerEffectiveTime = givenLET,
givenMaximumRecordTime = givenMRT)

View File

@ -8,7 +8,6 @@ import java.time.Duration
import java.util.UUID
import akka.stream.scaladsl.Sink
import com.daml.ledger.participant.state.backport.TimeModel
import com.daml.ledger.participant.state.kvutils.InMemoryKVParticipantStateIT._
import com.daml.ledger.participant.state.v1.Update.{PartyAddedToParticipant, PublicPackageUploaded}
import com.daml.ledger.participant.state.v1._
@ -23,6 +22,7 @@ import com.digitalasset.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import org.scalatest.Assertions._
import org.scalatest.{Assertion, AsyncWordSpec, BeforeAndAfterEach}
import scala.concurrent.Future
import scala.compat.java8.FutureConverters._
import scala.util.Try
@ -45,10 +45,14 @@ class InMemoryKVParticipantStateIT
super.afterEach()
}
"In-memory implementation" should {
private def allocateParty(hint: String): Future[Party] = {
ps.allocateParty(Some(hint), None).toScala.flatMap {
case PartyAllocationResult.Ok(details) => Future.successful(details.party)
case err => Future.failed(new RuntimeException(s"failed to allocate party: $err"))
}
}
// FIXME(JM): Setup fixture for the participant-state
// creation & teardown!
"In-memory implementation" should {
"return initial conditions" in {
for {
@ -204,51 +208,59 @@ class InMemoryKVParticipantStateIT
"provide update after transaction submission" in {
val rt = ps.getNewRecordTime()
val updateResult = ps.stateUpdates(beginAfter = None).runWith(Sink.head)
ps.submitTransaction(submitterInfo(rt), transactionMeta(rt), emptyTransaction)
updateResult.map {
case (offset, _) =>
assert(offset == Offset(Array(0L, 0L)))
for {
alice <- allocateParty("alice")
_ <- ps
.submitTransaction(submitterInfo(rt, alice), transactionMeta(rt), emptyTransaction)
.toScala
update <- ps.stateUpdates(beginAfter = None).drop(1).runWith(Sink.head)
} yield {
assert(update._1 == Offset(Array(1L, 0L)))
}
}
"reject duplicate commands" in {
val rt = ps.getNewRecordTime()
val updatesResult = ps.stateUpdates(beginAfter = None).take(2).runWith(Sink.seq)
for {
alice <- allocateParty("alice")
_ <- ps
.submitTransaction(submitterInfo(rt, alice), transactionMeta(rt), emptyTransaction)
.toScala
_ <- ps
.submitTransaction(submitterInfo(rt, alice), transactionMeta(rt), emptyTransaction)
.toScala
updates <- ps.stateUpdates(beginAfter = None).take(3).runWith(Sink.seq)
} yield {
val (offset0, update0) = updates(0)
assert(offset0 == Offset(Array(0L, 0L)))
assert(update0.isInstanceOf[Update.PartyAddedToParticipant])
ps.submitTransaction(submitterInfo(rt), transactionMeta(rt), emptyTransaction)
ps.submitTransaction(submitterInfo(rt), transactionMeta(rt), emptyTransaction)
updatesResult.map { updates =>
val (offset1, update1) = updates.head
val (offset2, update2) = updates(1)
assert(offset1 == Offset(Array(0L, 0L)))
val (offset1, update1) = updates(1)
assert(offset1 == Offset(Array(1L, 0L)))
assert(update1.isInstanceOf[Update.TransactionAccepted])
assert(offset2 == Offset(Array(1L, 0L)))
assert(update2.isInstanceOf[Update.CommandRejected])
assert(
update2
.asInstanceOf[Update.CommandRejected]
.reason == RejectionReason.DuplicateCommand)
val (offset2, update2) = updates(2)
assert(offset2 == Offset(Array(2L, 0L)))
}
}
"return second update with beginAfter=0" in {
val rt = ps.getNewRecordTime()
val updateResult =
ps.stateUpdates(beginAfter = Some(Offset(Array(0L, 0L)))).runWith(Sink.head)
ps.submitTransaction(submitterInfo(rt), transactionMeta(rt), emptyTransaction)
ps.submitTransaction(submitterInfo(rt), transactionMeta(rt), emptyTransaction)
updateResult.map {
case (offset, update) =>
assert(offset == Offset(Array(1L, 0L)))
for {
alice <- allocateParty("alice") // offset now at [1,0]
_ <- ps
.submitTransaction(submitterInfo(rt, alice), transactionMeta(rt), emptyTransaction)
.toScala
_ <- ps
.submitTransaction(submitterInfo(rt, alice), transactionMeta(rt), emptyTransaction)
.toScala
offsetAndUpdate <- ps
.stateUpdates(beginAfter = Some(Offset(Array(1L, 0L))))
.runWith(Sink.head)
} yield {
val (offset, update) = offsetAndUpdate
assert(offset == Offset(Array(2L, 0L)))
assert(update.isInstanceOf[Update.CommandRejected])
}
}
@ -266,47 +278,11 @@ class InMemoryKVParticipantStateIT
}
}
"correctly implements open world tx submission authorization" in {
val rt = ps.getNewRecordTime()
val updatesResult = ps.stateUpdates(beginAfter = None).take(3).runWith(Sink.seq)
for {
// Submit without allocation in open world setting, expecting this to succeed.
_ <- ps.submitTransaction(submitterInfo(rt), transactionMeta(rt), emptyTransaction).toScala
// Allocate a party and try the submission again with an allocated party.
allocResult <- ps
.allocateParty(
None /* no name hint, implementation decides party name */,
Some("Somebody"))
.toScala
_ <- assert(allocResult.isInstanceOf[PartyAllocationResult.Ok])
_ <- ps
.submitTransaction(
submitterInfo(
rt,
party = allocResult.asInstanceOf[PartyAllocationResult.Ok].result.party),
transactionMeta(rt),
emptyTransaction)
.toScala
Seq((offset1, update1), (offset2, update2), (offset3, update3)) <- updatesResult
} yield {
assert(offset1 == Offset(Array(0, 0)))
assert(update1.isInstanceOf[Update.TransactionAccepted])
assert(offset2 == Offset(Array(1, 0)))
assert(update2.isInstanceOf[Update.PartyAddedToParticipant])
assert(offset3 == Offset(Array(2, 0)))
assert(update3.isInstanceOf[Update.TransactionAccepted])
}
}
"correctly implements closed world tx submission authorization" in {
"correctly implements tx submission authorization" in {
val rt = ps.getNewRecordTime()
val updatesResult = ps.stateUpdates(beginAfter = None).take(4).runWith(Sink.seq)
val unallocatedParty = Ref.Party.assertFromString("nobody")
for {
lic <- ps.getLedgerInitialConditions().runWith(Sink.head)
@ -317,13 +293,17 @@ class InMemoryKVParticipantStateIT
submissionId = "test1",
config = lic.config.copy(
generation = lic.config.generation + 1,
openWorld = false,
)
)
.toScala
// Submit without allocation in closed world setting.
_ <- ps.submitTransaction(submitterInfo(rt), transactionMeta(rt), emptyTransaction).toScala
// Submit without allocation
_ <- ps
.submitTransaction(
submitterInfo(rt, unallocatedParty),
transactionMeta(rt),
emptyTransaction)
.toScala
// Allocate a party and try the submission again with an allocated party.
allocResult <- ps
@ -340,23 +320,24 @@ class InMemoryKVParticipantStateIT
transactionMeta(rt),
emptyTransaction)
.toScala
updates <- updatesResult
Seq((offset1, update1), (offset2, update2), (offset3, update3), (offset4, update4)) <- ps
.stateUpdates(beginAfter = None)
.take(4)
.runWith(Sink.seq)
} yield {
def takeUpdate(n: Int) = {
val (offset, update) = updates(n)
assert(offset == Offset(Array(n.toLong, 0L)))
update
}
assert(takeUpdate(0).isInstanceOf[Update.ConfigurationChanged])
assert(update1.isInstanceOf[Update.ConfigurationChanged])
assert(offset1 == Offset(Array(0L, 0L)))
assert(
takeUpdate(1)
update2
.asInstanceOf[Update.CommandRejected]
.reason == RejectionReason.PartyNotKnownOnLedger)
assert(takeUpdate(2).isInstanceOf[Update.PartyAddedToParticipant])
assert(takeUpdate(3).isInstanceOf[Update.TransactionAccepted])
assert(offset2 == Offset(Array(1L, 0L)))
assert(update3.isInstanceOf[Update.PartyAddedToParticipant])
assert(offset3 == Offset(Array(2L, 0L)))
assert(update4.isInstanceOf[Update.TransactionAccepted])
assert(offset4 == Offset(Array(3L, 0L)))
}
}
@ -366,14 +347,13 @@ class InMemoryKVParticipantStateIT
for {
lic <- ps.getLedgerInitialConditions().runWith(Sink.head)
// Submit a configuration change that flips the "open world" flag.
// Submit an initial configuration change
_ <- ps
.submitConfiguration(
maxRecordTime = rt.addMicros(1000000),
submissionId = "test1",
config = lic.config.copy(
generation = lic.config.generation + 1,
openWorld = !lic.config.openWorld
))
.toScala
@ -417,8 +397,8 @@ object InMemoryKVParticipantStateIT {
private val archives =
darReader.readArchiveFromFile(new File(rlocation("ledger/test-common/Test-stable.dar"))).get.all
private def submitterInfo(rt: Timestamp, party: String = "Alice") = SubmitterInfo(
submitter = Ref.Party.assertFromString(party),
private def submitterInfo(rt: Timestamp, party: Ref.Party) = SubmitterInfo(
submitter = party,
applicationId = Ref.LedgerString.assertFromString("tests"),
commandId = Ref.LedgerString.assertFromString("X"),
maxRecordTime = rt.addMicros(Duration.ofSeconds(10).toNanos / 1000)

View File

@ -14,6 +14,8 @@ import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.daml.lf.engine.Engine
import com.digitalasset.daml_lf_dev.DamlLf
import scalaz.State
import scalaz.syntax.traverse._
import scalaz.std.list._
import scala.collection.JavaConverters._
@ -44,9 +46,10 @@ object KVTest {
def runTest[A](test: KVTest[A]): A =
test.eval(initialTestState)
def runTestWithSimplePackage[A](test: KVTest[A]): A =
def runTestWithSimplePackage[A](parties: Party*)(test: KVTest[A]): A =
(for {
_ <- uploadSimpleArchive
_ <- parties.toList.map(p => allocateParty(p, p)).sequenceU
r <- test
} yield r).eval(initialTestState)
@ -83,7 +86,8 @@ object KVTest {
getDamlState(Conversions.configurationStateKey)
.flatMap {
case None => getDefaultConfiguration
case Some(v) => State.state(Conversions.parseDamlConfiguration(v.getConfiguration).get)
case Some(v) =>
State.state(Configuration.decode(v.getConfigurationEntry.getConfiguration).right.get)
}
def setRecordTime(rt: Timestamp): KVTest[Unit] =
@ -196,7 +200,7 @@ object KVTest {
def submitConfig(
configModify: Configuration => Configuration,
submissionId: String = "",
submissionId: String = randomString,
mrtDelta: Duration = minMRTDelta
): KVTest[DamlLogEntry] =
for {
@ -206,6 +210,7 @@ object KVTest {
KeyValueSubmission.configurationToSubmission(
maxRecordTime = testState.recordTime.addMicros(mrtDelta.toNanos / 1000),
submissionId = submissionId,
participantId = testState.participantId,
config = configModify(oldConf)
)
)

View File

@ -6,10 +6,10 @@ package com.daml.ledger.participant.state.kvutils
import java.time.Duration
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.v1.Configuration
import com.digitalasset.daml.lf.data.Ref
import org.scalatest.{Matchers, WordSpec}
import scala.util.Success
class KVUtilsConfigSpec extends WordSpec with Matchers {
import KVTest._
import TestHelpers._
@ -22,27 +22,27 @@ class KVUtilsConfigSpec extends WordSpec with Matchers {
KeyValueSubmission.configurationToSubmission(
maxRecordTime = theRecordTime,
submissionId = "foobar",
participantId = Ref.LedgerString.assertFromString("participant"),
config = theDefaultConfig
)))
val configSubm = subm.getConfigurationSubmission
Conversions.parseTimestamp(configSubm.getMaximumRecordTime) shouldEqual theRecordTime
configSubm.getSubmissionId shouldEqual "foobar"
Conversions.parseDamlConfiguration(configSubm.getConfiguration) shouldEqual Success(
theDefaultConfig)
Configuration.decode(configSubm.getConfiguration) shouldEqual Right(theDefaultConfig)
}
"check generation" in KVTest.runTest {
for {
logEntry <- submitConfig(
configModify = c => c.copy(generation = c.generation + 1, openWorld = false),
configModify = c => c.copy(generation = c.generation + 1),
submissionId = "submission0"
)
newConfig <- getConfiguration
// Change again, but without bumping generation.
logEntry2 <- submitConfig(
configModify = c => c.copy(generation = c.generation, openWorld = true),
configModify = c => c.copy(generation = c.generation),
submissionId = "submission1"
)
newConfig2 <- getConfiguration
@ -51,7 +51,6 @@ class KVUtilsConfigSpec extends WordSpec with Matchers {
logEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.CONFIGURATION_ENTRY
logEntry.getConfigurationEntry.getSubmissionId shouldEqual "submission0"
newConfig.generation shouldEqual 1
newConfig.openWorld shouldEqual false
logEntry2.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.CONFIGURATION_REJECTION_ENTRY
logEntry2.getConfigurationRejectionEntry.getSubmissionId shouldEqual "submission1"
@ -79,8 +78,7 @@ class KVUtilsConfigSpec extends WordSpec with Matchers {
// Set an initial configuration; the submitting participant becomes the authorized one
logEntry0 <- submitConfig { c =>
c.copy(
generation = c.generation + 1,
authorizedParticipantId = Some(p0)
generation = c.generation + 1
)
}
@ -93,7 +91,6 @@ class KVUtilsConfigSpec extends WordSpec with Matchers {
c =>
c.copy(
generation = c.generation + 1,
openWorld = false
)
)
}
@ -107,7 +104,6 @@ class KVUtilsConfigSpec extends WordSpec with Matchers {
c =>
c.copy(
generation = c.generation + 1,
openWorld = false
))
}

View File

@ -45,7 +45,7 @@ class KVUtilsTransactionSpec extends WordSpec with Matchers {
val p0 = mkParticipantId(0)
val p1 = mkParticipantId(1)
"be able to submit transaction" in KVTest.runTestWithSimplePackage(
"be able to submit transaction" in KVTest.runTestWithSimplePackage(alice)(
for {
tx <- runCommand(alice, simpleCreateCmd)
logEntry <- submitTransaction(submitter = alice, tx = tx).map(_._2)
@ -67,7 +67,7 @@ class KVUtilsTransactionSpec extends WordSpec with Matchers {
)
*/
"reject transaction with out of bounds LET" in KVTest.runTestWithSimplePackage(
"reject transaction with out of bounds LET" in KVTest.runTestWithSimplePackage(alice)(
for {
tx <- runCommand(alice, simpleCreateCmd)
conf <- getDefaultConfiguration
@ -80,7 +80,7 @@ class KVUtilsTransactionSpec extends WordSpec with Matchers {
}
)
"be able to exercise and rejects double spends" in KVTest.runTestWithSimplePackage {
"be able to exercise and rejects double spends" in KVTest.runTestWithSimplePackage(alice) {
for {
createTx <- runCommand(alice, simpleCreateCmd)
result <- submitTransaction(submitter = alice, tx = createTx)
@ -108,10 +108,10 @@ class KVUtilsTransactionSpec extends WordSpec with Matchers {
}
}
"reject transactions for unallocated parties" in KVTest.runTestWithSimplePackage {
"reject transactions for unallocated parties" in KVTest.runTestWithSimplePackage() {
for {
configEntry <- submitConfig { c =>
c.copy(generation = c.generation + 1, openWorld = false)
c.copy(generation = c.generation + 1)
}
createTx <- runCommand(alice, simpleCreateCmd)
txEntry <- submitTransaction(submitter = alice, tx = createTx).map(_._2)
@ -122,10 +122,10 @@ class KVUtilsTransactionSpec extends WordSpec with Matchers {
}
}
"reject transactions for unhosted parties" in KVTest.runTestWithSimplePackage {
"reject transactions for unhosted parties" in KVTest.runTestWithSimplePackage() {
for {
configEntry <- submitConfig { c =>
c.copy(generation = c.generation + 1, openWorld = false)
c.copy(generation = c.generation + 1)
}
createTx <- runCommand(alice, simpleCreateCmd)
@ -143,7 +143,8 @@ class KVUtilsTransactionSpec extends WordSpec with Matchers {
}
}
"reject unauthorized transactions " in KVTest.runTestWithSimplePackage {
"reject unauthorized transactions" in KVTest.runTestWithSimplePackage() {
for {
// Submit a creation of a contract with owner 'Alice', but submit it as 'Bob'.
createTx <- runCommand(alice, simpleCreateCmd)
@ -164,7 +165,7 @@ class KVUtilsTransactionSpec extends WordSpec with Matchers {
}
}
"transient contracts and keys are properly archived" in KVTest.runTestWithSimplePackage {
"transient contracts and keys are properly archived" in KVTest.runTestWithSimplePackage(alice) {
for {
tx1 <- runCommand(alice, simpleCreateAndExerciseCmd)
createAndExerciseTx1 <- submitTransaction(alice, tx1).map(_._2)

View File

@ -5,9 +5,8 @@ package com.daml.ledger.participant.state.kvutils
import java.util.UUID
import com.daml.ledger.participant.state.backport.TimeModel
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId}
import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId, TimeModel}
import com.digitalasset.daml.lf.archive.Decode
import com.digitalasset.daml.lf.archive.testing.Encode
import com.digitalasset.daml.lf.data.Time.Timestamp
@ -72,9 +71,7 @@ object TestHelpers {
val theRecordTime: Timestamp = Timestamp.Epoch
val theDefaultConfig = Configuration(
generation = 0,
timeModel = TimeModel.reasonableDefault,
authorizedParticipantId = None,
openWorld = true,
timeModel = TimeModel.reasonableDefault
)
def mkEntryId(n: Int): DamlLogEntryId = {

View File

@ -7,9 +7,8 @@ import java.io.{DataInputStream, FileInputStream}
import java.util.concurrent.TimeUnit
import com.codahale.metrics
import com.daml.ledger.participant.state.backport.TimeModel
import com.daml.ledger.participant.state.kvutils.{DamlKvutils => Proto, _}
import com.daml.ledger.participant.state.v1.{Configuration, Update}
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.engine.Engine
@ -51,9 +50,7 @@ object IntegrityCheck extends App {
val engine = Engine()
val defaultConfig = Configuration(
generation = 0,
timeModel = TimeModel.reasonableDefault,
authorizedParticipantId = None,
openWorld = true
timeModel = TimeModel.reasonableDefault
)
var state = Map.empty[Proto.DamlStateKey, Proto.DamlStateValue]

View File

@ -0,0 +1,21 @@
# Copyright (c) 2019 The DAML Authors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
load("//bazel_tools:java.bzl", "da_java_proto_library")
proto_library(
name = "ledger_configuration_proto",
srcs = ["com/daml/ledger/participant/state/ledger_configuration.proto"],
strip_import_prefix = "/ledger/participant-state/protobuf",
visibility = ["//ledger/participant-state:__subpackages__"],
deps = [
"@com_google_protobuf//:duration_proto",
],
)
da_java_proto_library(
name = "ledger_configuration_java_proto",
tags = ["maven_coordinates=com.daml.ledger:participant-state-ledger-configuration-java-proto:__VERSION__"],
visibility = ["//ledger:__subpackages__"],
deps = [":ledger_configuration_proto"],
)

View File

@ -0,0 +1,6 @@
participant-state/protobuf
--------------------------
Common protocol buffer definitions used across different implementations.

View File

@ -0,0 +1,45 @@
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// The DAML Ledger configuration. Please refer to the spec
// (ledger/participant-state/protobuf/ledger_configuration.rst)
// for detailed information on versions and semantics.
//
// version summary:
// * 1: initial version
//
syntax = "proto3";
package com.daml.ledger.participant.state;
option java_package = "com.daml.ledger.participant.state.protobuf";
option java_multiple_files = true;
option csharp_namespace = "Com.Daml.Ledger.Participant.State.Protobuf";
import "google/protobuf/duration.proto";
message LedgerConfiguration {
// The version of the configuration message. Defines the semantics
// of how it is decoded and interpreted.
int64 version = 1;
// The configuration generation. When submitting a configuration, the new
// generation must be one larger than that of the previous configuration.
// This safeguards against configuration changes that are based on stale data.
int64 generation = 2;
// The ledger time model, specifying the bounds for
// ledger effective time and maximum record time of transactions.
LedgerTimeModel time_model = 3;
}
message LedgerTimeModel {
// The expected minimum latency of a transaction.
google.protobuf.Duration min_transaction_latency = 1;
// The maximum allowed clock skew between the ledger and clients.
google.protobuf.Duration max_clock_skew = 2;
// The maximum allowed time to live for a transaction.
// Must be greater than the derived minimum time to live.
google.protobuf.Duration max_ttl = 3;
}

View File

@ -0,0 +1,86 @@
.. Copyright (c) 2019 The DAML Authors. All rights reserved.
.. SPDX-License-Identifier: Apache-2.0
DAML Ledger Configuration Specification
=======================================
**version 1, 25th November 2019**
This specification, along with ``ledger_configuration.proto``
defines the data representation and the semantics of ledger
configuration, including the time model.
We follow the same rules as ``transaction.proto`` with regard to the
assignment of version numbers and the maintenance of backwards compatibility.
Please read ``daml-lf/spec/transaction.rst`` to understand the rules
we follow for evolving the ledger configuration.
The canonical, specification-compliant implementation for encoding and
decoding ledger configurations is part of the participant-state package
in ``ledger/participant-state``.
Version history
^^^^^^^^^^^^^^^
Please refer to ``ledger_configuration.proto`` for version history.
message LedgerConfiguration
^^^^^^^^^^^^^^^^^^^^^^^^^^^
*since version 1*
An instance of the ledger configuration.
As of version 1, these fields are included:
* ``int64`` version
* ``int64`` generation
* ``LedgerTimeModel`` time_model
``version`` is required, and should be set to the latest version as
specified by this document. Consumers should reject configurations
with an unknown version.
``generation`` is required and must be set to a number one larger than the
generation of the current configuration in order for the change to be
accepted. This safeguards against configuration changes based on
stale data.
``time_model`` is required.
message LedgerTimeModel
^^^^^^^^^^^^^^^^^^^^^^^
*since version 1*
Defines the ledger time model, which governs the rules for acceptable
ledger effective time and maximum record time parameters that are part
of transaction submission.
As of version 1, these fields are included:
* ``Duration`` min_transaction_latency
* ``Duration`` max_clock_skew
* ``Duration`` max_ttl
``min_transaction_latency`` is required. It defines the minimum expected
latency for a transaction to be committed.
``max_clock_skew`` is required. It defines the maximum allowed clock skew
between the ledger and clients.
``max_ttl`` is required. It defines the maximum allowed time to live for a
transaction.
These three parameters are used in the following way.
Given the record time ``RT``, a transaction submission with a ledger effective time ``LET``
and maximum record time ``MRT`` is accepted if:
* ``MRT - LET >= min_transaction_latency + max_clock_skew``
* ``MRT - LET <= max_ttl``
* ``LET - max_clock_skew <= RT <= MRT``
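A sketch of the same acceptance rules as a Scala predicate (names are illustrative and not part of the specification):

import java.time.{Duration, Instant}

// Returns true iff a submission with ledger effective time `let` and maximum
// record time `mrt`, sequenced at record time `rt`, satisfies the three rules.
def accepts(
    minTransactionLatency: Duration,
    maxClockSkew: Duration,
    maxTtl: Duration)(rt: Instant, let: Instant, mrt: Instant): Boolean = {
  val ttl = Duration.between(let, mrt) // MRT - LET
  val minTtl = minTransactionLatency.plus(maxClockSkew)
  ttl.compareTo(minTtl) >= 0 &&
  ttl.compareTo(maxTtl) <= 0 &&
  !rt.isBefore(let.minus(maxClockSkew)) && !rt.isAfter(mrt)
}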

View File

@ -3,6 +3,10 @@
package com.daml.ledger.participant.state.v1
import java.time.Duration
import scala.util.Try
/** Ledger configuration describing the ledger's time model.
* Emitted in [[com.daml.ledger.participant.state.v1.Update.ConfigurationChanged]].
*/
@ -11,9 +15,71 @@ final case class Configuration(
generation: Long,
/** The time model of the ledger. Specifying the time-to-live bounds for Ledger API commands. */
timeModel: TimeModel,
/** The identity of the participant allowed to change the configuration. If not set, any participant
* can change the configuration. */
authorizedParticipantId: Option[ParticipantId],
/** Flag to enable "open world" mode in which submissions from unallocated parties are allowed through. Useful in testing. */
openWorld: Boolean
)
object Configuration {
import com.daml.ledger.participant.state.protobuf
val protobufVersion: Long = 1L
def decode(bytes: Array[Byte]): Either[String, Configuration] =
Try(protobuf.LedgerConfiguration.parseFrom(bytes)).toEither.left
.map(_.getMessage)
.right
.flatMap(decode)
def decode(config: protobuf.LedgerConfiguration): Either[String, Configuration] =
config.getVersion match {
case 1 => DecodeV1.decode(config)
case v => Left(s"Unknown version: $v")
}
private object DecodeV1 {
def decode(config: protobuf.LedgerConfiguration): Either[String, Configuration] =
for {
tm <- if (config.hasTimeModel)
decodeTimeModel(config.getTimeModel)
else
Left("Missing time model")
} yield {
Configuration(
generation = config.getGeneration,
timeModel = tm,
)
}
def decodeTimeModel(tm: protobuf.LedgerTimeModel): Either[String, TimeModel] =
TimeModel(
maxClockSkew = parseDuration(tm.getMaxClockSkew),
minTransactionLatency = parseDuration(tm.getMinTransactionLatency),
maxTtl = parseDuration(tm.getMaxTtl)
).toEither.left.map(e => s"decodeTimeModel: ${e.getMessage}")
}
def encode(config: Configuration): protobuf.LedgerConfiguration = {
val tm = config.timeModel
protobuf.LedgerConfiguration.newBuilder
.setVersion(protobufVersion)
.setGeneration(config.generation)
.setTimeModel(
protobuf.LedgerTimeModel.newBuilder
.setMaxClockSkew(buildDuration(tm.maxClockSkew))
.setMinTransactionLatency(buildDuration(tm.minTransactionLatency))
.setMaxTtl(buildDuration(tm.maxTtl))
)
.build
}
private def parseDuration(dur: com.google.protobuf.Duration): Duration = {
Duration.ofSeconds(dur.getSeconds, dur.getNanos.toLong)
}
private def buildDuration(dur: Duration): com.google.protobuf.Duration = {
com.google.protobuf.Duration.newBuilder
.setSeconds(dur.getSeconds)
.setNanos(dur.getNano)
.build
}
}
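For reference, a round-trip through the new canonical serialization (a sketch; assumes the classes generated from ``ledger_configuration.proto`` are on the classpath):

val config = Configuration(
  generation = 1,
  timeModel = TimeModel.reasonableDefault
)

val bytes: Array[Byte] = Configuration.encode(config).toByteArray
// decode is total: parse failures, unknown versions and missing fields all surface as Left.
assert(Configuration.decode(bytes) == Right(config))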

View File

@ -4,44 +4,70 @@
package com.daml.ledger.participant.state.v1
import java.time.{Duration, Instant}
import scala.util.Try
trait TimeModel {
def minTransactionLatency: Duration
def futureAcceptanceWindow: Duration
def maxClockSkew: Duration
def minTtl: Duration
def maxTtl: Duration
}
trait TimeModelChecker {
/**
* The ledger time model and associated validations. Some values are given by constructor args; others are derived.
*
* @param minTransactionLatency The expected minimum latency of a transaction.
* @param maxClockSkew The maximum allowed clock skew between the ledger and clients.
* @param maxTtl The maximum allowed time to live for a transaction.
* Must be greater than the derived minimum time to live.
* @throws IllegalArgumentException if the parameters aren't valid
*/
case class TimeModel private (
minTransactionLatency: Duration,
maxClockSkew: Duration,
maxTtl: Duration) {
/**
* Validates that the given ledger effective time is within an acceptable time window of the current system time.
*
* @param currentTime the current time
* @param givenLedgerEffectiveTime The ledger effective time to validate.
* @param givenMaximumRecordTime The maximum record time to validate.
* @return true if successful
* The minimum time to live for a transaction. Equal to the minimum transaction latency plus the maximum clock skew.
*/
val minTtl: Duration = minTransactionLatency.plus(maxClockSkew)
/**
* The maximum window after the current time when transaction ledger effective times will be accepted.
* Currently equal to the max clock skew.
* <p/>
* The corresponding past acceptance window is given by the command's TTL, and thus bounded inclusive by [[maxTtl]].
*/
val futureAcceptanceWindow: Duration = maxClockSkew
def checkTtl(givenLedgerEffectiveTime: Instant, givenMaximumRecordTime: Instant): Boolean = {
val givenTtl = Duration.between(givenLedgerEffectiveTime, givenMaximumRecordTime)
!givenTtl.minus(minTtl).isNegative && !maxTtl.minus(givenTtl).isNegative
}
def checkLet(
currentTime: Instant,
givenLedgerEffectiveTime: Instant,
givenMaximumRecordTime: Instant): Boolean
givenMaximumRecordTime: Instant): Boolean = {
// Note that, contrary to the documented spec, the record time of a transaction is when it's sequenced.
// It turns out this isn't a problem for the participant or the sandbox,
// and MRT seems to be going away in Sirius anyway, so I've left it as is.
val lowerBound = givenLedgerEffectiveTime.minus(futureAcceptanceWindow)
!currentTime.isBefore(lowerBound) && !currentTime.isAfter(givenMaximumRecordTime)
}
}
object TimeModel {
/**
* Validates that the ttl of the given times is within bounds.
* The ttl of a command is defined as the duration between
* the ledger effective time and maximum record time.
*
* @param givenLedgerEffectiveTime The given ledger effective time.
* @param givenMaximumRecordTime The given maximum record time.
* @return true if successful
* A default TimeModel that's reasonable for a test or sandbox ledger application.
* Serious applications (viz. ledger) should probably specify their own TimeModel.
*/
def checkTtl(givenLedgerEffectiveTime: Instant, givenMaximumRecordTime: Instant): Boolean
val reasonableDefault: TimeModel =
TimeModel(Duration.ofSeconds(1L), Duration.ofSeconds(1L), Duration.ofSeconds(30L)).get
def apply(
minTransactionLatency: Duration,
maxClockSkew: Duration,
maxTtl: Duration): Try[TimeModel] = Try {
require(!minTransactionLatency.isNegative, "Negative min transaction latency")
require(!maxTtl.isNegative, "Negative max TTL")
require(!maxClockSkew.isNegative, "Negative max clock skew")
require(!maxTtl.minus(maxClockSkew).isNegative, "Max TTL must not be smaller than max clock skew")
new TimeModel(minTransactionLatency, maxClockSkew, maxTtl)
}
}
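Since validation now lives in the smart constructor, invalid parameters fail at construction time rather than at check time. A usage sketch:

import java.time.Duration
import scala.util.{Failure, Success}

TimeModel(
  minTransactionLatency = Duration.ofSeconds(1L),
  maxClockSkew = Duration.ofSeconds(1L),
  maxTtl = Duration.ofSeconds(30L)
) match {
  case Success(tm) =>
    // minTtl is derived: 1s latency + 1s skew.
    assert(tm.minTtl == Duration.ofSeconds(2L))
  case Failure(e) => sys.error(s"Invalid time model: ${e.getMessage}")
}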

View File

@ -20,6 +20,9 @@ sealed trait Update extends Product with Serializable {
/** Short human-readable one-line description summarizing the state updates content. */
def description: String
/** The record time at which the state change was committed. */
def recordTime: Timestamp
}
object Update {
@ -30,18 +33,27 @@ object Update {
}
/** Signal that the current [[Configuration]] has changed. */
final case class ConfigurationChanged(submissionId: String, newConfiguration: Configuration)
final case class ConfigurationChanged(
recordTime: Timestamp,
submissionId: String,
participantId: ParticipantId,
newConfiguration: Configuration)
extends Update {
override def description: String =
s"Configuration changed to: $newConfiguration"
s"Configuration change '$submissionId' from participant '$participantId' accepted with configuration: $newConfiguration"
}
/** Signal that a configuration change submitted by this participant was rejected.
*/
final case class ConfigurationChangeRejected(submissionId: String, reason: String)
final case class ConfigurationChangeRejected(
recordTime: Timestamp,
submissionId: String,
participantId: ParticipantId,
proposedConfiguration: Configuration,
rejectionReason: String)
extends Update {
override def description: String = {
s"Configuration change '$submissionId' was rejected: $reason"
s"Configuration change '$submissionId' from participant '$participantId' was rejected: $rejectionReason"
}
}
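A consumer of the update stream can track the active configuration directly from these events (a sketch; error handling elided):

def updateConfiguration(
    current: Option[Configuration],
    update: Update): Option[Configuration] =
  update match {
    case Update.ConfigurationChanged(_, _, _, newConfiguration) =>
      Some(newConfiguration)
    case _: Update.ConfigurationChangeRejected =>
      // Rejections carry the proposed configuration for auditing but leave the state unchanged.
      current
    case _ => current
  }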
@ -155,6 +167,7 @@ object Update {
* rejected.
*/
final case class CommandRejected(
recordTime: Timestamp,
submitterInfo: SubmitterInfo,
reason: RejectionReason,
) extends Update {

View File

@ -7,7 +7,13 @@ package com.daml.ledger.participant.state.v1
* and version constants (currently none).
*
* Changes:
* [since 100.13.21]:
* [after 100.13.37]:
* - Moved configuration serialization from kvutils to participant-state. This is used both by
* kvutils and the index to encode and decode configurations.
* - Authorized participant identifier and "open-world" flag removed from configuration.
* - Record time added to all [[Update]]s.
*
* [after 100.13.21]:
* - Rename referencedContracts to divulgedContracts in [[Update.TransactionAccepted]].
*/
object Version {}

View File

@ -1,36 +1,33 @@
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.server.services.time
package com.daml.ledger.participant.state.v1
import java.time._
import com.digitalasset.platform.services.time.{TimeModel, TimeModelChecker}
import org.scalatest.{Matchers, WordSpec}
class TimeModelTest extends WordSpec with Matchers {
private val referenceTime = Instant.EPOCH
private val epsilon = Duration.ofMillis(10L)
private val sut =
private val timeModel =
TimeModel(Duration.ofSeconds(1L), Duration.ofSeconds(1L), Duration.ofSeconds(30L)).get
private val checker = TimeModelChecker(sut)
private val referenceMrt = referenceTime.plus(sut.maxTtl)
private val referenceMrt = referenceTime.plus(timeModel.maxTtl)
private def acceptLet(let: Instant): Boolean =
checker.checkLet(referenceTime, let, let.plus(sut.maxTtl))
timeModel.checkLet(referenceTime, let, let.plus(timeModel.maxTtl))
private def acceptMrt(mrt: Instant): Boolean =
checker.checkLet(referenceTime, mrt.minus(sut.maxTtl), mrt)
timeModel.checkLet(referenceTime, mrt.minus(timeModel.maxTtl), mrt)
private def acceptTtl(mrt: Instant): Boolean = checker.checkTtl(referenceTime, mrt)
private def acceptTtl(mrt: Instant): Boolean = timeModel.checkTtl(referenceTime, mrt)
"Ledger effective time model checker" when {
"calculating derived values" should {
"calculate minTtl correctly" in {
sut.minTtl shouldEqual sut.minTransactionLatency.plus(sut.maxClockSkew)
timeModel.minTtl shouldEqual timeModel.minTransactionLatency.plus(timeModel.maxClockSkew)
}
}
@ -40,55 +37,55 @@ class TimeModelTest extends WordSpec with Matchers {
}
"succeed if the time is higher than the current time and is within tolerance limit" in {
acceptLet(referenceTime.plus(sut.futureAcceptanceWindow).minus(epsilon)) shouldEqual true
acceptLet(referenceTime.plus(timeModel.futureAcceptanceWindow).minus(epsilon)) shouldEqual true
}
"succeed if the time is equal to the high boundary" in {
acceptLet(referenceTime.plus(sut.futureAcceptanceWindow)) shouldEqual true
acceptLet(referenceTime.plus(timeModel.futureAcceptanceWindow)) shouldEqual true
}
"fail if the time is higher than the high boundary" in {
acceptLet(referenceTime.plus(sut.futureAcceptanceWindow).plus(epsilon)) shouldEqual false
acceptLet(referenceTime.plus(timeModel.futureAcceptanceWindow).plus(epsilon)) shouldEqual false
}
"fail if the MRT is less than the low boundary" in {
acceptMrt(referenceMrt.minus(sut.maxTtl).minus(epsilon)) shouldEqual false
acceptMrt(referenceMrt.minus(timeModel.maxTtl).minus(epsilon)) shouldEqual false
}
"succeed if the MRT is equal to the low boundary" in {
acceptMrt(referenceMrt.minus(sut.maxTtl)) shouldEqual true
acceptMrt(referenceMrt.minus(timeModel.maxTtl)) shouldEqual true
}
"succeed if the MRT is greater than than the low boundary" in {
acceptMrt(referenceMrt.minus(sut.maxTtl).plus(epsilon)) shouldEqual true
acceptMrt(referenceMrt.minus(timeModel.maxTtl).plus(epsilon)) shouldEqual true
}
}
}
"TTL time model checker" when {
"TTL time model" when {
"checking if TTL is within accepted boundaries" should {
"fail if the TTL is less than than the low boundary" in {
acceptTtl(referenceTime.plus(sut.minTtl).minus(epsilon)) shouldEqual false
acceptTtl(referenceTime.plus(timeModel.minTtl).minus(epsilon)) shouldEqual false
}
"succeed if the TTL is equal to the low boundary" in {
acceptTtl(referenceTime.plus(sut.minTtl)) shouldEqual true
acceptTtl(referenceTime.plus(timeModel.minTtl)) shouldEqual true
}
"succeed if the TTL is greater than the low boundary" in {
acceptTtl(referenceTime.plus(sut.minTtl).plus(epsilon)) shouldEqual true
acceptTtl(referenceTime.plus(timeModel.minTtl).plus(epsilon)) shouldEqual true
}
"succeed if the TTL is less than the high boundary" in {
acceptTtl(referenceTime.plus(sut.maxTtl).minus(epsilon)) shouldEqual true
acceptTtl(referenceTime.plus(timeModel.maxTtl).minus(epsilon)) shouldEqual true
}
"succeed if the TTL is equal to the high boundary" in {
acceptTtl(referenceTime.plus(sut.maxTtl)) shouldEqual true
acceptTtl(referenceTime.plus(timeModel.maxTtl)) shouldEqual true
}
"fail if the TTL is greater than than the high boundary" in {
acceptTtl(referenceTime.plus(sut.maxTtl).plus(epsilon)) shouldEqual false
acceptTtl(referenceTime.plus(timeModel.maxTtl).plus(epsilon)) shouldEqual false
}
}
}

View File

@ -0,0 +1 @@
55762555d92c53b57217b32a0feb4a278dde5d0a7be23d227ac816f2b899b7fc

View File

@ -0,0 +1,42 @@
-- Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
---------------------------------------------------------------------------------------------------
-- V7: Add table for ledger configuration changes
--
-- This schema version adds a table for ledger configuration changes and adds the latest
-- configuration to the parameters table.
---------------------------------------------------------------------------------------------------
-- Table for storing a log of ledger configuration changes and rejections.
CREATE TABLE configuration_entries (
ledger_offset bigint primary key not null,
recorded_at timestamp not null, -- with time zone
submission_id varchar not null,
participant_id varchar not null,
-- The type of entry, one of 'accept' or 'reject'.
typ varchar not null,
-- The configuration that was proposed and either accepted or rejected depending on the type.
-- Encoded according to participant-state/protobuf/ledger_configuration.proto.
configuration bytea not null,
-- If the type is 'rejection', then the rejection reason is set.
-- Rejection reason is a human-readable description why the change was rejected.
rejection_reason varchar,
-- Check that fields are correctly set based on the type.
constraint configuration_entries_check_reason
check (
(typ = 'accept' and rejection_reason is null) or
(typ = 'reject' and rejection_reason is not null))
);
-- Index for retrieving the configuration entry by submission identifier.
-- To be used for completing configuration submissions.
CREATE UNIQUE INDEX idx_configuration_submission
ON configuration_entries (submission_id, participant_id);
-- Add the current configuration column to parameters.
ALTER TABLE parameters
ADD configuration bytea;

View File

@ -0,0 +1 @@
2371c0f1dfa9a21f3536dd129cc6c9b517a02a6604b58912ef665f0318e542a9

View File

@ -0,0 +1,42 @@
-- Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
---------------------------------------------------------------------------------------------------
-- V12: Add table for ledger configuration changes
--
-- This schema version adds a table for ledger configuration changes and adds the latest
-- configuration to the parameters table.
---------------------------------------------------------------------------------------------------
-- Table for storing a log of ledger configuration changes and rejections.
CREATE TABLE configuration_entries (
ledger_offset bigint primary key not null,
recorded_at timestamp not null, -- with time zone
submission_id varchar not null,
participant_id varchar not null,
-- The type of entry, one of 'accept' or 'reject'.
typ varchar not null,
-- The configuration that was proposed and either accepted or rejected depending on the type.
-- Encoded according to participant-state/protobuf/ledger_configuration.proto.
configuration bytea not null,
-- If the type is 'rejection', then the rejection reason is set.
-- Rejection reason is a human-readable description why the change was rejected.
rejection_reason varchar,
-- Check that fields are correctly set based on the type.
constraint check_entry
check (
(typ = 'accept' and rejection_reason is null) or
(typ = 'reject' and rejection_reason is not null))
);
-- Index for retrieving the configuration entry by submission identifier.
-- To be used for completing configuration submissions.
CREATE UNIQUE INDEX idx_configuration_submission
ON configuration_entries (submission_id, participant_id);
-- Add the current configuration column to parameters.
ALTER TABLE parameters
ADD configuration bytea;

View File

@ -245,7 +245,7 @@ class JdbcIndexer private[index] (
private def handleStateUpdate(offset: Offset, update: Update): Future[Unit] = {
lastReceivedOffset = offset.toLedgerString
stateUpdateRecordTime(update).foreach(lastReceivedRecordTime = _)
lastReceivedRecordTime = update.recordTime.toInstant
val externalOffset = Some(offset.toLedgerString)
update match {
@ -319,21 +319,38 @@ class JdbcIndexer private[index] (
.storeLedgerEntry(headRef, headRef + 1, externalOffset, pt)
.map(_ => headRef = headRef + 1)(DEC)
case _: ConfigurationChanged =>
// TODO (GS) implement configuration changes
Future.successful(())
case config: ConfigurationChanged =>
ledgerDao
.storeConfigurationEntry(
headRef,
headRef + 1,
externalOffset,
config.recordTime.toInstant,
config.submissionId,
config.participantId,
config.newConfiguration,
None
)
.map(_ => headRef = headRef + 1)(DEC)
case _: ConfigurationChangeRejected =>
// TODO(JM) implement configuration rejections
Future.successful(())
case configRejection: ConfigurationChangeRejected =>
ledgerDao
.storeConfigurationEntry(
headRef,
headRef + 1,
externalOffset,
configRejection.recordTime.toInstant,
configRejection.submissionId,
configRejection.participantId,
configRejection.proposedConfiguration,
Some(configRejection.rejectionReason)
)
.map(_ => headRef = headRef + 1)(DEC)
case CommandRejected(_, _, RejectionReason.DuplicateCommand) =>
Future.successful(())
case CommandRejected(submitterInfo, reason) =>
case CommandRejected(recordTime, submitterInfo, reason) =>
val rejection = PersistenceEntry.Rejection(
LedgerEntry.Rejection(
Instant.now(), // TODO should we get this from the backend?
recordTime.toInstant,
submitterInfo.commandId,
submitterInfo.applicationId,
submitterInfo.submitter,
@ -347,17 +364,6 @@ class JdbcIndexer private[index] (
}
}
private def stateUpdateRecordTime(update: Update): Option[Instant] =
(update match {
case Heartbeat(recordTime) => Some(recordTime)
case PartyAddedToParticipant(_, _, _, recordTime) => Some(recordTime)
case PublicPackageUploaded(_, _, _, recordTime) => Some(recordTime)
case TransactionAccepted(_, _, _, _, recordTime, _) => Some(recordTime)
case ConfigurationChanged(_, _) => None
case ConfigurationChangeRejected(_, _) => None
case CommandRejected(_, _) => None
}) map (_.toInstant)
private def toDomainRejection(
submitterInfo: SubmitterInfo,
state: RejectionReason): domain.RejectionReason = state match {

View File

@ -9,7 +9,8 @@ import ch.qos.logback.classic.Level
import com.digitalasset.ledger.api.auth.AuthService
import com.digitalasset.ledger.api.tls.TlsConfiguration
import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.services.time.{TimeModel, TimeProviderType}
import com.digitalasset.platform.services.time.TimeProviderType
import com.daml.ledger.participant.state.v1.TimeModel
import scala.concurrent.duration._

View File

@ -40,7 +40,6 @@ import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntr
import com.digitalasset.platform.sandbox.stores.ledger._
import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode
import com.digitalasset.platform.server.api.validation.ErrorFactories
import com.digitalasset.platform.services.time.TimeModel
import org.slf4j.LoggerFactory
import scalaz.Tag
import scalaz.syntax.tag._
@ -65,7 +64,7 @@ object SandboxIndexAndWriteService {
ledgerId: LedgerId,
participantId: ParticipantId,
jdbcUrl: String,
timeModel: TimeModel,
timeModel: ParticipantState.TimeModel,
timeProvider: TimeProvider,
acs: InMemoryActiveLedgerState,
ledgerEntries: ImmArray[LedgerEntryOrBump],
@ -78,6 +77,7 @@ object SandboxIndexAndWriteService {
.jdbcBacked(
jdbcUrl,
ledgerId,
participantId,
timeProvider,
acs,
templateStore,
@ -94,7 +94,7 @@ object SandboxIndexAndWriteService {
def inMemory(
ledgerId: LedgerId,
participantId: ParticipantId,
timeModel: TimeModel,
timeModel: ParticipantState.TimeModel,
timeProvider: TimeProvider,
acs: InMemoryActiveLedgerState,
ledgerEntries: ImmArray[LedgerEntryOrBump],
@ -102,7 +102,7 @@ object SandboxIndexAndWriteService {
metrics: MetricRegistry)(implicit mat: Materializer): IndexAndWriteService = {
val ledger =
Ledger.metered(
Ledger.inMemory(ledgerId, timeProvider, acs, templateStore, ledgerEntries),
Ledger.inMemory(ledgerId, participantId, timeProvider, acs, templateStore, ledgerEntries),
metrics)
createInstance(ledger, participantId, timeModel, timeProvider)
}
@ -110,7 +110,7 @@ object SandboxIndexAndWriteService {
private def createInstance(
ledger: Ledger,
participantId: ParticipantId,
timeModel: TimeModel,
timeModel: ParticipantState.TimeModel,
timeProvider: TimeProvider)(implicit mat: Materializer) = {
val contractStore = new SandboxContractStore(ledger)
val indexSvc = new LedgerBackedIndexService(ledger, contractStore, participantId) {
@ -439,6 +439,5 @@ class LedgerBackedWriteService(ledger: Ledger, timeProvider: TimeProvider) exten
maxRecordTime: Time.Timestamp,
submissionId: String,
config: Configuration): CompletionStage[SubmissionResult] =
// FIXME(JM): Implement configuration changes in sandbox.
CompletableFuture.completedFuture(SubmissionResult.NotSupported)
FutureConverters.toJava(ledger.publishConfiguration(maxRecordTime, submissionId, config))
}

View File

@ -0,0 +1,25 @@
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandbox.stores.ledger
import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId}
sealed abstract class ConfigurationEntry extends Product with Serializable
object ConfigurationEntry {
final case class Accepted(
submissionId: String,
participantId: ParticipantId,
configuration: Configuration,
) extends ConfigurationEntry
final case class Rejected(
submissionId: String,
participantId: ParticipantId,
rejectionReason: String,
proposedConfiguration: Configuration
) extends ConfigurationEntry
}
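For example, a reader of the entry stream can render both variants uniformly (a sketch):

def describe(offset: Long, entry: ConfigurationEntry): String =
  entry match {
    case ConfigurationEntry.Accepted(submissionId, participantId, configuration) =>
      s"$offset: '$submissionId' from '$participantId' accepted, generation ${configuration.generation}"
    case ConfigurationEntry.Rejected(submissionId, participantId, reason, _) =>
      s"$offset: '$submissionId' from '$participantId' rejected: $reason"
  }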

View File

@ -14,6 +14,7 @@ import com.daml.ledger.participant.state.v1._
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.ImmArray
import com.digitalasset.daml.lf.data.Ref.{PackageId, Party, TransactionIdString}
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.daml.lf.language.Ast
import com.digitalasset.daml.lf.transaction.Node.GlobalKey
import com.digitalasset.daml.lf.value.Value
@ -53,6 +54,14 @@ trait WriteLedger extends AutoCloseable {
knownSince: Instant,
sourceDescription: Option[String],
payload: List[Archive]): Future[UploadPackagesResult]
// Configuration management
def publishConfiguration(
maxRecordTime: Timestamp,
submissionId: String,
config: Configuration
): Future[SubmissionResult]
}
/** Defines all the functionalities a Ledger needs to provide */
@ -85,6 +94,9 @@ trait ReadOnlyLedger extends AutoCloseable {
def getLfPackage(packageId: PackageId): Future[Option[Ast.Package]]
// Configuration management
def lookupLedgerConfiguration(): Future[Option[Configuration]]
def configurationEntries(offset: Option[Long]): Source[(Long, ConfigurationEntry), NotUsed]
}
object Ledger {
@ -95,6 +107,7 @@ object Ledger {
* Creates an in-memory ledger
*
* @param ledgerId the id to be used for the ledger
* @param participantId the id of the participant
* @param timeProvider the provider of time
* @param acs the starting ACS store
* @param ledgerEntries the starting entries
@ -102,17 +115,19 @@ object Ledger {
*/
def inMemory(
ledgerId: LedgerId,
participantId: ParticipantId,
timeProvider: TimeProvider,
acs: InMemoryActiveLedgerState,
packages: InMemoryPackageStore,
ledgerEntries: ImmArray[LedgerEntryOrBump]): Ledger =
new InMemoryLedger(ledgerId, timeProvider, acs, packages, ledgerEntries)
new InMemoryLedger(ledgerId, participantId, timeProvider, acs, packages, ledgerEntries)
/**
* Creates a JDBC backed ledger
*
* @param jdbcUrl the jdbc url string containing the username and password as well
* @param ledgerId the id to be used for the ledger
* @param participantId the participant identifier
* @param timeProvider the provider of time
* @param acs the starting ACS store
* @param ledgerEntries the starting entries
@ -123,6 +138,7 @@ object Ledger {
def jdbcBacked(
jdbcUrl: String,
ledgerId: LedgerId,
participantId: ParticipantId,
timeProvider: TimeProvider,
acs: InMemoryActiveLedgerState,
packages: InMemoryPackageStore,
@ -135,6 +151,7 @@ object Ledger {
SqlLedger(
jdbcUrl,
Some(ledgerId),
participantId,
timeProvider,
acs,
packages,

View File

@ -11,6 +11,7 @@ import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref.{PackageId, Party, TransactionIdString}
import com.digitalasset.daml.lf.data.Time
import com.digitalasset.daml.lf.language.Ast
import com.digitalasset.daml.lf.transaction.Node.GlobalKey
import com.digitalasset.daml.lf.value.Value
@ -30,6 +31,7 @@ private class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: MetricRegis
val lookupContract = metrics.timer("Ledger.lookupContract")
val lookupKey = metrics.timer("Ledger.lookupKey")
val lookupTransaction = metrics.timer("Ledger.lookupTransaction")
val lookupLedgerConfiguration = metrics.timer("Ledger.lookupLedgerConfiguration")
val parties = metrics.timer("Ledger.parties")
val listLfPackages = metrics.timer("Ledger.listLfPackages")
val getLfArchive = metrics.timer("Ledger.getLfArchive")
@ -73,6 +75,13 @@ private class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: MetricRegis
override def close(): Unit = {
ledger.close()
}
override def lookupLedgerConfiguration(): Future[Option[Configuration]] =
timedFuture(Metrics.lookupLedgerConfiguration, ledger.lookupLedgerConfiguration())
override def configurationEntries(
offset: Option[Long]): Source[(Long, ConfigurationEntry), NotUsed] =
ledger.configurationEntries(offset)
}
object MeteredReadOnlyLedger {
@ -89,6 +98,7 @@ private class MeteredLedger(ledger: Ledger, metrics: MetricRegistry)
val publishTransaction = metrics.timer("Ledger.publishTransaction")
val addParty = metrics.timer("Ledger.addParty")
val uploadPackages = metrics.timer("Ledger.uploadPackages")
val publishConfiguration = metrics.timer("Ledger.publishConfiguration")
}
override def publishHeartbeat(time: Instant): Future[Unit] =
@ -115,9 +125,18 @@ private class MeteredLedger(ledger: Ledger, metrics: MetricRegistry)
Metrics.uploadPackages,
ledger.uploadPackages(knownSince, sourceDescription, payload))
override def publishConfiguration(
maxRecordTime: Time.Timestamp,
submissionId: String,
config: Configuration): Future[SubmissionResult] =
timedFuture(
Metrics.publishConfiguration,
ledger.publishConfiguration(maxRecordTime, submissionId, config))
override def close(): Unit = {
ledger.close()
}
}
object MeteredLedger {

View File

@ -10,6 +10,8 @@ import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.ledger.participant.state.v1.{
Configuration,
ParticipantId,
PartyAllocationResult,
SubmissionResult,
SubmittedTransaction,
@ -18,7 +20,7 @@ import com.daml.ledger.participant.state.v1.{
UploadPackagesResult
}
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.ImmArray
import com.digitalasset.daml.lf.data.{ImmArray, Time}
import com.digitalasset.daml.lf.data.Ref.{PackageId, Party, TransactionIdString}
import com.digitalasset.daml.lf.data.Ref.LedgerString.ordering
import com.digitalasset.daml.lf.engine.Blinding
@ -39,7 +41,12 @@ import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.ActiveContract
import com.digitalasset.platform.sandbox.stores.deduplicator.Deduplicator
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry.{Checkpoint, Rejection}
import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryOrBump
import com.digitalasset.platform.sandbox.stores.ledger.{Ledger, LedgerEntry, LedgerSnapshot}
import com.digitalasset.platform.sandbox.stores.ledger.{
Ledger,
LedgerEntry,
LedgerSnapshot,
ConfigurationEntry
}
import com.digitalasset.platform.sandbox.stores.{
ActiveLedgerState,
InMemoryActiveLedgerState,
@ -50,11 +57,16 @@ import org.slf4j.LoggerFactory
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
sealed trait InMemoryEntry extends Product with Serializable
final case class InMemoryLedgerEntry(entry: LedgerEntry) extends InMemoryEntry
final case class InMemoryConfigEntry(entry: ConfigurationEntry) extends InMemoryEntry
/** This stores all the mutable data that we need to run a ledger: the PCS, the ACS, and the deduplicator.
*
*/
class InMemoryLedger(
val ledgerId: LedgerId,
participantId: ParticipantId,
timeProvider: TimeProvider,
acs0: InMemoryActiveLedgerState,
packageStoreInit: InMemoryPackageStore,
@ -64,13 +76,13 @@ class InMemoryLedger(
private val logger = LoggerFactory.getLogger(this.getClass)
private val entries = {
val l = new LedgerEntries[LedgerEntry](_.toString)
val l = new LedgerEntries[InMemoryEntry](_.toString)
ledgerEntries.foreach {
case LedgerEntryOrBump.Bump(increment) =>
l.incrementOffset(increment)
()
case LedgerEntryOrBump.Entry(entry) =>
l.publish(entry)
l.publish(InMemoryLedgerEntry(entry))
()
}
l
@ -79,11 +91,14 @@ class InMemoryLedger(
private val packageStoreRef = new AtomicReference[InMemoryPackageStore](packageStoreInit)
override def ledgerEntries(offset: Option[Long]): Source[(Long, LedgerEntry), NotUsed] =
entries.getSource(offset)
entries
.getSource(offset)
.collect { case (offset, InMemoryLedgerEntry(entry)) => offset -> entry }
// mutable state
private var acs = acs0
private var deduplicator = Deduplicator()
private var ledgerConfiguration: Option[Configuration] = None
override def ledgerEnd: Long = entries.ledgerEnd
@ -114,7 +129,7 @@ class InMemoryLedger(
override def publishHeartbeat(time: Instant): Future[Unit] =
Future.successful(this.synchronized[Unit] {
entries.publish(Checkpoint(time))
entries.publish(InMemoryLedgerEntry(Checkpoint(time)))
()
})
@ -208,7 +223,7 @@ class InMemoryLedger(
mappedTx,
recordBlinding
)
entries.publish(entry)
entries.publish(InMemoryLedgerEntry(entry))
()
}
}
@ -218,6 +233,7 @@ class InMemoryLedger(
private def handleError(submitterInfo: SubmitterInfo, reason: RejectionReason): Unit = {
logger.warn(s"Publishing error to ledger: ${reason.description}")
entries.publish(
InMemoryLedgerEntry(
Rejection(
timeProvider.getCurrentTime,
submitterInfo.commandId,
@ -225,6 +241,7 @@ class InMemoryLedger(
submitterInfo.submitter,
reason)
)
)
()
}
@ -240,8 +257,8 @@ class InMemoryLedger(
Future.successful(
entries
.getEntryAt(n)
.collect[(Long, LedgerEntry.Transaction)] {
case t: LedgerEntry.Transaction =>
.collect {
case InMemoryLedgerEntry(t: LedgerEntry.Transaction) =>
(n, t) // the transaction id is also the offset
})
}
@ -292,4 +309,37 @@ class InMemoryLedger(
}
)
}
override def publishConfiguration(
maxRecordTime: Time.Timestamp,
submissionId: String,
config: Configuration): Future[SubmissionResult] =
Future.successful {
this.synchronized {
ledgerConfiguration match {
case Some(currentConfig) if config.generation != currentConfig.generation + 1 =>
entries.publish(InMemoryConfigEntry(ConfigurationEntry.Rejected(
submissionId,
participantId,
s"Generation mismatch, expected ${currentConfig.generation + 1}, got ${config.generation}",
config)))
case _ =>
entries.publish(
InMemoryConfigEntry(ConfigurationEntry.Accepted(submissionId, participantId, config)))
ledgerConfiguration = Some(config)
}
SubmissionResult.Acknowledged
}
}
override def lookupLedgerConfiguration(): Future[Option[Configuration]] =
Future.successful(this.synchronized { ledgerConfiguration })
override def configurationEntries(
offset: Option[Long]): Source[(Long, ConfigurationEntry), NotUsed] =
entries
.getSource(offset)
.collect { case (offset, InMemoryConfigEntry(entry)) => offset -> entry }
}

View File

@ -7,6 +7,7 @@ import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.v1.Configuration
import com.digitalasset.daml.lf.archive.Decode
import com.digitalasset.daml.lf.data.Ref.{PackageId, Party, TransactionIdString}
import com.digitalasset.daml.lf.language.Ast
@ -21,7 +22,12 @@ import com.digitalasset.platform.common.util.DirectExecutionContext
import com.digitalasset.platform.participant.util.EventFilter.TemplateAwareFilter
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.LedgerReadDao
import com.digitalasset.platform.sandbox.stores.ledger.{LedgerEntry, LedgerSnapshot, ReadOnlyLedger}
import com.digitalasset.platform.sandbox.stores.ledger.{
LedgerEntry,
LedgerSnapshot,
ReadOnlyLedger,
ConfigurationEntry
}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
@ -85,8 +91,16 @@ class BaseLedger(val ledgerId: LedgerId, headAtInitialization: Long, ledgerDao:
.flatMap(archiveO =>
Future.fromTry(Try(archiveO.map(archive => Decode.decodeArchive(archive)._2))))(DEC)
override def lookupLedgerConfiguration(): Future[Option[Configuration]] =
ledgerDao.lookupLedgerConfiguration()
override def configurationEntries(
offset: Option[Long]): Source[(Long, ConfigurationEntry), NotUsed] =
dispatcher.startingAt(offset.getOrElse(0), RangeSource(ledgerDao.getConfigurationEntries(_, _)))
override def close(): Unit = {
dispatcher.close()
ledgerDao.close()
}
}

View File

@ -14,9 +14,9 @@ import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.ledger.participant.state.v1._
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.Ref.Party
import com.digitalasset.daml.lf.data.Ref.LedgerString.ordering
import com.digitalasset.daml.lf.data.{ImmArray, Ref}
import com.digitalasset.daml.lf.data.Ref.Party
import com.digitalasset.daml.lf.data.{ImmArray, Ref, Time}
import com.digitalasset.daml.lf.engine.Blinding
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractId}
import com.digitalasset.daml_lf_dev.DamlLf.Archive
@ -69,6 +69,7 @@ object SqlLedger {
def apply(
jdbcUrl: String,
ledgerId: Option[LedgerId],
participantId: ParticipantId,
timeProvider: TimeProvider,
acs: InMemoryActiveLedgerState,
packages: InMemoryPackageStore,
@ -108,6 +109,7 @@ object SqlLedger {
sqlLedgerFactory.createSqlLedger(
ledgerId,
participantId,
timeProvider,
startMode,
acs,
@ -123,6 +125,7 @@ object SqlLedger {
private class SqlLedger(
ledgerId: LedgerId,
participantId: ParticipantId,
headAtInitialization: Long,
ledgerDao: LedgerDao,
timeProvider: TimeProvider,
@ -135,11 +138,13 @@ private class SqlLedger(
private val logger = loggerFactory.getLogger(getClass)
private case class Offsets(offset: Long, nextOffset: Long)
// the reason for modelling persistence as a reactive pipeline is to avoid having race-conditions between the
// moving ledger-end, the async persistence operation and the dispatcher head notification
private val (checkpointQueue, persistenceQueue): (
SourceQueueWithComplete[Long => PersistenceEntry],
SourceQueueWithComplete[Long => PersistenceEntry]) = createQueues()
SourceQueueWithComplete[Offsets => Future[Unit]],
SourceQueueWithComplete[Offsets => Future[Unit]]) = createQueues()
watchForFailures(checkpointQueue, "checkpoint")
watchForFailures(persistenceQueue, "persistence")
@ -153,12 +158,12 @@ private class SqlLedger(
}(DEC)
private def createQueues(): (
SourceQueueWithComplete[Long => PersistenceEntry],
SourceQueueWithComplete[Long => PersistenceEntry]) = {
SourceQueueWithComplete[Offsets => Future[Unit]],
SourceQueueWithComplete[Offsets => Future[Unit]]) = {
val checkpointQueue = Source.queue[Long => PersistenceEntry](1, OverflowStrategy.dropHead)
val checkpointQueue = Source.queue[Offsets => Future[Unit]](1, OverflowStrategy.dropHead)
val persistenceQueue =
Source.queue[Long => PersistenceEntry](queueDepth, OverflowStrategy.dropNew)
Source.queue[Offsets => Future[Unit]](queueDepth, OverflowStrategy.dropNew)
implicit val ec: ExecutionContext = DEC
@ -167,7 +172,7 @@ private class SqlLedger(
q1Mat -> q2Mat
} { implicit b => (s1, s2) =>
import akka.stream.scaladsl.GraphDSL.Implicits._
val merge = b.add(MergePreferred[Long => PersistenceEntry](1))
val merge = b.add(MergePreferred[Offsets => Future[Unit]](1))
s1 ~> merge.preferred
s2 ~> merge.in(0)
@ -187,27 +192,20 @@ private class SqlLedger(
//shooting the SQL queries in parallel
Future
.sequence(queue.toIterator.zipWithIndex.map {
case (ledgerEntryGen, i) =>
case (persist, i) =>
val offset = startOffset + i
ledgerDao
.storeLedgerEntry(offset, offset + 1, None, ledgerEntryGen(offset))
.map(_ => ())(DEC)
.recover {
case t =>
//recovering from the failure so the persistence stream doesn't die
logger.error(s"Failed to persist entry with offset: $offset", t)
()
}(DEC)
persist(Offsets(offset, offset + 1L))
})
.map { _ =>
//note that we can have holes in offsets in case of the storing of an entry failed for some reason
dispatcher.signalNewHead(startOffset + queue.length) //signalling downstream subscriptions
}(DEC)
}
.toMat(Sink.ignore)(Keep.left[
.toMat(Sink.ignore)(
Keep.left[
(
SourceQueueWithComplete[Long => PersistenceEntry],
SourceQueueWithComplete[Long => PersistenceEntry]),
SourceQueueWithComplete[Offsets => Future[Unit]],
SourceQueueWithComplete[Offsets => Future[Unit]]),
Future[Done]])
.run()
}
@ -218,17 +216,29 @@ private class SqlLedger(
checkpointQueue.complete()
}
private def storeLedgerEntry(offsets: Offsets, entry: PersistenceEntry): Future[Unit] =
ledgerDao
.storeLedgerEntry(offsets.offset, offsets.nextOffset, None, entry)
.map(_ => ())(DEC)
.recover {
case t =>
//recovering from the failure so the persistence stream doesn't die
logger.error(s"Failed to persist entry with offsets: $offsets", t)
()
}(DEC)
override def publishHeartbeat(time: Instant): Future[Unit] =
checkpointQueue
.offer(_ => PersistenceEntry.Checkpoint(LedgerEntry.Checkpoint(time)))
.offer(offsets =>
storeLedgerEntry(offsets, PersistenceEntry.Checkpoint(LedgerEntry.Checkpoint(time))))
.map(_ => ())(DEC) //this never pushes back, see createQueues above!
override def publishTransaction(
submitterInfo: SubmitterInfo,
transactionMeta: TransactionMeta,
transaction: SubmittedTransaction): Future[SubmissionResult] =
enqueue { offset =>
val transactionId = Ref.LedgerString.fromLong(offset)
enqueue { offsets =>
val transactionId = Ref.LedgerString.fromLong(offsets.offset)
val toAbsCoid: ContractId => AbsoluteContractId =
SandboxEventIdFormatter.makeAbsCoid(transactionId)
@ -249,7 +259,7 @@ private class SqlLedger(
}
val recordTime = timeProvider.getCurrentTime
if (recordTime.isAfter(submitterInfo.maxRecordTime.toInstant)) {
val entry = if (recordTime.isAfter(submitterInfo.maxRecordTime.toInstant)) {
// This can happen if the DAML-LF computation (i.e. exercise of a choice) takes longer
// than the time window between LET and MRT allows for.
// See https://github.com/digital-asset/daml/issues/987
@ -281,11 +291,14 @@ private class SqlLedger(
List.empty
)
}
storeLedgerEntry(offsets, entry)
}
private def enqueue(f: Long => PersistenceEntry): Future[SubmissionResult] =
private def enqueue(persist: Offsets => Future[Unit]): Future[SubmissionResult] =
persistenceQueue
.offer(f)
.offer(persist)
.transform {
case Success(Enqueued) =>
Success(SubmissionResult.Acknowledged)
@ -331,6 +344,35 @@ private class SqlLedger(
UploadPackagesResult.Ok
}(DEC)
}
override def publishConfiguration(
maxRecordTime: Time.Timestamp,
submissionId: String,
config: Configuration): Future[SubmissionResult] =
enqueue { offsets =>
val recordTime = timeProvider.getCurrentTime
// NOTE(JM): If the generation in the new configuration is invalid
// we persist a rejection.
ledgerDao
.storeConfigurationEntry(
offsets.offset,
offsets.nextOffset,
None,
recordTime,
submissionId,
participantId,
config,
None
)
.map(_ => ())(DEC)
.recover {
case t =>
//recovering from the failure so the persistence stream doesn't die
logger.error(s"Failed to persist configuration with offsets: $offsets", t)
()
}(DEC)
}
}
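The element type change is the crux of this refactoring: the queue no longer carries entry generators but the persistence actions themselves, so transactions and configuration changes can share one pipeline. Schematically (a sketch of the shape, not the actual wiring):

import scala.concurrent.Future

final case class Offsets(offset: Long, nextOffset: Long)

// Before: Long => PersistenceEntry, and the stream decided how to store it.
// After: the producer closes over its own storage logic; the stream only
// assigns offsets and signals the dispatcher head.
type PersistAction = Offsets => Future[Unit]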
private class SqlLedgerFactory(ledgerDao: LedgerDao, loggerFactory: NamedLoggerFactory) {
@ -343,6 +385,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao, loggerFactory: NamedLoggerF
* @param initialLedgerId a random ledger id is generated if none given, if set it's used to initialize the ledger.
* In case the ledger had already been initialized, the given ledger id must not be set or must
* be equal to the one in the database.
* @param participantId the participant identifier
* @param timeProvider to get the current time when sequencing transactions
* @param startMode whether we should start with a clean state or continue where we left off
* @param initialLedgerEntries The initial ledger entries -- usually provided by the scenario runner. Will only be
@ -354,6 +397,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao, loggerFactory: NamedLoggerF
*/
def createSqlLedger(
initialLedgerId: Option[LedgerId],
participantId: ParticipantId,
timeProvider: TimeProvider,
startMode: SqlStartMode,
acs: InMemoryActiveLedgerState,
@ -381,6 +425,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao, loggerFactory: NamedLoggerF
} yield
new SqlLedger(
ledgerId,
participantId,
ledgerEnd,
ledgerDao,
timeProvider,

View File

@ -4,6 +4,7 @@ package com.digitalasset.platform.sandbox.stores.ledger.sql.dao
import java.io.InputStream
import java.sql.Connection
import java.time.Instant
import java.util.Date
import akka.stream.Materializer
@ -13,7 +14,12 @@ import anorm.SqlParser._
import anorm.ToStatement.optionToStatement
import anorm.{AkkaStream, BatchSql, Macro, NamedParameter, RowParser, SQL, SqlParser}
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.ledger.participant.state.v1.{AbsoluteContractInst, TransactionId}
import com.daml.ledger.participant.state.v1.{
AbsoluteContractInst,
Configuration,
ParticipantId,
TransactionId
}
import com.digitalasset.daml.lf.archive.Decode
import com.digitalasset.daml.lf.data.Ref.{
ContractIdString,
@ -40,7 +46,7 @@ import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.{
DivulgedContract
}
import com.digitalasset.platform.sandbox.stores._
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry
import com.digitalasset.platform.sandbox.stores.ledger.{ConfigurationEntry, LedgerEntry}
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry._
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.JdbcLedgerDao.{
H2DatabaseQueries,
@ -141,6 +147,154 @@ private class JdbcLedgerDao(
()
}
private val SQL_UPDATE_CURRENT_CONFIGURATION = SQL(
"update parameters set configuration={configuration}"
)
private val SQL_SELECT_CURRENT_CONFIGURATION = SQL("select configuration from parameters")
private val SQL_GET_CONFIGURATION_ENTRIES = SQL(
"select * from configuration_entries where ledger_offset>={startInclusive} and ledger_offset<{endExclusive} order by ledger_offset asc")
private def updateCurrentConfiguration(configBytes: Array[Byte])(
implicit conn: Connection): Unit = {
SQL_UPDATE_CURRENT_CONFIGURATION
.on("configuration" -> configBytes)
.execute()
()
}
private def selectLedgerConfiguration(implicit conn: Connection) =
SQL_SELECT_CURRENT_CONFIGURATION
.as(byteArray("configuration").?.single)
.flatMap(Configuration.decode(_).toOption)
override def lookupLedgerConfiguration(): Future[Option[Configuration]] =
dbDispatcher.executeSql("lookup_configuration")(implicit conn => selectLedgerConfiguration)
private val configurationAcceptType = "accept"
private val configurationRejectType = "reject"
private val configurationEntryParser: RowParser[(Long, ConfigurationEntry)] =
(long("ledger_offset") ~
str("typ") ~
str("submission_id") ~
str("participant_id") ~
str("rejection_reason")(emptyStringToNullColumn).? ~
byteArray("configuration"))
.map(flatten)
.map {
case (offset, typ, submissionId, participantIdRaw, rejectionReason, configBytes) =>
val config = Configuration
.decode(configBytes)
.fold(err => sys.error(s"Failed to decode configuration: $err"), identity)
val participantId = LedgerString
.fromString(participantIdRaw)
.fold(
err => sys.error(s"Failed to decode participant id in configuration entry: $err"),
identity)
offset ->
(typ match {
case `configurationAcceptType` =>
ConfigurationEntry.Accepted(
submissionId = submissionId,
participantId = participantId,
configuration = config
)
case `configurationRejectType` =>
ConfigurationEntry.Rejected(
submissionId = submissionId,
participantId = participantId,
rejectionReason = rejectionReason.getOrElse("<missing reason>"),
proposedConfiguration = config
)
case _ =>
sys.error(s"getConfigurationEntries: Unknown configuration entry type: $typ")
})
}
override def getConfigurationEntries(
startInclusive: Long,
endExclusive: Long): Source[(Long, ConfigurationEntry), NotUsed] =
paginatingStream(
startInclusive,
endExclusive,
PageSize,
(startI, endE) => {
dbDispatcher.executeSql("load_configuration_entries", Some(s"bounds: [$startI, $endE[")) {
implicit conn =>
SQL_GET_CONFIGURATION_ENTRIES
.on("startInclusive" -> startI, "endExclusive" -> endE)
.as(configurationEntryParser.*)
}
}
).flatMapConcat(Source(_))
private val SQL_INSERT_CONFIGURATION_ENTRY =
SQL(
"""insert into configuration_entries(ledger_offset, recorded_at, submission_id, participant_id, typ, rejection_reason, configuration)
|values({ledger_offset}, {recorded_at}, {submission_id}, {participant_id}, {typ}, {rejection_reason}, {configuration})
|""".stripMargin)
override def storeConfigurationEntry(
offset: LedgerOffset,
newLedgerEnd: LedgerOffset,
externalOffset: Option[ExternalOffset],
recordedAt: Instant,
submissionId: String,
participantId: ParticipantId,
configuration: Configuration,
rejectionReason: Option[String]
): Future[PersistenceResponse] = {
dbDispatcher.executeSql("store_configuration_entry", Some(s"submissionId=$submissionId")) {
implicit conn =>
val currentConfig = selectLedgerConfiguration
var finalRejectionReason = rejectionReason
if (rejectionReason.isEmpty && (currentConfig exists (_.generation + 1 != configuration.generation))) {
// If we're not already storing a rejection and the proposed generation is not the
// direct successor of the current configuration's generation, then we store a
// rejection instead. This code path is only expected to be taken in sandbox and
// follows the same pattern as storing transactions.
finalRejectionReason = Some("Generation mismatch")
}
updateLedgerEnd(newLedgerEnd, externalOffset)
val configurationBytes = Configuration.encode(configuration).toByteArray
val typ = if (finalRejectionReason.isEmpty) {
configurationAcceptType
} else {
configurationRejectType
}
Try({
SQL_INSERT_CONFIGURATION_ENTRY
.on(
"ledger_offset" -> offset,
"recorded_at" -> recordedAt,
"submission_id" -> submissionId,
"participant_id" -> participantId,
"typ" -> typ,
"rejection_reason" -> finalRejectionReason.orNull,
"configuration" -> configurationBytes
)
.execute()
if (typ == configurationAcceptType) {
updateCurrentConfiguration(configurationBytes)
}
PersistenceResponse.Ok
}).recover {
case NonFatal(e) if e.getMessage.contains(queries.DUPLICATE_KEY_ERROR) =>
logger.warn(
s"Ignoring duplicate configuration submission for submissionId $submissionId, participantId $participantId")
conn.rollback()
PersistenceResponse.Duplicate
}.get
}
}
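// Editor's restatement (illustration only) of the implicit rejection rule above:
// keep an explicit rejection reason if one was given, otherwise reject when a
// current configuration exists and the proposed generation is not its direct
// successor.
private def impliedRejectionReason(
    current: Option[Configuration],
    proposed: Configuration,
    explicit: Option[String]): Option[String] =
  explicit.orElse(current.collect {
    case c if c.generation + 1 != proposed.generation => "Generation mismatch"
  })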
private val SQL_INSERT_CONTRACT_KEY =
SQL(
"insert into contract_keys(package_id, name, value_hash, contract_id) values({package_id}, {name}, {value_hash}, {contract_id})")
@ -1317,6 +1471,7 @@ private class JdbcLedgerDao(
|truncate contract_key_maintainers cascade;
|truncate parameters cascade;
|truncate contract_keys cascade;
|truncate configuration_entries cascade;
""".stripMargin)
override def reset(): Future[Unit] =
@ -3,12 +3,15 @@
package com.digitalasset.platform.sandbox.stores.ledger.sql.dao
import java.time.Instant
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.ledger.participant.state.v1.{AbsoluteContractInst, TransactionId}
import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId, TransactionId}
import com.digitalasset.daml.lf.data.Ref.{LedgerString, PackageId, Party}
import com.digitalasset.daml.lf.data.Relation.Relation
import com.digitalasset.daml.lf.transaction.Node
@ -20,7 +23,7 @@ import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails}
import com.digitalasset.platform.common.util.DirectExecutionContext
import com.digitalasset.platform.participant.util.EventFilter.TemplateAwareFilter
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.{ActiveContract, Contract}
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry
import com.digitalasset.platform.sandbox.stores.ledger.{ConfigurationEntry, LedgerEntry}
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry.Transaction
import scala.collection.immutable
@ -78,6 +81,14 @@ trait LedgerReadDao extends AutoCloseable {
contractId: AbsoluteContractId,
forParty: Party): Future[Option[Contract]]
/** Looks up the current ledger configuration, if it has been set. */
def lookupLedgerConfiguration(): Future[Option[Configuration]]
/** Returns a stream of configuration entries. */
def getConfigurationEntries(
startInclusive: LedgerOffset,
endExclusive: LedgerOffset): Source[(Long, ConfigurationEntry), NotUsed]
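// Editor's sketch of a possible consumer of this read API: replay the entry
// stream up to the ledger end and keep the last accepted configuration. Sink
// comes from akka.stream.scaladsl; this helper is illustrative, not part of the
// trait.
def latestAcceptedConfiguration(dao: LedgerReadDao, ledgerEnd: LedgerOffset)(
    implicit mat: Materializer): Future[Option[Configuration]] =
  dao
    .getConfigurationEntries(0L, ledgerEnd)
    .collect { case (_, ConfigurationEntry.Accepted(_, _, config)) => config }
    .runWith(Sink.lastOption)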
/**
* Looks up a LedgerEntry at a given offset
*
@ -200,6 +211,20 @@ trait LedgerWriteDao extends AutoCloseable {
externalOffset: Option[ExternalOffset]
): Future[PersistenceResponse]
/**
* Stores a configuration change or rejection.
*/
def storeConfigurationEntry(
offset: LedgerOffset,
newLedgerEnd: LedgerOffset,
externalOffset: Option[ExternalOffset],
recordedAt: Instant,
submissionId: String,
participantId: ParticipantId,
configuration: Configuration,
rejectionReason: Option[String]
): Future[PersistenceResponse]
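// Editor's sketch (illustration only): the same method records both outcomes,
// keyed on rejectionReason; the offsets and identifiers below are made up.
def storeAcceptAndReject(
    dao: LedgerWriteDao,
    participantId: ParticipantId,
    config: Configuration): Future[(PersistenceResponse, PersistenceResponse)] = {
  implicit val ec = DirectExecutionContext
  for {
    accepted <- dao.storeConfigurationEntry(
      10L, 11L, None, Instant.now(), "submission-1", participantId, config, None)
    rejected <- dao.storeConfigurationEntry(
      11L, 12L, None, Instant.now(), "submission-2", participantId, config,
      Some("invalid configuration"))
  } yield (accepted, rejected)
}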
/**
* Stores a set of DAML-LF packages
*
@ -3,11 +3,13 @@
package com.digitalasset.platform.sandbox.stores.ledger.sql.dao
import java.time.Instant
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.v1.TransactionId
import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId, TransactionId}
import com.digitalasset.daml.lf.data.Ref.{LedgerString, PackageId, Party}
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.digitalasset.daml.lf.transaction.Node
@ -18,6 +20,7 @@ import com.digitalasset.platform.participant.util.EventFilter.TemplateAwareFilte
import com.digitalasset.platform.sandbox.metrics.timedFuture
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.{ActiveContract, Contract}
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry
import com.digitalasset.platform.sandbox.stores.ledger.{ConfigurationEntry, LedgerEntry}
import scala.collection.immutable
import scala.concurrent.Future
@ -32,6 +35,7 @@ private class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: MetricRegi
val lookupExternalLedgerEnd = metrics.timer("LedgerDao.lookupExternalLedgerEnd")
val lookupLedgerEntry = metrics.timer("LedgerDao.lookupLedgerEntry")
val lookupTransaction = metrics.timer("LedgerDao.lookupTransaction")
val lookupLedgerConfiguration = metrics.timer("LedgerDao.lookupLedgerConfiguration")
val lookupKey = metrics.timer("LedgerDao.lookupKey")
val lookupActiveContract = metrics.timer("LedgerDao.lookupActiveContract")
val getParties = metrics.timer("LedgerDao.getParties")
@ -88,6 +92,16 @@ private class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: MetricRegi
override def close(): Unit = {
ledgerDao.close()
}
/** Looks up the current ledger configuration, if it has been set. */
override def lookupLedgerConfiguration(): Future[Option[Configuration]] =
timedFuture(Metrics.lookupLedgerConfiguration, ledgerDao.lookupLedgerConfiguration())
/** Returns a stream of configuration entries. */
override def getConfigurationEntries(
startInclusive: LedgerOffset,
endExclusive: LedgerOffset): Source[(LedgerOffset, ConfigurationEntry), NotUsed] =
ledgerDao.getConfigurationEntries(startInclusive, endExclusive)
}
private class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: MetricRegistry)
@ -99,7 +113,7 @@ private class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: MetricRegistry)
val storeInitialState = metrics.timer("LedgerDao.storeInitialState")
val uploadLfPackages = metrics.timer("LedgerDao.uploadLfPackages")
val storeLedgerEntry = metrics.timer("LedgerDao.storeLedgerEntry")
val storeConfigurationEntry = metrics.timer("LedgerDao.storeConfigurationEntry")
}
override def storeLedgerEntry(
offset: Long,
@ -131,6 +145,29 @@ private class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: MetricRegistry)
externalOffset: Option[ExternalOffset]): Future[PersistenceResponse] =
timedFuture(Metrics.storeParty, ledgerDao.storeParty(party, displayName, externalOffset))
override def storeConfigurationEntry(
offset: LedgerOffset,
newLedgerEnd: LedgerOffset,
externalOffset: Option[ExternalOffset],
recordTime: Instant,
submissionId: String,
participantId: ParticipantId,
configuration: Configuration,
rejectionReason: Option[String]
): Future[PersistenceResponse] =
timedFuture(
Metrics.storeConfigurationEntry,
ledgerDao.storeConfigurationEntry(
offset,
newLedgerEnd,
externalOffset,
recordTime,
submissionId,
participantId,
configuration,
rejectionReason)
)
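// Editor's sketch of a timedFuture-style helper (the real one is imported from
// com.digitalasset.platform.sandbox.metrics and may differ): run the future and
// stop a Codahale timer context however it completes.
private def timedFutureSketch[T](timer: com.codahale.metrics.Timer)(f: => Future[T])(
    implicit ec: scala.concurrent.ExecutionContext): Future[T] = {
  val ctx = timer.time()
  val result = f
  result.onComplete(_ => ctx.stop())
  result
}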
override def uploadLfPackages(
uploadId: String,
packages: List[(Archive, PackageDetails)],
@ -5,6 +5,7 @@ package com.digitalasset.platform.sandbox
import akka.stream.Materializer
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.v1.ParticipantId
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.ledger.api.testing.utils.Resource
import com.digitalasset.platform.sandbox.persistence.{PostgresFixture, PostgresResource}
@ -34,6 +35,7 @@ object LedgerResource {
def inMemory(
ledgerId: LedgerId,
participantId: ParticipantId,
timeProvider: TimeProvider,
acs: InMemoryActiveLedgerState = InMemoryActiveLedgerState.empty,
packages: InMemoryPackageStore = InMemoryPackageStore.empty,
@ -41,12 +43,13 @@ object LedgerResource {
LedgerResource.resource(
() =>
Future.successful(
Ledger.inMemory(ledgerId, timeProvider, acs, packages, entries)
Ledger.inMemory(ledgerId, participantId, timeProvider, acs, packages, entries)
)
)
def postgres(
ledgerId: LedgerId,
participantId: ParticipantId,
timeProvider: TimeProvider,
metrics: MetricRegistry,
packages: InMemoryPackageStore = InMemoryPackageStore.empty)(implicit mat: Materializer) = {
@ -68,6 +71,7 @@ object LedgerResource {
Ledger.jdbcBacked(
postgres.value.jdbcUrl,
ledgerId,
participantId,
timeProvider,
InMemoryActiveLedgerState.empty,
packages,
@ -8,7 +8,7 @@ import java.time.Instant
import akka.stream.ActorMaterializer
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.participant.state.v1.{ParticipantId, TimeModel}
import com.digitalasset.api.util.{TimeProvider, ToleranceWindow}
import com.digitalasset.daml.lf.archive.DarReader
import com.digitalasset.daml.lf.data.{ImmArray, Ref}
@ -23,7 +23,6 @@ import com.digitalasset.platform.sandbox.stores.{
InMemoryPackageStore,
SandboxIndexAndWriteService
}
import com.digitalasset.platform.services.time.TimeModel
import scala.concurrent.ExecutionContext
@ -5,7 +5,6 @@ package com.digitalasset.platform.sandbox.services
import java.io.File
import java.util.concurrent.Executors
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import com.digitalasset.daml.bazeltools.BazelRunfiles._
@ -22,16 +21,16 @@ import com.digitalasset.ledger.api.v1.testing.time_service.TimeServiceGrpc
import com.digitalasset.ledger.client.services.testing.time.StaticTime
import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.services.time.{TimeModel, TimeProviderType}
import com.digitalasset.platform.services.time.TimeProviderType
import io.grpc.Channel
import org.scalatest.{BeforeAndAfterAll, Suite}
import scalaz.syntax.tag._
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._
import scala.util.Try
import com.digitalasset.ledger.api.domain.LedgerId
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.daml.ledger.participant.state.v1.TimeModel
import org.slf4j.LoggerFactory
trait SandboxFixture extends SuiteResource[Channel] with BeforeAndAfterAll {
@ -6,7 +6,12 @@ package com.digitalasset.platform.sandbox.stores.ledger
import java.time.Instant
import akka.stream.scaladsl.Sink
import com.daml.ledger.participant.state.v1.{SubmissionResult, SubmitterInfo, TransactionMeta}
import com.daml.ledger.participant.state.v1.{
ParticipantId,
SubmissionResult,
SubmitterInfo,
TransactionMeta
}
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.{ImmArray, Ref, Time}
import com.digitalasset.daml.lf.transaction.Node._
@ -60,6 +65,7 @@ class ImplicitPartyAdditionIT
override def timeLimit: Span = scaled(60.seconds)
private val ledgerId: LedgerId = LedgerId("ledgerId")
private val participantId: ParticipantId = Ref.LedgerString.assertFromString("participantId")
private val timeProvider = TimeProvider.Constant(Instant.EPOCH.plusSeconds(10))
private val templateId1: Ref.Identifier = Ref.Identifier(
@ -80,9 +86,9 @@ class ImplicitPartyAdditionIT
override protected def constructResource(index: Int, fixtureId: BackendType): Resource[Ledger] =
fixtureId match {
case BackendType.InMemory =>
LedgerResource.inMemory(ledgerId, timeProvider)
LedgerResource.inMemory(ledgerId, participantId, timeProvider)
case BackendType.Postgres =>
LedgerResource.postgres(ledgerId, timeProvider, metrics)
LedgerResource.postgres(ledgerId, participantId, timeProvider, metrics)
}
private def publishSingleNodeTx(
@ -6,7 +6,12 @@ package com.digitalasset.platform.sandbox.stores.ledger
import java.time.Instant
import akka.stream.scaladsl.Sink
import com.daml.ledger.participant.state.v1.{SubmissionResult, SubmitterInfo, TransactionMeta}
import com.daml.ledger.participant.state.v1.{
ParticipantId,
SubmissionResult,
SubmitterInfo,
TransactionMeta
}
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.{ImmArray, Ref, Time}
import com.digitalasset.daml.lf.transaction.GenTransaction
@ -51,6 +56,7 @@ class TransactionMRTComplianceIT
override def timeLimit: Span = scaled(60.seconds)
val ledgerId: LedgerId = LedgerId(Ref.LedgerString.assertFromString("ledgerId"))
private val participantId: ParticipantId = Ref.LedgerString.assertFromString("participantId")
val timeProvider = TimeProvider.Constant(Instant.EPOCH.plusSeconds(10))
/** Overriding this provides an easy way to narrow down testing to a single implementation. */
@ -60,9 +66,9 @@ class TransactionMRTComplianceIT
override protected def constructResource(index: Int, fixtureId: BackendType): Resource[Ledger] =
fixtureId match {
case BackendType.InMemory =>
LedgerResource.inMemory(ledgerId, timeProvider)
LedgerResource.inMemory(ledgerId, participantId, timeProvider)
case BackendType.Postgres =>
LedgerResource.postgres(ledgerId, timeProvider, metrics)
LedgerResource.postgres(ledgerId, participantId, timeProvider, metrics)
}
val LET = Instant.EPOCH.plusSeconds(2)
@ -11,7 +11,7 @@ import java.util.concurrent.atomic.AtomicLong
import akka.stream.scaladsl.{Sink, Source}
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.v1.Offset
import com.daml.ledger.participant.state.v1.{Configuration, Offset, TimeModel}
import com.digitalasset.daml.bazeltools.BazelRunfiles
import com.digitalasset.daml.lf.archive.DarReader
import com.digitalasset.daml.lf.data.Ref.LedgerString.ordering
@ -45,7 +45,6 @@ import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.platform.participant.util.EventFilter
import com.digitalasset.platform.sandbox.persistence.PostgresAroundAll
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.ActiveContract
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao._
import com.digitalasset.platform.sandbox.stores.ledger.sql.migration.FlywayMigrations
import com.digitalasset.platform.sandbox.stores.ledger.sql.serialisation.{
@ -55,6 +54,7 @@ import com.digitalasset.platform.sandbox.stores.ledger.sql.serialisation.{
ValueSerializer
}
import com.digitalasset.platform.sandbox.stores.ledger.sql.util.DbDispatcher
import com.digitalasset.platform.sandbox.stores.ledger.{ConfigurationEntry, LedgerEntry}
import org.scalatest.{AsyncWordSpec, Matchers, OptionValues}
import scala.collection.immutable.TreeMap
@ -270,6 +270,142 @@ class JdbcLedgerDaoSpec
persistAndLoadRejection(nextExternalOffset())
}
val defaultConfig = Configuration(
generation = 0,
timeModel = TimeModel.reasonableDefault
)
"be able to persist and load configuration" in {
val offset = nextOffset()
for {
startingOffset <- ledgerDao.lookupLedgerEnd()
startingConfig <- ledgerDao.lookupLedgerConfiguration()
response <- ledgerDao.storeConfigurationEntry(
offset,
offset + 1,
None,
Instant.EPOCH,
"submission-0",
Ref.LedgerString.assertFromString("participant-0"),
defaultConfig,
None
)
storedConfig <- ledgerDao.lookupLedgerConfiguration()
endingOffset <- ledgerDao.lookupLedgerEnd()
} yield {
response shouldEqual PersistenceResponse.Ok
startingConfig shouldEqual None
storedConfig shouldEqual Some(defaultConfig)
endingOffset shouldEqual (startingOffset + 1)
}
}
"be able to persist configuration rejection" in {
val offset = nextOffset()
val participantId = Ref.LedgerString.assertFromString("participant-0")
for {
startingConfig <- ledgerDao.lookupLedgerConfiguration()
proposedConfig = startingConfig.getOrElse(defaultConfig)
response <- ledgerDao.storeConfigurationEntry(
offset,
offset + 1,
None,
Instant.EPOCH,
"config-rejection-0",
participantId,
proposedConfig,
Some("bad config")
)
storedConfig <- ledgerDao.lookupLedgerConfiguration()
entries <- ledgerDao.getConfigurationEntries(offset, offset + 1).runWith(Sink.seq)
} yield {
response shouldEqual PersistenceResponse.Ok
startingConfig shouldEqual storedConfig
entries shouldEqual List(
offset -> ConfigurationEntry
.Rejected("config-rejection-0", participantId, "bad config", proposedConfig)
)
}
}
"refuse to persist invalid configuration entry" in {
val offset0 = nextOffset()
val participantId = Ref.LedgerString.assertFromString("participant-0")
for {
config <- ledgerDao.lookupLedgerConfiguration().map(_.getOrElse(defaultConfig))
// Store a new configuration with a known submission id
resp0 <- ledgerDao.storeConfigurationEntry(
offset0,
offset0 + 1,
None,
Instant.EPOCH,
"refuse-config-0",
participantId,
config.copy(generation = config.generation + 1),
None
)
newConfig <- ledgerDao.lookupLedgerConfiguration().map(_.get)
// Submission with duplicate submissionId is rejected
offset1 = nextOffset()
resp1 <- ledgerDao.storeConfigurationEntry(
offset1,
offset1 + 1,
None,
Instant.EPOCH,
"refuse-config-0",
participantId,
newConfig.copy(generation = config.generation + 1),
None
)
// Submission with mismatching generation is rejected
offset2 = nextOffset()
resp2 <- ledgerDao.storeConfigurationEntry(
offset2,
offset2 + 1,
None,
Instant.EPOCH,
"refuse-config-1",
participantId,
config,
None
)
// Submission with unique submissionId and correct generation is accepted.
offset3 = nextOffset()
lastConfig = newConfig.copy(generation = newConfig.generation + 1)
resp3 <- ledgerDao.storeConfigurationEntry(
offset3,
offset3 + 1,
None,
Instant.EPOCH,
"refuse-config-2",
participantId,
lastConfig,
None
)
lastConfigActual <- ledgerDao.lookupLedgerConfiguration().map(_.get)
entries <- ledgerDao.getConfigurationEntries(offset0, offset3 + 1).runWith(Sink.seq)
} yield {
resp0 shouldEqual PersistenceResponse.Ok
resp1 shouldEqual PersistenceResponse.Duplicate
resp2 shouldEqual PersistenceResponse.Ok
resp3 shouldEqual PersistenceResponse.Ok
lastConfig shouldEqual lastConfigActual
entries.toList shouldEqual List(
offset0 -> ConfigurationEntry.Accepted("refuse-config-0", participantId, newConfig),
offset2 -> ConfigurationEntry
.Rejected("refuse-config-1", participantId, "Generation mismatch", config),
offset3 -> ConfigurationEntry.Accepted("refuse-config-2", participantId, lastConfig)
)
}
}
"refuse to persist an upload with no packages without external offset" in {
recoverToSucceededIf[IllegalArgumentException] {
ledgerDao.uploadLfPackages(UUID.randomUUID().toString, Nil, None)
@ -3,6 +3,7 @@
package com.digitalasset.platform.sandbox.stores.ledger.sql
import com.daml.ledger.participant.state.v1.ParticipantId
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.{ImmArray, Ref}
import com.digitalasset.ledger.api.domain.LedgerId
@ -31,6 +32,7 @@ class SqlLedgerSpec
private val queueDepth = 128
private val ledgerId: LedgerId = LedgerId(Ref.LedgerString.assertFromString("TheLedger"))
private val participantId: ParticipantId = Ref.LedgerString.assertFromString("TheParticipant")
private val loggerFactory = NamedLoggerFactory(this.getClass)
@ -39,6 +41,7 @@ class SqlLedgerSpec
val ledgerF = SqlLedger(
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = None,
participantId = participantId,
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
@ -58,6 +61,7 @@ class SqlLedgerSpec
val ledgerF = SqlLedger(
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = Some(ledgerId),
participantId = participantId,
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
@ -79,6 +83,7 @@ class SqlLedgerSpec
ledger1 <- SqlLedger(
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = Some(ledgerId),
participantId = participantId,
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
@ -92,6 +97,7 @@ class SqlLedgerSpec
ledger2 <- SqlLedger(
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = Some(ledgerId),
participantId = participantId,
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
@ -105,6 +111,7 @@ class SqlLedgerSpec
ledger3 <- SqlLedger(
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = None,
participantId = participantId,
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
@ -128,6 +135,7 @@ class SqlLedgerSpec
_ <- SqlLedger(
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = Some(LedgerId(Ref.LedgerString.assertFromString("TheLedger"))),
participantId = participantId,
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
@ -140,6 +148,7 @@ class SqlLedgerSpec
_ <- SqlLedger(
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = Some(LedgerId(Ref.LedgerString.assertFromString("AnotherLedger"))),
participantId = participantId,
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
@ -1,10 +1,10 @@
- target: //daml-lf/data:data
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //daml-lf/archive:daml_lf_dev_archive_java_proto
type: jar-lib
javadoc-jar: daml_lf_dev_archive_java_proto_javadoc.jar
mavenUpload: True
mavenUpload: true
- target: //daml-lf/archive:daml_lf_dev_archive_proto_zip
type: zip
location:
@ -18,7 +18,7 @@
- target: //daml-lf/archive:daml_lf_1.6_archive_java_proto
type: jar-lib
javadoc-jar: daml_lf_1.6_archive_java_proto_javadoc.jar
mavenUpload: True
mavenUpload: true
- target: //daml-lf/archive:daml_lf_1.6_archive_proto_zip
type: zip
location:
@ -41,10 +41,10 @@
artifactId: daml-lf-1.7-archive-proto
- target: //daml-lf/archive:daml_lf_archive_reader
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //compiler/damlc/jar:damlc_jar
type: jar-deploy
platformDependent: True
platformDependent: true
- target: //daml-lf/transaction:value_java_proto
type: jar-proto
mavenUpload: true
@ -56,48 +56,48 @@
mavenUpload: true
- target: //daml-lf/transaction:transaction
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //daml-lf/data-scalacheck:data-scalacheck
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //daml-lf/transaction-scalacheck:transaction-scalacheck
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //daml-lf/language:language
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //daml-lf/interface:interface
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //daml-lf/validation:validation
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //daml-lf/interpreter:interpreter
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //daml-lf/scenario-interpreter:scenario-interpreter
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //daml-lf/engine:engine
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //daml-lf/repl:repl
type: jar
- target: //ledger-api/grpc-definitions:ledger-api-protos-tarball
type: targz
mavenUpload: True
mavenUpload: true
location:
groupId: com.digitalasset
artifactId: ledger-api-protos
- target: //ledger-api/rs-grpc-bridge:rs-grpc-bridge
type: jar-lib
mavenUpload: True
mavenUpload: true
- target: //language-support/java/bindings:bindings-java
type: jar-lib
mavenUpload: True
mavenUpload: true
- target: //language-support/java/bindings-rxjava:bindings-rxjava
type: jar-lib
mavenUpload: True
mavenUpload: true
- target: //ledger/sandbox:sandbox-tarball
type: targz
location:
@ -107,44 +107,46 @@
type: jar-deploy
- target: //ledger-api/grpc-definitions:ledger-api-scalapb
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //ledger-api/testing-utils:testing-utils
type: jar-scala
mavenUpload: true
- target: //language-support/scala/bindings:bindings
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //ledger-api/rs-grpc-akka:rs-grpc-akka
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //ledger-api/rs-grpc-testing-utils:rs-grpc-testing-utils
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //ledger/ledger-api-akka:ledger-api-akka
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //scala-protoc-plugins/scala-logging:scala-logging-lib
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //ledger/ledger-api-scala-logging:ledger-api-scala-logging
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //ledger/ledger-api-client:ledger-api-client
type: jar-scala
mavenUpload: true
- target: //ledger/ledger-api-domain:ledger-api-domain
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //ledger/ledger-api-common:ledger-api-common
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //ledger/ledger-api-auth:ledger-api-auth
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //ledger/ledger-api-auth-client:ledger-api-auth-client
type: jar-lib
mavenUpload: True
mavenUpload: true
- target: //ledger/sandbox:sandbox
type: jar-scala
mavenUpload: true
- target: //ledger/ledger-api-test-tool:ledger-api-test-tool
type: jar-deploy
mavenUpload: true
@ -155,7 +157,7 @@
type: jar
- target: //language-support/scala/bindings-akka:bindings-akka
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //language-support/java/codegen:shaded_binary
type: jar-scala
mavenUpload: true
@ -172,13 +174,16 @@
mavenUpload: true
- target: //ledger/participant-state:participant-state
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //ledger/participant-state-index:participant-state-index
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //ledger/sandbox:ledger-api-server
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //ledger/participant-state/protobuf:ledger_configuration_java_proto
type: jar-proto
mavenUpload: true
- target: //ledger/participant-state/kvutils:daml_kvutils_java_proto
type: jar-proto
mavenUpload: true
@ -191,7 +196,7 @@
type: jar-scala
- target: //ledger-service/jwt:jwt
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //ledger-service/db-backend:db-backend
type: jar-scala
- target: //ledger-service/http-json:http-json
@ -205,7 +210,7 @@
mavenUpload: true
- target: //libs-scala/grpc-utils:grpc-utils
type: jar-scala
mavenUpload: True
mavenUpload: true
- target: //libs-scala/timer-utils:timer-utils
type: jar-scala
mavenUpload: true