Basic documentation for simplified ledger integration API (#5032)

* Switched to ByteString from Array[Byte] on almost all simplified API interfaces.

* Sort output by keys.

* Added comment.
CHANGELOG_BEGIN
CHANGELOG_END

* Removed DamlLogEntryId from LedgerEntry.

* Return a SortedMap ordering output state by its keys' hash in order to have deterministic ordering.

* Code tidying.

* Added implicit conversion for anorm for ByteStrings to make SQL queries cleaner.

* Ooops, missed adding a header.

* Avoid copying bytes by anorm by using ByteString.newInput()

* Added some Scaladoc to simplified API interfaces.

* Added docs to LedgerStateAccess.

* Reverted some changes.

* Added some docs to ValidatingCommitter.

* Corrected some typos.

* Added package-level documentation to kvutils.api.

* Clarified convenience classes for LedgerStateOperations.

* Update ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/Version.scala

Co-Authored-By: Gerolf Seitz <gerolf.seitz@digitalasset.com>

* Minor rewording.

* Added missing header.

* Fixed problem with merge.

Co-authored-by: Gerolf Seitz <gerolf.seitz@digitalasset.com>
This commit is contained in:
Miklos 2020-03-17 18:02:27 +01:00 committed by GitHub
parent e71cba698e
commit 9aa61ec3da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 144 additions and 9 deletions

View File

@ -69,8 +69,8 @@ object Version {
* it and to make it possible to remove DamlLogEntryId.
*
* 3: * Add an explicit deduplication time window to each submission. Backwards incompatible because
* it is unclear how to set a sensible default value while the submission time us unknown.
* * Add submissionTime in DamlTransactionEntry and used this time instead ledgerTime to derive
* it is unclear how to set a sensible default value if the submission time is unknown.
* * Add submissionTime in DamlTransactionEntry and use it instead of ledgerTime to derive
* contract ids.
*/
val version: Long = 3

View File

@ -13,6 +13,18 @@ import com.digitalasset.daml.lf.data.Time
import com.digitalasset.daml_lf_dev.DamlLf
import com.digitalasset.ledger.api.health.HealthStatus
/**
* Implements read and write operations required for running a participant server.
*
* Adapts [[LedgerReader]] and [[LedgerWriter]] interfaces to [[com.daml.ledger.participant.state.v1.ReadService]] and
* [[com.daml.ledger.participant.state.v1.WriteService]], respectively.
* Will report [[com.digitalasset.ledger.api.health.Healthy]] as health status only if both
* `reader` and `writer` are healthy.
*
* @param reader [[LedgerReader]] instance to adapt
* @param writer [[LedgerWriter]] instance to adapt
* @param materializer materializer to use when streaming updates from `reader`
*/
class KeyValueParticipantState(reader: LedgerReader, writer: LedgerWriter)(
implicit materializer: Materializer)
extends ReadService

View File

@ -8,16 +8,30 @@ import java.time.Instant
import com.daml.ledger.participant.state.kvutils.Bytes
import com.daml.ledger.participant.state.v1.Offset
/**
* Defines a log entry or update that may be read from the ledger.
*/
sealed trait LedgerEntry
object LedgerEntry {
/**
* A log entry read from the ledger.
* @param offset offset of log entry
* @param entryId opaque ID of log entry
* @param envelope opaque contents of log entry
*/
final class LedgerRecord(
val offset: Offset,
val entryId: Bytes,
val envelope: Bytes
) extends LedgerEntry
/**
* A heart beat read from the ledger.
* @param offset offset of heartbeat log entry
* @param instant timestamp of heartbeat
*/
final case class Heartbeat(offset: Offset, instant: Instant) extends LedgerEntry
object LedgerRecord {

View File

@ -10,18 +10,42 @@ import akka.stream.scaladsl.Source
import com.daml.ledger.participant.state.v1.{Configuration, LedgerId, Offset, TimeModel}
import com.digitalasset.ledger.api.health.ReportsHealth
/**
* Defines how a participant's state is read from the ledger.
*
* For a detailed description of the required semantics of state updates see
* [[com.daml.ledger.participant.state.v1.ReadService]].
*/
trait LedgerReader extends ReportsHealth {
/**
* Streams raw updates from the given offset for the participant.
*
* In case an offset is not specified, all updates must be streamed from the oldest known state.
* Each update is defined either as an opaque log entry ID and an
* envelope ([[com.daml.ledger.participant.state.kvutils.api.LedgerEntry.LedgerRecord]]) or
* as a heartbeat ([[com.daml.ledger.participant.state.kvutils.api.LedgerEntry.Heartbeat]]).
*
* @param offset offset right after which updates must be streamed; in case not specified updates
* must be returned from the beginning
* @return stream of updates
*/
def events(offset: Option[Offset]): Source[LedgerEntry, NotUsed]
/**
* Get the ledger's ID from which this reader instance streams events.
* Should not be a blocking operation.
* @return ID of the ledger from which this reader streams events
*
* @return ID of the ledger from which this reader streams events
*/
def ledgerId(): LedgerId
}
object LedgerReader {
/**
* Default initial ledger configuration used by [[KeyValueParticipantStateReader]].
*/
val DefaultConfiguration = Configuration(
generation = 0,
timeModel = TimeModel.reasonableDefault,

View File

@ -9,8 +9,26 @@ import com.digitalasset.ledger.api.health.ReportsHealth
import scala.concurrent.Future
/**
* Defines how we initiate a commit to the ledger.
*
* For example, the implementation may call the committer node through RPC and transmit the
* submission, or in case of an in-memory implementation the validator may be directly called.
*/
trait LedgerWriter extends ReportsHealth {
/**
* Sends a submission to be committed to the ledger.
*
* @param correlationId correlation ID to be used for logging purposes
* @param envelope opaque submission; may be compressed
* @return future for sending the submission; for possible results see
* [[com.daml.ledger.participant.state.v1.SubmissionResult]]
*/
def commit(correlationId: String, envelope: Bytes): Future[SubmissionResult]
/**
* @return participant ID of the participant on which this LedgerWriter instance runs
*/
def participantId: ParticipantId
}

View File

@ -0,0 +1,30 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils
/**
* This package contains interfaces simplifying implementation of a participant server.
*
* =Interfaces=
* The main interfaces that you need to implement to be able to run a participant server are as follows:
* - [[com.daml.ledger.participant.state.kvutils.api.LedgerWriter]]: Defines how you submit requests to the ledger
* as opaque bytes.
* - [[com.daml.ledger.participant.state.kvutils.api.LedgerReader]]: Defines how you read committed key-value pairs
* from the ledger as opaque bytes.
*
* =Running a Participant Server=
* In order to spin up a participant server there are a few key classes:
* - [[com.daml.ledger.participant.state.kvutils.app.LedgerFactory.KeyValueLedgerFactory]]: Defines how you
* instantiate your `LedgerReader` and `LedgerWriter` implementations.
* - [[com.daml.ledger.participant.state.kvutils.app.Runner]]: Helper class for spinning up a fully functional
* participant server, including the indexer, gRPC interface, etc.
* For an example ledger that implements the above interfaces please see the package [[com.daml.ledger.on.memory]].
*
* For implementing a validator/committer component please see the below references.
*
* @see [[com.daml.ledger.validator.LedgerStateAccess]]
* @see [[com.daml.ledger.validator.SubmissionValidator]]
* @see [[com.daml.ledger.validator.ValidatingCommitter]]
*/
package object api {}

View File

@ -6,7 +6,7 @@ package com.daml.ledger.participant.state
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.google.protobuf.ByteString
/** The participant-state key-value utilities provide methods to succintly implement
/** The participant-state key-value utilities provide methods to succinctly implement
* [[com.daml.ledger.participant.state.v1.ReadService]] and
* [[com.daml.ledger.participant.state.v1.WriteService]] on top of ledger's that provide a key-value state storage.
*
@ -16,8 +16,8 @@ import com.google.protobuf.ByteString
*
* `logEntryIds` describes the ordering of log entries. The `logEntryMap` contains the data for the log entries.
* This map is expected to be append-only and existing entries are never modified or removed.
* `kvState` describes auxilliary mutable state which may be created as part of one log entry and mutated by a later one.
* (e.g. a log entry might describe a DAML transaction containing contracts and the auxilliary mutable data may
* `kvState` describes auxiliary mutable state which may be created as part of one log entry and mutated by a later one.
* (e.g. a log entry might describe a DAML transaction containing contracts and the auxiliary mutable data may
* describe their activeness).
*
* While these can be represented in a key-value store directly, some implementations may

View File

@ -8,6 +8,11 @@ import com.daml.ledger.validator.LedgerStateOperations._
import scala.concurrent.{ExecutionContext, Future}
/**
* Defines how the validator/committer can access the backing store of the ledger to perform read/write operations in
* a transaction.
* @tparam LogResult type of the offset used for a log entry
*/
trait LedgerStateAccess[LogResult] {
/**
@ -19,6 +24,10 @@ trait LedgerStateAccess[LogResult] {
def inTransaction[T](body: LedgerStateOperations[LogResult] => Future[T]): Future[T]
}
/**
* Defines how the validator/committer can access the backing store of the ledger.
* @tparam LogResult type of the offset used for a log entry
*/
trait LedgerStateOperations[LogResult] {
/**
@ -48,12 +57,13 @@ trait LedgerStateOperations[LogResult] {
/**
* Writes a single log entry to the backing store. The implementation may return Future.failed in case the key
* (i.e., the log entry ID) already exists.
* @return offset of the latest log entry
*/
def appendToLog(key: Key, value: Value): Future[LogResult]
}
/**
* Implements non-batching read and write operations on the backing store based on batched implementations.
* Convenience class for implementing read and write operations on a backing store that supports batched operations.
*/
abstract class BatchingLedgerStateOperations[LogResult](implicit executionContext: ExecutionContext)
extends LedgerStateOperations[LogResult] {
@ -65,7 +75,8 @@ abstract class BatchingLedgerStateOperations[LogResult](implicit executionContex
}
/**
* Implements batching read and write operations on the backing store based on non-batched implementations.
* Convenience class for implementing read and write operations on a backing store that '''does not''' support batched
* operations.
*/
abstract class NonBatchingLedgerStateOperations[LogResult](
implicit executionContext: ExecutionContext

View File

@ -14,6 +14,32 @@ import com.digitalasset.logging.LoggingContext.newLoggingContext
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
/**
* Orchestrates committing to a ledger after validating the submission.
* Example usage, assuming a [[com.daml.ledger.participant.state.kvutils.api.LedgerWriter]] sends the submission over
* the wire:
* {{{
* ...
* private val ledgerStateAccess = ...
* private val validator = SubmissionValidator.create(ledgerStateAccess)
* private val validatingCommitter = new ValidatingCommitter(
* myParticipantId,
* () => Instant.now(),
* validator,
* signalDispatcher)
* ...
*
* def commitRequestHandler(request: CommitRequest): Future[CommitResponse] =
* validatingCommitter.commit(request.correlationId, request.envelope)
* .map(...)
* }}}
*
* @param now function implementing resolution of current time when processing submission
* @param validator validator instance to use
* @param postCommit function called after a successful commit, e.g., this can be used to signal readers that a new log
* entry is available
* @tparam LogResult type of the offset used for a log entry
*/
class ValidatingCommitter[LogResult](
now: () => Instant,
validator: SubmissionValidator[LogResult],

View File

@ -42,7 +42,7 @@ trait ReadService extends ReportsHealth {
*
* 1. properties about the sequence of [[(Offset, Update)]] tuples
* in a stream read from the beginning, and
* 2. properties relating the streams obtained from two separate alls
* 2. properties relating the streams obtained from two separate calls
* to [[ReadService.stateUpdates]].
*
* The first class of properties are invariants of a single stream: