Write transactions ready to be served by the Ledger API (#5029)

* Write transactions ready to be served by the Ledger API

CHANGELOG_BEGIN
CHANGELOG_END

* Address https://github.com/digital-asset/daml/pull/5029#discussion_r393192752

* Address https://github.com/digital-asset/daml/pull/5029#discussion_r393191059

* Address https://github.com/digital-asset/daml/pull/5029#discussion_r393201832

* Address https://github.com/digital-asset/daml/pull/5029#discussion_r393492987
Authored by Stefano Baghino on 2020-03-17 10:36:44 +01:00, committed by GitHub
parent 970fa3dbdb
commit 13f44ff3d9
8 changed files with 156 additions and 46 deletions

@@ -0,0 +1 @@
493cd93df525ad472d781758c81de3a406f17214c0510837001c022094266365

@@ -0,0 +1,74 @@
-- Copyright (c) 2019 The DAML Authors. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
-- contains all events for all transactions
create table participant_events
(
event_id varchar primary key not null,
event_offset bigint not null,
contract_id varchar not null,
transaction_id varchar not null,
ledger_effective_time timestamp not null,
template_package_id varchar not null,
template_name varchar not null,
node_index int not null, -- post-traversal order of an event within a transaction
is_root boolean not null,
-- these fields can be null if the transaction originated in another participant
command_id varchar,
workflow_id varchar, -- null unless provided by a Ledger API call
application_id varchar,
submitter varchar,
-- non-null iff this event is a create
create_argument bytea,
create_signatories varchar array,
create_observers varchar array,
create_agreement_text varchar, -- null if agreement text is not provided
create_consumed_at varchar, -- null if the contract created by this event is active
create_key_value bytea, -- null if the contract created by this event has no key
-- non-null iff this event is an exercise
exercise_consuming boolean,
exercise_choice varchar,
exercise_argument bytea,
exercise_result bytea,
exercise_actors varchar array,
exercise_child_event_ids varchar array -- event identifiers of consequences of this exercise
);
-- support ordering by offset and transaction, ready for serving via the Ledger API
create index on participant_events(event_offset, transaction_id, node_index);
-- support looking up a create event by the identifier of the contract it created, so that
-- consuming exercise events can use it to set the value of create_consumed_at
create index on participant_events(contract_id);
-- support requests of transactions by transaction_id
create index on participant_events(transaction_id);
-- support filtering by template
create index on participant_events(template_name);
-- the subset of an event's witnesses for which the event is visible in the flat transaction stream
create table participant_event_flat_transaction_witnesses
(
event_id varchar not null,
event_witness varchar not null,
foreign key (event_id) references participant_events(event_id)
);
create index on participant_event_flat_transaction_witnesses(event_id); -- join with events
create index on participant_event_flat_transaction_witnesses(event_witness); -- filter by party
-- complement to participant_event_flat_transaction_witnesses to include
-- the visibility of events in the transaction trees stream
create table participant_event_witnesses_complement
(
event_id varchar not null,
event_witness varchar not null,
foreign key (event_id) references participant_events(event_id)
);
create index on participant_event_witnesses_complement(event_id); -- join with events
create index on participant_event_witnesses_complement(event_witness); -- filter by party
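
The index comments above describe the intended read path. As a rough sketch only (not part of this commit), serving the flat transaction stream for a single party could join the flat witnesses table and bound the scan by the indexed (event_offset, transaction_id, node_index) triple; table and column names below come from the migration, while the query shape and parameter names are assumptions:

// Illustrative only: one possible read query over the schema above for the flat transaction stream.
val flatTransactionStreamQuerySketch: String =
  """select e.*
    |  from participant_events e
    |  join participant_event_flat_transaction_witnesses w on w.event_id = e.event_id
    | where w.event_witness = {party}
    |   and e.event_offset > {startExclusive}
    |   and e.event_offset <= {endInclusive}
    | order by e.event_offset, e.transaction_id, e.node_index
    |""".stripMargin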

@@ -31,7 +31,7 @@ import com.digitalasset.daml.lf.data.Relation.Relation
import com.digitalasset.daml.lf.transaction.Node
import com.digitalasset.daml.lf.transaction.Node.{GlobalKey, KeyWithMaintainers}
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst}
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst, NodeId}
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.ledger.api.domain.RejectionReason._
import com.digitalasset.ledger.api.domain.{
@@ -45,9 +45,11 @@ import com.digitalasset.ledger.api.domain.{
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.ledger.{ApplicationId, CommandId, EventId, WorkflowId}
import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.platform.events.EventIdFormatter.split
import com.digitalasset.platform.store.Contract.{ActiveContract, DivulgedContract}
import com.digitalasset.platform.store.Conversions._
import com.digitalasset.platform.store.dao.JdbcLedgerDao.{H2DatabaseQueries, PostgresQueries}
import com.digitalasset.platform.store.dao.events.TransactionWriter
import com.digitalasset.platform.store.entries.LedgerEntry.Transaction
import com.digitalasset.platform.store.entries.{
ConfigurationEntry,
@@ -946,12 +948,27 @@ private class JdbcLedgerDao(
val txBytes = serializeTransaction(ledgerEntry.entry)
def splitOrThrow(id: EventId): NodeId =
split(id).fold(sys.error(s"Illegal format for event identifier $id"))(_.nodeId)
def insertEntry(le: PersistenceEntry)(implicit conn: Connection): PersistenceResponse =
le match {
case PersistenceEntry.Transaction(tx, globalDivulgence, divulgedContracts) =>
Try {
storeTransaction(offset, tx, txBytes)
transactions.storeTransaction(
applicationId = tx.applicationId,
workflowId = tx.workflowId,
transactionId = tx.transactionId,
commandId = tx.commandId,
submitter = tx.submittingParty,
roots = tx.transaction.roots.iterator.map(splitOrThrow).toSet,
ledgerEffectiveTime = Date.from(tx.ledgerEffectiveTime),
offset = offset,
transaction = tx.transaction.mapNodeId(splitOrThrow),
)
// Ensure divulged contracts are known about before they are referred to.
storeContractData(divulgedContracts)
@@ -1723,6 +1740,9 @@ private class JdbcLedgerDao(
()
}
override val transactions: TransactionWriter[LedgerOffset] =
TransactionWriter(dbDispatcher)
private def executeBatchSql(query: String, params: Iterable[Seq[NamedParameter]])(
implicit con: Connection) = {
require(params.nonEmpty, "batch sql statement must have at least one set of name parameters")
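
The splitOrThrow helper introduced above assumes that every event identifier in the stored transaction can be decomposed back into a node index by EventIdFormatter.split. A minimal sketch of the round trip it relies on, assuming Ref.LedgerString.assertFromString is the right way to build a transaction identifier here (the concrete textual format of event identifiers is not shown in this diff):

import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.value.Value.NodeId
import com.digitalasset.platform.events.EventIdFormatter

// Assumed round trip behind splitOrThrow: fromTransactionId packs the transaction identifier and
// the node index into an event identifier, and split recovers the NodeId (None if the format is off).
val txId = Ref.LedgerString.assertFromString("someTransactionId")
val eventId = EventIdFormatter.fromTransactionId(txId, NodeId(0))
val recoveredNodeId = EventIdFormatter.split(eventId).map(_.nodeId) // expected: Some(NodeId(0))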

@@ -18,6 +18,7 @@ import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails, TransactionFilter}
import com.digitalasset.ledger.api.health.ReportsHealth
import com.digitalasset.platform.store.Contract.ActiveContract
import com.digitalasset.platform.store.dao.events.TransactionWriter
import com.digitalasset.platform.store.entries.{
ConfigurationEntry,
LedgerEntry,
@@ -256,6 +257,8 @@ trait LedgerWriteDao extends ReportsHealth {
/** Resets the platform into a state as it was never used before. Meant to be used solely for testing. */
def reset(): Future[Unit]
def transactions: TransactionWriter[LedgerOffset]
}
trait LedgerDao extends LedgerReadDao with LedgerWriteDao {

@@ -19,6 +19,7 @@ import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails, TransactionFi
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.platform.metrics.timedFuture
import com.digitalasset.platform.store.Contract.ActiveContract
import com.digitalasset.platform.store.dao.events.TransactionWriter
import com.digitalasset.platform.store.entries.{
ConfigurationEntry,
LedgerEntry,
@@ -223,4 +224,6 @@ class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: MetricRegistry)
timedFuture(
Metrics.storePackageEntry,
ledgerDao.storePackageEntry(offset, newLedgerEnd, externalOffset, packages, entry))
override def transactions: TransactionWriter[LedgerOffset] = ledgerDao.transactions
}

@@ -60,8 +60,7 @@ private[events] object EventsTable {
| template_package_id,
| template_name,
| node_index,
| is_root_node,
| is_state_change,
| is_root,
| command_id,
| application_id,
| submitter,
@@ -70,7 +69,7 @@ private[events] object EventsTable {
| create_observers,
| create_agreement_text,
| create_consumed_at,
| key_value
| create_key_value
|) values (
| {event_id},
| {event_offset},
@@ -81,8 +80,7 @@ private[events] object EventsTable {
| {template_package_id},
| {template_name},
| {node_index},
| {is_root_node},
| true,
| {is_root},
| {command_id},
| {application_id},
| {submitter},
@@ -91,7 +89,7 @@ private[events] object EventsTable {
| {create_observers},
| {create_agreement_text},
| null,
| {key_value}
| {create_key_value}
|)
|""".stripMargin
@@ -112,18 +110,18 @@ private[events] object EventsTable {
"event_offset" -> offset,
"contract_id" -> create.coid.coid.toString,
"transaction_id" -> transactionId.toString,
"workflow_id" -> workflowId.toString,
"workflow_id" -> workflowId.map(_.toString),
"ledger_effective_time" -> ledgerEffectiveTime,
"template_package_id" -> create.coinst.template.packageId.toString,
"template_name" -> create.coinst.template.qualifiedName.toString,
"node_index" -> nodeId.index,
"is_root_node" -> roots(nodeId),
"is_root" -> roots(nodeId),
"command_id" -> commandId.map(_.toString),
"application_id" -> applicationId.map(_.toString),
"submitter" -> submitter.map(_.toString),
"create_argument" -> serializeCreateArgOrThrow(create),
"create_signatories" -> create.signatories.map(_.toString),
"create_observers" -> create.stakeholders.diff(create.signatories).map(_.toString),
"create_signatories" -> create.signatories.map(_.toString).toArray,
"create_observers" -> create.stakeholders.diff(create.signatories).map(_.toString).toArray,
"create_agreement_text" -> Some(create.coinst.agreementText).filter(_.nonEmpty),
"create_key_value" -> serializeNullableKeyOrThrow(create),
)
@@ -139,8 +137,7 @@ private[events] object EventsTable {
| template_package_id,
| template_name,
| node_index,
| is_root_node,
| is_state_change,
| is_root,
| command_id,
| application_id,
| submitter,
@@ -160,8 +157,7 @@ private[events] object EventsTable {
| {template_package_id},
| {template_name},
| {node_index},
| {is_root_node},
| {exercise_consuming},
| {is_root},
| {command_id},
| {application_id},
| {submitter},
@@ -196,16 +192,18 @@ private[events] object EventsTable {
"template_package_id" -> exercise.templateId.packageId.toString,
"template_name" -> exercise.templateId.qualifiedName.toString,
"node_index" -> nodeId.index,
"is_root_node" -> roots(nodeId),
"is_state_change" -> exercise.consuming,
"is_root" -> roots(nodeId),
"command_id" -> commandId.map(_.toString),
"application_id" -> applicationId.map(_.toString),
"submitter" -> submitter.map(_.toString),
"exercise_consuming" -> exercise.consuming,
"exercise_choice" -> exercise.choiceId.toString,
"exercise_argument" -> serializeExerciseArgOrThrow(exercise),
"exercise_result" -> serializeNullableExerciseResultOrThrow(exercise),
"exercise_actors" -> exercise.actingParties.map(_.toString),
"exercise_child_event_ids" -> (exercise.children.toSeq.map(_.index): Seq[Int]),
"exercise_actors" -> exercise.actingParties.map(_.toString).toArray,
"exercise_child_event_ids" -> exercise.children
.map(fromTransactionId(transactionId, _): String)
.toArray,
)
private val updateArchived =
@@ -329,6 +327,8 @@ private[events] object EventsTable {
} else {
batchWithExercises
}
case (batches, _) =>
batches // ignore any event which is not a create or an exercise
}
.prepare
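
Beyond the column renames, the parameter changes above reshape values before binding: observers are derived as stakeholders minus signatories, party sets are bound as arrays, and exercise_child_event_ids now stores full event identifiers (built with fromTransactionId) rather than bare node indices. A plain-collections illustration of the resulting shapes; the "#<transactionId>:<nodeIndex>" spelling of an event identifier is an assumption here, not taken from this diff:

// Plain Scala illustration of the bound value shapes; not the actual code path.
val signatories = Set("Alice")
val stakeholders = Set("Alice", "Bob")
val createObservers: Array[String] = stakeholders.diff(signatories).toArray // -> Array("Bob")
val childEventIds: Array[String] = Array("#someTransactionId:1", "#someTransactionId:2")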

@@ -31,11 +31,11 @@ sealed abstract class WitnessesTable(tableName: String) {
transactionId: TransactionId,
witnesses: DisclosureRelation,
): Option[BatchSql] = {
if (witnesses.nonEmpty) {
val ws = DisclosureRelation
.flatten(witnesses)
.map { case (nodeId, party) => parameters(transactionId)(nodeId, party) }
.toSeq
val flattenedWitnesses = DisclosureRelation.flatten(witnesses)
if (flattenedWitnesses.nonEmpty) {
val ws = flattenedWitnesses.map {
case (nodeId, party) => parameters(transactionId)(nodeId, party)
}.toSeq
Some(BatchSql(insert, ws.head, ws.tail: _*))
} else {
None
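
The reordering above matters because a witnesses relation can be non-empty as a map while flattening to no (event, party) rows at all, in which case ws.head would throw when building the batch. A plain-collections illustration of that edge case (not the actual DisclosureRelation API):

// A node with an empty witness set: the map is non-empty, yet flattening yields no rows,
// so no batch must be produced. Checking emptiness before flattening would miss this case.
val witnesses: Map[Int, Set[String]] = Map(0 -> Set.empty[String])
val flattened: Seq[(Int, String)] =
  witnesses.toSeq.flatMap { case (node, parties) => parties.map(node -> _) }
assert(witnesses.nonEmpty && flattened.isEmpty)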

@@ -27,6 +27,7 @@ import com.digitalasset.daml.lf.transaction.Node.{
import com.digitalasset.daml.lf.value.Value.{
AbsoluteContractId,
ContractInst,
NodeId,
ValueRecord,
ValueText,
ValueUnit,
@@ -34,7 +35,7 @@ import com.digitalasset.daml.lf.value.Value.{
}
import com.digitalasset.daml.lf.value.ValueVersions
import com.digitalasset.daml_lf_dev.DamlLf
import com.digitalasset.ledger.EventId
import com.digitalasset.ledger.{EventId, TransactionId}
import com.digitalasset.ledger.api.domain.{
Filters,
InclusiveFilters,
@@ -45,6 +46,7 @@ import com.digitalasset.ledger.api.domain.{
}
import com.digitalasset.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.platform.events.EventIdFormatter
import com.digitalasset.platform.store.entries.{ConfigurationEntry, LedgerEntry, PartyLedgerEntry}
import com.digitalasset.platform.store.{DbType, FlywayMigrations, PersistenceEntry}
import com.digitalasset.resources.Resource
@@ -133,8 +135,8 @@ class JdbcLedgerDaoSpec
"JDBC Ledger DAO" should {
val event1: EventId = "event1"
val event2: EventId = "event2"
def event(txid: TransactionId, idx: Long): EventId =
EventIdFormatter.fromTransactionId(txid, NodeId(idx.toInt))
def persistAndLoadContractsTest(externalOffset: Option[LedgerString]) = {
val offset = nextOffset()
@@ -146,6 +148,8 @@ class JdbcLedgerDaoSpec
VersionedValue(ValueVersions.acceptedVersions.head, ValueText(s"key-$offset")),
Set(alice)
)
val event1 = event(txId, 1)
val event2 = event(txId, 2)
val transaction = LedgerEntry.Transaction(
Some("commandId1"),
@@ -618,6 +622,9 @@ class JdbcLedgerDaoSpec
val offset = nextOffset()
val absCid = AbsoluteContractId("cId2")
val let = Instant.now
val txid = "trId2"
val event1 = event(txid, 1)
val event2 = event(txid, 2)
val keyWithMaintainers = KeyWithMaintainers(
VersionedValue(ValueVersions.acceptedVersions.head, ValueText("key2")),
@@ -626,7 +633,7 @@ class JdbcLedgerDaoSpec
val transaction = LedgerEntry.Transaction(
Some("commandId2"),
"trId2",
txid,
Some("appID2"),
Some("Alice"),
Some("workflowId"),
@@ -669,6 +676,8 @@ class JdbcLedgerDaoSpec
val let = Instant.now
val transactionId = s"trId$offset"
val event1 = event(transactionId, 1)
val event2 = event(transactionId, 2)
val transaction = LedgerEntry.Transaction(
Some(s"commandId$offset"),
@@ -735,7 +744,7 @@ class JdbcLedgerDaoSpec
let,
GenTransaction(
HashMap(
(s"event$id": EventId) -> NodeCreate(
event(txId, id) -> NodeCreate(
nodeSeed = None,
coid = absCid,
coinst = someContractInstance,
@@ -744,9 +753,9 @@ class JdbcLedgerDaoSpec
stakeholders = Set(alice, bob),
key = None
)),
ImmArray[EventId](s"event$id"),
ImmArray(event(txId, id)),
),
Map((s"event$id": EventId) -> Set("Alice", "Bob"))
Map(event(txId, id) -> Set("Alice", "Bob"))
)
}
@@ -763,7 +772,7 @@ class JdbcLedgerDaoSpec
let,
GenTransaction(
HashMap(
(s"event$id": EventId) -> NodeExercises(
event(txId, id) -> NodeExercises(
nodeSeed = None,
targetCoid = targetCid,
templateId = someTemplateId,
@@ -783,9 +792,9 @@ class JdbcLedgerDaoSpec
ValueText("some exercise result"))),
key = None
)),
ImmArray[EventId](s"event$id"),
ImmArray(event(txId, id)),
),
Map((s"event$id": EventId) -> Set("Alice", "Bob"))
Map(event(txId, id) -> Set("Alice", "Bob"))
)
}
@@ -940,7 +949,7 @@ class JdbcLedgerDaoSpec
let,
GenTransaction(
HashMap(
(s"event$id": EventId) -> NodeCreate(
event(s"transactionId$id", id) -> NodeCreate(
nodeSeed = None,
coid = AbsoluteContractId(s"contractId$id"),
coinst = someContractInstance,
@@ -952,9 +961,9 @@ class JdbcLedgerDaoSpec
VersionedValue(ValueVersions.acceptedVersions.head, ValueText(key)),
Set(party)))
)),
ImmArray[EventId](s"event$id"),
ImmArray(event(s"transactionId$id", id)),
),
Map((s"event$id": EventId) -> Set(party))
Map(event(s"transactionId$id", id) -> Set(party))
),
Map.empty,
List.empty
@@ -973,7 +982,7 @@ class JdbcLedgerDaoSpec
let,
GenTransaction(
HashMap(
(s"event$id": EventId) -> NodeExercises(
event(s"transactionId$id", id) -> NodeExercises(
nodeSeed = None,
targetCoid = AbsoluteContractId(s"contractId$cid"),
templateId = someTemplateId,
@@ -993,9 +1002,9 @@ class JdbcLedgerDaoSpec
VersionedValue(ValueVersions.acceptedVersions.head, ValueText(key)),
Set(party)))
)),
ImmArray[EventId](s"event$id"),
ImmArray(event(s"transactionId$id", id)),
),
Map((s"event$id": EventId) -> Set(party))
Map(event(s"transactionId$id", id) -> Set(party))
),
Map.empty,
List.empty
@@ -1014,7 +1023,7 @@ class JdbcLedgerDaoSpec
let,
GenTransaction(
HashMap(
(s"event$id": EventId) -> NodeLookupByKey(
event(s"transactionId$id", id) -> NodeLookupByKey(
someTemplateId,
None,
KeyWithMaintainers(
@@ -1022,9 +1031,9 @@ class JdbcLedgerDaoSpec
Set(party)),
result.map(id => AbsoluteContractId(s"contractId$id")),
)),
ImmArray[EventId](s"event$id"),
ImmArray(event(s"transactionId$id", id)),
),
Map((s"event$id": EventId) -> Set(party))
Map(event(s"transactionId$id", id) -> Set(party))
),
Map.empty,
List.empty
@@ -1043,7 +1052,7 @@ class JdbcLedgerDaoSpec
let,
GenTransaction(
HashMap(
(s"event$id": EventId) -> NodeFetch(
event(s"transactionId$id", id) -> NodeFetch(
coid = AbsoluteContractId(s"contractId$cid"),
templateId = someTemplateId,
optLocation = None,
@@ -1051,9 +1060,9 @@ class JdbcLedgerDaoSpec
signatories = Set(party),
stakeholders = Set(party),
)),
ImmArray[EventId](s"event$id"),
ImmArray(event(s"transactionId$id", id)),
),
Map((s"event$id": EventId) -> Set(party))
Map(event(s"transactionId$id", id) -> Set(party))
),
Map.empty,
List.empty