[DPP-1327] Data migration and ingestion for new transactions related tables (#15703)

pbatko-da 2022-12-08 14:56:26 +01:00 committed by GitHub
parent 6b0aad9824
commit 89d571a87b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 985 additions and 334 deletions

View File

@ -0,0 +1 @@
871a62483f3ea549e765e3b199eb0a9c116b2a6d6408918dd101aaf6d5899059

View File

@ -0,0 +1,125 @@
------------------------------------ ETQ Data migration -------------------------------
-- Removes all elements from a that are present in b, essentially computes a - b.
CREATE OR REPLACE FUNCTION etq_array_diff(
arrayClob1 IN CLOB,
arrayClob2 IN CLOB
)
RETURN CLOB
IS
arrayJson1 json_array_t := json_array_t.parse(arrayClob1);
outputJsonArray json_array_t := json_array_t ('[]');
-- The NUMBER type has
--   a maximum value of  999...(38 9's) x 10^125
--   and a minimum value of -999...(38 9's) x 10^125,
-- so 200 characters should be enough to hold any such value together with the whole filter expression
filterExpression varchar2(200);
BEGIN
FOR i IN 0 .. arrayJson1.get_size - 1
LOOP
-- `$[*]` selects each element of the array
-- `(@ == v)` is a filter expression that checks whether each matched element is equal to some value `v`
filterExpression := '$[*]?(@ == ' || (arrayJson1.get(i).to_clob()) ||')';
IF NOT json_exists(arrayClob2, filterExpression)
THEN
outputJsonArray.append(arrayJson1.get(i));
END IF;
END LOOP;
RETURN outputJsonArray.to_clob();
END;
/
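-- Example (hypothetical inputs, not part of the migration script):
--   SELECT etq_array_diff('[1,2,3]', '[2,4]') FROM dual;
-- returns a JSON array containing 1 and 3: 2 is dropped because json_exists finds it in the
-- second array via the filter '$[*]?(@ == 2)', while 1 and 3 have no match there.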
-- Populate pe_create_id_filter_non_stakeholder_informee
INSERT INTO pe_create_id_filter_non_stakeholder_informee(event_sequential_id, party_id)
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
etq_array_diff(tree_event_witnesses, flat_event_witnesses) AS ps
FROM participant_events_create
)
SELECT i, p
FROM input1, json_table(ps, '$[*]' columns (p NUMBER PATH '$'));
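-- Example (hypothetical row, not part of the migration script): if etq_array_diff produced
-- ps = '[7,9]' for some event, json_table(ps, '$[*]' columns (p NUMBER PATH '$')) expands it
-- into two rows (p = 7 and p = 9), i.e. one filter row per informee party id.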
-- Populate pe_consuming_id_filter_stakeholder
INSERT INTO pe_consuming_id_filter_stakeholder(event_sequential_id, template_id, party_id)
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
template_id AS t,
flat_event_witnesses AS ps
FROM participant_events_consuming_exercise
)
SELECT i, t, p
FROM input1, json_table(ps, '$[*]' columns (p NUMBER PATH '$'));
-- Populate pe_consuming_id_filter_non_stakeholder_informee
INSERT INTO pe_consuming_id_filter_non_stakeholder_informee(event_sequential_id, party_id)
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
etq_array_diff(tree_event_witnesses, flat_event_witnesses) AS ps
FROM participant_events_consuming_exercise
)
SELECT i, p
FROM input1, json_table(ps, '$[*]' columns (p NUMBER PATH '$'));
-- Populate pe_non_consuming_id_filter_informee
INSERT INTO pe_non_consuming_id_filter_informee(event_sequential_id, party_id)
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
etq_array_diff(tree_event_witnesses, flat_event_witnesses) AS ps
FROM participant_events_non_consuming_exercise
)
SELECT i, p
FROM input1, json_table(ps, '$[*]' columns (p NUMBER PATH '$'));
-- Populate participant_transaction_meta
INSERT INTO participant_transaction_meta(transaction_id, event_offset, event_sequential_id_first, event_sequential_id_last)
WITH
input1 AS (
SELECT
transaction_id AS t,
event_offset AS o,
event_sequential_id AS i
FROM participant_events_create
UNION ALL
SELECT
transaction_id AS t,
event_offset AS o,
event_sequential_id AS i
FROM participant_events_consuming_exercise
UNION ALL
SELECT
transaction_id AS t,
event_offset AS o,
event_sequential_id AS i
FROM participant_events_non_consuming_exercise
UNION ALL
SELECT
c.transaction_id AS t,
c.event_offset AS o,
d.event_sequential_id AS i
FROM participant_events_divulgence d
JOIN participant_events_create c ON d.contract_id = c.contract_id
),
input2 AS (
SELECT
t,
o,
min(i) as first_i,
max(i) as last_i
FROM input1
GROUP BY t, o
)
SELECT t, o, first_i, last_i FROM input2;
DROP FUNCTION etq_array_diff;

View File

@ -0,0 +1 @@
045d9250d86a224a140fd591ccb76e32b44d9c063dcc8b47371d93c4a1cd48ad

View File

@ -0,0 +1,105 @@
------------------------------------ ETQ Data migration -------------------------------
-- Removes all elements from a that are present in b, essentially computes a - b.
CREATE OR REPLACE FUNCTION etq_array_diff(a int[], b int[])
RETURNS int[]
AS
$$
SELECT coalesce(array_agg(el), '{}')
FROM unnest(a) as el
WHERE el <> all(b)
$$
LANGUAGE SQL;
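-- Example (hypothetical inputs, not part of the migration script):
--   SELECT etq_array_diff(ARRAY[1,2,3], ARRAY[2,4]);
-- returns {1,3}: only the elements of the first array that are absent from the second are kept.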
-- Populate pe_create_id_filter_non_stakeholder_informee
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
etq_array_diff(tree_event_witnesses, flat_event_witnesses) AS ps
FROM participant_events_create
)
INSERT INTO pe_create_id_filter_non_stakeholder_informee(event_sequential_id, party_id)
SELECT i, unnest(ps) FROM input1;
-- Populate pe_consuming_id_filter_stakeholder
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
template_id AS t,
flat_event_witnesses AS ps
FROM participant_events_consuming_exercise
)
INSERT INTO pe_consuming_id_filter_stakeholder(event_sequential_id, template_id, party_id)
SELECT i, t, unnest(ps) FROM input1;
-- Populate pe_consuming_id_filter_non_stakeholder_informee
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
etq_array_diff(tree_event_witnesses, flat_event_witnesses) AS ps
FROM participant_events_consuming_exercise
)
INSERT INTO pe_consuming_id_filter_non_stakeholder_informee(event_sequential_id, party_id)
SELECT i, unnest(ps) FROM input1;
-- Populate pe_non_consuming_id_filter_informee
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
tree_event_witnesses AS ps
FROM participant_events_non_consuming_exercise
)
INSERT INTO pe_non_consuming_id_filter_informee(event_sequential_id, party_id)
SELECT i, unnest(ps) FROM input1;
-- Populate participant_transaction_meta
WITH
input1 AS (
SELECT
transaction_id AS t,
event_offset AS o,
event_sequential_id AS i
FROM participant_events_create
UNION ALL
SELECT
transaction_id AS t,
event_offset AS o,
event_sequential_id AS i
FROM participant_events_consuming_exercise
UNION ALL
SELECT
transaction_id AS t,
event_offset AS o,
event_sequential_id AS i
FROM participant_events_non_consuming_exercise
UNION ALL
-- NOTE: Divulgence offsets with no corresponding create events will not
-- have an entry in the transaction_meta table
SELECT
c.transaction_id AS t,
c.event_offset AS o,
d.event_sequential_id AS i
FROM participant_events_divulgence d
JOIN participant_events_create c ON d.contract_id = c.contract_id
),
input2 AS (
SELECT
t,
o,
min(i) as first_i,
max(i) as last_i
FROM input1
GROUP BY t, o
)
INSERT INTO participant_transaction_meta(transaction_id, event_offset, event_sequential_id_first, event_sequential_id_last)
SELECT t, o, first_i, last_i FROM input2;
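-- Example (hypothetical values, not part of the migration script): if a transaction at some
-- offset produced events with event_sequential_id 5, 6 and 7 across the event tables above,
-- the GROUP BY yields a single meta row with event_sequential_id_first = 5 and
-- event_sequential_id_last = 7.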
DROP FUNCTION etq_array_diff;

View File

@ -22,6 +22,7 @@ import com.daml.platform.store.dao.DbDispatcher
import com.daml.platform.store.dao.events.{CompressionStrategy, LfValueTranslation}
import com.daml.platform.store.interning.{InternizingStringInterningView, StringInterning}
import java.sql.Connection
import scala.util.chaining._
import com.daml.metrics.api.MetricsContext
@ -186,6 +187,7 @@ object ParallelIndexerSubscription {
Timed.value(
metrics.daml.parallelIndexer.seqMapping.duration, {
var eventSeqId = previous.lastSeqEventId
var lastTransactionMetaEventSeqId = eventSeqId
val batchWithSeqIds = current.batch.map {
case dbDto: DbDto.EventCreate =>
eventSeqId += 1
@ -199,10 +201,24 @@ object ParallelIndexerSubscription {
eventSeqId += 1
dbDto.copy(event_sequential_id = eventSeqId)
case dbDto: DbDto.CreateFilter =>
// we do not increase the event_seq_id here, because all the CreateFilter DbDto-s must have the same eventSeqId as the preceding EventCreate
case dbDto: DbDto.IdFilterCreateStakeholder =>
// we do not increase the event_seq_id here, because all the IdFilterCreateStakeholder DbDto-s must have the same eventSeqId as the preceding EventCreate
dbDto.copy(event_sequential_id = eventSeqId)
case dbDto: DbDto.IdFilterCreateNonStakeholderInformee =>
dbDto.copy(event_sequential_id = eventSeqId)
case dbDto: DbDto.IdFilterConsumingStakeholder =>
dbDto.copy(event_sequential_id = eventSeqId)
case dbDto: DbDto.IdFilterConsumingNonStakeholderInformee =>
dbDto.copy(event_sequential_id = eventSeqId)
case dbDto: DbDto.IdFilterNonConsumingInformee =>
dbDto.copy(event_sequential_id = eventSeqId)
case dbDto: DbDto.TransactionMeta =>
dbDto
.copy(
event_sequential_id_first = lastTransactionMetaEventSeqId + 1,
event_sequential_id_last = eventSeqId,
)
.tap(_ => lastTransactionMetaEventSeqId = eventSeqId)
case unChanged => unChanged
}

View File

@ -144,12 +144,40 @@ object DbDto {
StringInterningDto(entry._1, entry._2)
}
final case class CreateFilter(
final case class IdFilterCreateStakeholder(
event_sequential_id: Long,
template_id: String,
party_id: String,
) extends DbDto
final case class IdFilterCreateNonStakeholderInformee(
event_sequential_id: Long,
party_id: String,
) extends DbDto
final case class IdFilterConsumingStakeholder(
event_sequential_id: Long,
template_id: String,
party_id: String,
) extends DbDto
final case class IdFilterConsumingNonStakeholderInformee(
event_sequential_id: Long,
party_id: String,
) extends DbDto
final case class IdFilterNonConsumingInformee(
event_sequential_id: Long,
party_id: String,
) extends DbDto
final case class TransactionMeta(
transaction_id: String,
event_offset: String,
event_sequential_id_first: Long,
event_sequential_id_last: Long,
) extends DbDto
final case class TransactionMetering(
application_id: String,
action_count: Int,

View File

@ -139,6 +139,12 @@ object UpdateToDbDto {
)
.reverse
val transactionMeta = DbDto.TransactionMeta(
transaction_id = u.transactionId,
event_offset = offset.toHexString,
event_sequential_id_first = 0, // this is filled later
event_sequential_id_last = 0, // this is filled later
)
val events: Iterator[DbDto] = preorderTraversal.iterator
.flatMap {
case (nodeId, create: Create) =>
@ -146,6 +152,8 @@ object UpdateToDbDto {
val templateId = create.templateId.toString
val stakeholders = create.stakeholders.map(_.toString)
val (createArgument, createKeyValue) = translation.serialize(eventId, create)
val informees = blinding.disclosure.getOrElse(nodeId, Set.empty).map(_.toString)
val nonStakeholderInformees = informees.diff(stakeholders)
Iterator(
DbDto.EventCreate(
event_offset = Some(offset.toHexString),
@ -160,8 +168,7 @@ object UpdateToDbDto {
contract_id = create.coid.coid,
template_id = Some(templateId),
flat_event_witnesses = stakeholders,
tree_event_witnesses =
blinding.disclosure.getOrElse(nodeId, Set.empty).map(_.toString),
tree_event_witnesses = informees,
create_argument = Some(createArgument)
.map(compressionStrategy.createArgumentCompression.compress),
create_signatories = Some(create.signatories.map(_.toString)),
@ -184,17 +191,27 @@ object UpdateToDbDto {
u.contractMetadata.get(create.coid).map(_.toByteArray),
)
) ++ stakeholders.iterator.map(
DbDto.CreateFilter(
DbDto.IdFilterCreateStakeholder(
event_sequential_id = 0, // this is filled later
template_id = templateId,
_,
)
) ++ nonStakeholderInformees.iterator.map(
DbDto.IdFilterCreateNonStakeholderInformee(
event_sequential_id = 0, // this is filled later
_,
)
)
case (nodeId, exercise: Exercise) =>
val eventId = EventId(u.transactionId, nodeId)
val (exerciseArgument, exerciseResult, createKeyValue) =
translation.serialize(eventId, exercise)
val stakeholders = exercise.stakeholders.map(_.toString)
val informees = blinding.disclosure.getOrElse(nodeId, Set.empty).map(_.toString)
val flatWitnesses = if (exercise.consuming) stakeholders else Set.empty[String]
val nonStakeholderInformees = informees.diff(stakeholders)
val templateId = exercise.templateId.toString
Iterator(
DbDto.EventExercise(
consuming = exercise.consuming,
@ -208,11 +225,9 @@ object UpdateToDbDto {
node_index = Some(nodeId.index),
event_id = Some(EventId(u.transactionId, nodeId).toLedgerString),
contract_id = exercise.targetCoid.coid,
template_id = Some(exercise.templateId.toString),
flat_event_witnesses =
if (exercise.consuming) exercise.stakeholders.map(_.toString) else Set.empty,
tree_event_witnesses =
blinding.disclosure.getOrElse(nodeId, Set.empty).map(_.toString),
template_id = Some(templateId),
flat_event_witnesses = flatWitnesses,
tree_event_witnesses = informees,
create_key_value = createKeyValue
.map(compressionStrategy.createKeyValueCompression.compress),
exercise_choice = Some(exercise.qualifiedChoideName.toString),
@ -232,8 +247,29 @@ object UpdateToDbDto {
exercise_result_compression = compressionStrategy.exerciseResultCompression.id,
event_sequential_id = 0, // this is filled later
)
)
) ++ {
if (exercise.consuming) {
stakeholders.iterator.map(stakeholder =>
DbDto.IdFilterConsumingStakeholder(
event_sequential_id = 0, // this is filled later
template_id = templateId,
party_id = stakeholder,
)
) ++ nonStakeholderInformees.iterator.map(stakeholder =>
DbDto.IdFilterConsumingNonStakeholderInformee(
event_sequential_id = 0, // this is filled later
party_id = stakeholder,
)
)
} else {
informees.iterator.map(informee =>
DbDto.IdFilterNonConsumingInformee(
event_sequential_id = 0, // this is filled later
party_id = informee,
)
)
}
}
case _ =>
Iterator.empty // It is okay to collect: blinding info is already there, we are free at hand to filter out the fetch and lookup nodes here already
}
@ -268,7 +304,11 @@ object UpdateToDbDto {
commandCompletion(offset, u.recordTime, Some(u.transactionId), _)
)
events ++ divulgences ++ completions
// TransactionMeta DTO must come last in this sequence
// because in a later stage the preceding events
// will be assigned consecutive event sequential ids
// and transaction meta is assigned sequential ids of its first and last event
events ++ divulgences ++ completions ++ Seq(transactionMeta)
}
}

View File

@ -37,6 +37,12 @@ private[backend] class IngestionStorageBackendTemplate(
SQL"DELETE FROM party_entries WHERE ${queryStrategy.offsetIsGreater("ledger_offset", ledgerOffset)}",
SQL"DELETE FROM string_interning WHERE internal_id > $lastStringInterningId",
SQL"DELETE FROM pe_create_id_filter_stakeholder WHERE event_sequential_id > $lastEventSequentialId",
SQL"DELETE FROM pe_create_id_filter_non_stakeholder_informee WHERE event_sequential_id > $lastEventSequentialId",
SQL"DELETE FROM pe_consuming_id_filter_stakeholder WHERE event_sequential_id > $lastEventSequentialId",
SQL"DELETE FROM pe_consuming_id_filter_non_stakeholder_informee WHERE event_sequential_id > $lastEventSequentialId",
SQL"DELETE FROM pe_non_consuming_id_filter_informee WHERE event_sequential_id > $lastEventSequentialId",
SQL"DELETE FROM participant_transaction_meta WHERE ${queryStrategy
.offsetIsGreater("event_offset", ledgerOffset)}",
SQL"DELETE FROM transaction_metering WHERE ${queryStrategy.offsetIsGreater("ledger_offset", ledgerOffset)}",
).map(_.execute()(connection))

View File

@ -281,7 +281,7 @@ private[backend] object AppendOnlySchema {
"external_string" -> fieldStrategy.string(_ => _.externalString),
)
val createFilter: Table[DbDto.CreateFilter] =
val idFilterCreateStakeholderTable: Table[DbDto.IdFilterCreateStakeholder] =
fieldStrategy.insert("pe_create_id_filter_stakeholder")(
"event_sequential_id" -> fieldStrategy.bigint(_ => _.event_sequential_id),
"template_id" -> fieldStrategy.int(stringInterning =>
@ -292,6 +292,51 @@ private[backend] object AppendOnlySchema {
),
)
val idFilterCreateNonStakeholderInformeeTable
: Table[DbDto.IdFilterCreateNonStakeholderInformee] =
fieldStrategy.insert("pe_create_id_filter_non_stakeholder_informee")(
"event_sequential_id" -> fieldStrategy.bigint(_ => _.event_sequential_id),
"party_id" -> fieldStrategy.int(stringInterning =>
dto => stringInterning.party.unsafe.internalize(dto.party_id)
),
)
val idFilterConsumingStakeholderTable: Table[DbDto.IdFilterConsumingStakeholder] =
fieldStrategy.insert("pe_consuming_id_filter_stakeholder")(
"event_sequential_id" -> fieldStrategy.bigint(_ => _.event_sequential_id),
"template_id" -> fieldStrategy.int(stringInterning =>
dto => stringInterning.templateId.unsafe.internalize(dto.template_id)
),
"party_id" -> fieldStrategy.int(stringInterning =>
dto => stringInterning.party.unsafe.internalize(dto.party_id)
),
)
val idFilterConsumingNonStakeholderInformeeTable
: Table[DbDto.IdFilterConsumingNonStakeholderInformee] =
fieldStrategy.insert("pe_consuming_id_filter_non_stakeholder_informee")(
"event_sequential_id" -> fieldStrategy.bigint(_ => _.event_sequential_id),
"party_id" -> fieldStrategy.int(stringInterning =>
dto => stringInterning.party.unsafe.internalize(dto.party_id)
),
)
val idFilterNonConsumingInformeeTable: Table[DbDto.IdFilterNonConsumingInformee] =
fieldStrategy.insert("pe_non_consuming_id_filter_informee")(
"event_sequential_id" -> fieldStrategy.bigint(_ => _.event_sequential_id),
"party_id" -> fieldStrategy.int(stringInterning =>
dto => stringInterning.party.unsafe.internalize(dto.party_id)
),
)
val transactionMeta: Table[DbDto.TransactionMeta] =
fieldStrategy.insert("participant_transaction_meta")(
"transaction_id" -> fieldStrategy.string(_ => _.transaction_id),
"event_offset" -> fieldStrategy.string(_ => _.event_offset),
"event_sequential_id_first" -> fieldStrategy.bigint(_ => _.event_sequential_id_first),
"event_sequential_id_last" -> fieldStrategy.bigint(_ => _.event_sequential_id_last),
)
val transactionMetering: Table[DbDto.TransactionMetering] =
fieldStrategy.insert("transaction_metering")(
fields = "application_id" -> fieldStrategy.string(_ => _.application_id),
@ -311,7 +356,12 @@ private[backend] object AppendOnlySchema {
partyEntries.executeUpdate,
commandCompletions.executeUpdate,
stringInterningTable.executeUpdate,
createFilter.executeUpdate,
idFilterCreateStakeholderTable.executeUpdate,
idFilterCreateNonStakeholderInformeeTable.executeUpdate,
idFilterConsumingStakeholderTable.executeUpdate,
idFilterConsumingNonStakeholderInformeeTable.executeUpdate,
idFilterNonConsumingInformeeTable.executeUpdate,
transactionMeta.executeUpdate,
transactionMetering.executeUpdate,
)
@ -337,7 +387,17 @@ private[backend] object AppendOnlySchema {
partyEntries.prepareData(collect[PartyEntry], stringInterning),
commandCompletions.prepareData(collect[CommandCompletion], stringInterning),
stringInterningTable.prepareData(collect[StringInterningDto], stringInterning),
createFilter.prepareData(collect[CreateFilter], stringInterning),
idFilterCreateStakeholderTable
.prepareData(collect[IdFilterCreateStakeholder], stringInterning),
idFilterCreateNonStakeholderInformeeTable
.prepareData(collect[IdFilterCreateNonStakeholderInformee], stringInterning),
idFilterConsumingStakeholderTable
.prepareData(collect[IdFilterConsumingStakeholder], stringInterning),
idFilterConsumingNonStakeholderInformeeTable
.prepareData(collect[IdFilterConsumingNonStakeholderInformee], stringInterning),
idFilterNonConsumingInformeeTable
.prepareData(collect[IdFilterNonConsumingInformee], stringInterning),
transactionMeta.prepareData(collect[TransactionMeta], stringInterning),
transactionMetering.prepareData(collect[TransactionMetering], stringInterning),
)
}

View File

@ -27,6 +27,11 @@ object H2ResetStorageBackend extends ResetStorageBackend {
truncate table participant_party_record_annotations;
truncate table string_interning;
truncate table pe_create_id_filter_stakeholder;
truncate table pe_create_id_filter_non_stakeholder_informee;
truncate table pe_consuming_id_filter_stakeholder;
truncate table pe_consuming_id_filter_non_stakeholder_informee;
truncate table pe_non_consuming_id_filter_informee;
truncate table participant_transaction_meta;
truncate table participant_users;
truncate table participant_user_rights;
truncate table participant_user_annotations;

View File

@ -26,6 +26,11 @@ object OracleResetStorageBackend extends ResetStorageBackend {
"participant_party_record_annotations",
"string_interning",
"pe_create_id_filter_stakeholder",
"pe_create_id_filter_non_stakeholder_informee",
"pe_consuming_id_filter_stakeholder",
"pe_consuming_id_filter_non_stakeholder_informee",
"pe_non_consuming_id_filter_informee",
"participant_transaction_meta",
"participant_users",
"participant_user_rights",
"participant_user_annotations",

View File

@ -26,6 +26,11 @@ object PostgresResetStorageBackend extends ResetStorageBackend {
truncate table participant_party_record_annotations cascade;
truncate table string_interning cascade;
truncate table pe_create_id_filter_stakeholder cascade;
truncate table pe_create_id_filter_non_stakeholder_informee cascade;
truncate table pe_consuming_id_filter_stakeholder cascade;
truncate table pe_consuming_id_filter_non_stakeholder_informee cascade;
truncate table pe_non_consuming_id_filter_informee cascade;
truncate table participant_transaction_meta cascade;
truncate table participant_users cascade;
truncate table participant_user_rights cascade;
truncate table participant_user_annotations cascade;

View File

@ -79,11 +79,13 @@ private[dao] case class SequentialWriteDaoImpl[DB_BATCH](
private var lastEventSeqId: Long = _
private var lastStringInterningId: Int = _
private var lastEventSeqIdInitialized = false
private var previousTransactionMetaToEventSeqId: Long = _
private def lazyInit(connection: Connection): Unit =
if (!lastEventSeqIdInitialized) {
val ledgerEnd = parameterStorageBackend.ledgerEnd(connection)
lastEventSeqId = ledgerEnd.lastEventSeqId
previousTransactionMetaToEventSeqId = ledgerEnd.lastEventSeqId
lastStringInterningId = ledgerEnd.lastStringInterningId
lastEventSeqIdInitialized = true
}
@ -98,7 +100,23 @@ private[dao] case class SequentialWriteDaoImpl[DB_BATCH](
case e: DbDto.EventCreate => e.copy(event_sequential_id = nextEventSeqId)
case e: DbDto.EventDivulgence => e.copy(event_sequential_id = nextEventSeqId)
case e: DbDto.EventExercise => e.copy(event_sequential_id = nextEventSeqId)
case e: DbDto.CreateFilter => e.copy(event_sequential_id = lastEventSeqId)
case e: DbDto.IdFilterCreateStakeholder =>
e.copy(event_sequential_id = lastEventSeqId)
case e: DbDto.IdFilterCreateNonStakeholderInformee =>
e.copy(event_sequential_id = lastEventSeqId)
case e: DbDto.IdFilterConsumingStakeholder =>
e.copy(event_sequential_id = lastEventSeqId)
case e: DbDto.IdFilterConsumingNonStakeholderInformee =>
e.copy(event_sequential_id = lastEventSeqId)
case e: DbDto.IdFilterNonConsumingInformee =>
e.copy(event_sequential_id = lastEventSeqId)
case e: DbDto.TransactionMeta =>
val dto = e.copy(
event_sequential_id_first = (previousTransactionMetaToEventSeqId + 1),
event_sequential_id_last = lastEventSeqId,
)
previousTransactionMetaToEventSeqId = lastEventSeqId
dto
case notEvent => notEvent
}.toVector

View File

@ -252,6 +252,17 @@ private[backend] object StorageBackendTestValues {
deduplication_start = deduplicationStart.map(_.micros),
)
def dtoTransactionMeta(
offset: Offset,
event_sequential_id_first: Long,
event_sequential_id_last: Long,
): DbDto.TransactionMeta = DbDto.TransactionMeta(
transactionIdFromOffset(offset),
event_offset = offset.toHexString,
event_sequential_id_first = event_sequential_id_first,
event_sequential_id_last = event_sequential_id_last,
)
def dtoTransactionMetering(
metering: TransactionMetering
): DbDto.TransactionMetering = {
@ -268,7 +279,8 @@ private[backend] object StorageBackendTestValues {
event_sequential_id: Long,
template_id: Ref.Identifier,
party_id: String,
): DbDto.CreateFilter = DbDto.CreateFilter(event_sequential_id, template_id.toString, party_id)
): DbDto.IdFilterCreateStakeholder =
DbDto.IdFilterCreateStakeholder(event_sequential_id, template_id.toString, party_id)
def dtoInterning(
internal: Int,

View File

@ -25,7 +25,7 @@ private[backend] trait StorageBackendTestsContracts
val dtos: Vector[DbDto] = Vector(
// 1: transaction with create node
dtoCreate(offset(1), 1L, contractId = contractId, signatory = signatory),
DbDto.CreateFilter(1L, someTemplateId.toString, signatory),
DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, signatory),
dtoCompletion(offset(1)),
)
@ -55,7 +55,7 @@ private[backend] trait StorageBackendTestsContracts
val dtos: Vector[DbDto] = Vector(
// 1: transaction with create node
dtoCreate(offset(1), 1L, contractId = contractId, signatory = signatory),
DbDto.CreateFilter(1L, someTemplateId.toString, signatory),
DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, signatory),
dtoCompletion(offset(1)),
// 2: transaction that archives the contract
dtoExercise(offset(2), 2L, true, contractId),
@ -85,7 +85,7 @@ private[backend] trait StorageBackendTestsContracts
val dtos: Vector[DbDto] = Vector(
// 1: divulgence
dtoDivulgence(Some(offset(1)), 1L, contractId = contractId, divulgee = divulgee),
DbDto.CreateFilter(1L, someTemplateId.toString, divulgee),
DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, divulgee),
dtoCompletion(offset(1)),
)
@ -116,12 +116,12 @@ private[backend] trait StorageBackendTestsContracts
val dtos: Vector[DbDto] = Vector(
// 1: transaction with create node
dtoCreate(offset(1), 1L, contractId = contractId, signatory = signatory),
DbDto.CreateFilter(1L, someTemplateId.toString, signatory),
DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, signatory),
dtoCompletion(offset(1)),
// 2: divulgence without any optional information
dtoDivulgence(Some(offset(2)), 2L, contractId = contractId, divulgee = divulgee)
.copy(template_id = None, create_argument = None, create_argument_compression = None),
DbDto.CreateFilter(2L, someTemplateId.toString, divulgee),
DbDto.IdFilterCreateStakeholder(2L, someTemplateId.toString, divulgee),
dtoCompletion(offset(2)),
)
@ -152,11 +152,11 @@ private[backend] trait StorageBackendTestsContracts
val dtos: Vector[DbDto] = Vector(
// 1: transaction with create node
dtoCreate(offset(1), 1L, contractId = contractId, signatory = signatory),
DbDto.CreateFilter(1L, someTemplateId.toString, signatory),
DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, signatory),
dtoCompletion(offset(1)),
// 2: divulgence
dtoDivulgence(Some(offset(2)), 2L, contractId = contractId, divulgee = divulgee),
DbDto.CreateFilter(2L, someTemplateId.toString, divulgee),
DbDto.IdFilterCreateStakeholder(2L, someTemplateId.toString, divulgee),
dtoCompletion(offset(2)),
// 3: transaction that archives the contract
dtoExercise(offset(3), 3L, true, contractId, signatory = signatory),

View File

@ -7,6 +7,7 @@ import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import org.scalatest.compatible.Assertion
import org.scalatest.Inside
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
@ -26,10 +27,11 @@ private[backend] trait StorageBackendTestsInitializeIngestion
TransactionMetering(Ref.ApplicationId.assertFromString(app), 1, someTime, offset)
)
it should "delete overspill entries" in {
val signatory = Ref.Party.assertFromString("signatory")
private val signatory = Ref.Party.assertFromString("signatory")
private val readers = Set(signatory)
val dtos1: Vector[DbDto] = Vector(
{
val dtos = Vector(
// 1: config change
dtoConfiguration(offset(1), someConfiguration),
// 2: party allocation
@ -37,223 +39,285 @@ private[backend] trait StorageBackendTestsInitializeIngestion
// 3: package upload
dtoPackage(offset(3)),
dtoPackageEntry(offset(3)),
// 4: transaction with create node
dtoCreate(offset(4), 1L, hashCid("#4"), signatory = signatory),
DbDto.CreateFilter(1L, someTemplateId.toString, someParty.toString),
dtoCompletion(offset(4)),
// 5: transaction with exercise node and retroactive divulgence
dtoExercise(offset(5), 2L, false, hashCid("#4")),
dtoDivulgence(Some(offset(5)), 3L, hashCid("#4")),
dtoCompletion(offset(5)),
// Transaction Metering
)
it should "delete overspill entries - config, parties, packages" in {
fixture(
dtos1 = dtos,
lastOffset1 = 3L,
lastEventSeqId1 = 0L,
dtos2 = Vector(
// 4: config change
dtoConfiguration(offset(4), someConfiguration),
// 5: party allocation
dtoPartyEntry(offset(5), "party2"),
// 6: package upload
dtoPackage(offset(6)),
dtoPackageEntry(offset(6)),
),
lastOffset2 = 6L,
lastEventSeqId2 = 0L,
checkContentsBefore = () => {
val parties = executeSql(backend.party.knownParties)
val config = executeSql(backend.configuration.ledgerConfiguration)
val packages = executeSql(backend.packageBackend.lfPackages)
parties should have length 1
packages should have size 1
config shouldBe Some(offset(1) -> someConfiguration)
},
checkContentsAfter = () => {
val parties = executeSql(backend.party.knownParties)
val config = executeSql(backend.configuration.ledgerConfiguration)
val packages = executeSql(backend.packageBackend.lfPackages)
parties should have length 1
packages should have size 1
config shouldBe Some(offset(1) -> someConfiguration)
},
)
}
it should "delete overspill entries written before first ledger end update - config, parties, packages" in {
fixtureOverspillBeforeFirstLedgerEndUpdate(
dtos = dtos,
lastOffset = 3,
lastEventSeqId = 0L,
checkContentsAfter = () => {
val parties2 = executeSql(backend.party.knownParties)
val config2 = executeSql(backend.configuration.ledgerConfiguration)
val packages2 = executeSql(backend.packageBackend.lfPackages)
parties2 shouldBe empty
packages2 shouldBe empty
config2 shouldBe empty
},
)
}
}
{
val dtos = Vector(
dtoMetering("AppA", offset(1)),
dtoMetering("AppB", offset(4)),
)
it should "delete overspill entries - metering" in {
fixture(
dtos1 = dtos,
lastOffset1 = 4L,
lastEventSeqId1 = 0L,
dtos2 = Vector(
dtoMetering("AppC", offset(6))
),
lastOffset2 = 6L,
lastEventSeqId2 = 0L,
checkContentsBefore = () => {
val metering =
executeSql(backend.metering.read.reportData(Timestamp.Epoch, None, None))
// Metering report can include partially ingested data in non-final reports
metering.applicationData should have size 3
metering.isFinal shouldBe false
},
checkContentsAfter = () => {
val metering =
executeSql(backend.metering.read.reportData(Timestamp.Epoch, None, None))
metering.applicationData should have size 2 // Partially ingested data removed
},
)
}
val dtos2: Vector[DbDto] = Vector(
// 6: config change
dtoConfiguration(offset(6), someConfiguration),
// 7: party allocation
dtoPartyEntry(offset(7), "party2"),
// 8: package upload
dtoPackage(offset(8)),
dtoPackageEntry(offset(8)),
// 9: transaction with create node
dtoCreate(offset(9), 4L, hashCid("#9"), signatory = signatory),
DbDto.CreateFilter(4L, someTemplateId.toString, someParty.toString),
dtoCompletion(offset(9)),
// 10: transaction with exercise node and retroactive divulgence
dtoExercise(offset(10), 5L, false, hashCid("#9")),
dtoDivulgence(Some(offset(10)), 6L, hashCid("#9")),
dtoCompletion(offset(10)),
// Transaction Metering
dtoMetering("AppC", offset(6)),
it should "delete overspill entries written before first ledger end update - metering" in {
fixtureOverspillBeforeFirstLedgerEndUpdate(
dtos = dtos,
lastOffset = 4,
lastEventSeqId = 0L,
checkContentsAfter = () => {
val metering2 =
executeSql(backend.metering.read.reportData(Timestamp.Epoch, None, None))
metering2.applicationData shouldBe empty
},
)
}
}
{
val dtos = Vector(
// 1: transaction with a create node
dtoCreate(offset(1), 1L, hashCid("#101"), signatory = signatory),
DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, someParty),
DbDto.IdFilterCreateNonStakeholderInformee(1L, someParty),
dtoTransactionMeta(
offset(1),
event_sequential_id_first = 1L,
event_sequential_id_last = 1L,
),
dtoCompletion(offset(1)),
// 2: transaction with exercise node and retroactive divulgence
dtoExercise(offset(2), 2L, false, hashCid("#101")),
DbDto.IdFilterNonConsumingInformee(2L, someParty),
dtoExercise(offset(2), 3L, true, hashCid("#102")),
DbDto.IdFilterConsumingStakeholder(3L, someTemplateId.toString, someParty),
DbDto.IdFilterConsumingNonStakeholderInformee(3L, someParty),
dtoDivulgence(Some(offset(2)), 4L, hashCid("#101")),
dtoTransactionMeta(
offset(2),
event_sequential_id_first = 2L,
event_sequential_id_last = 4L,
),
dtoCompletion(offset(2)),
)
val readers = Set(signatory)
it should "delete overspill entries - events, transaction meta, completions" in {
fixture(
dtos1 = dtos,
lastOffset1 = 2L,
lastEventSeqId1 = 4L,
dtos2 = Vector(
// 3: transaction with create node
dtoCreate(offset(3), 5L, hashCid("#201"), signatory = signatory),
DbDto.IdFilterCreateStakeholder(5L, someTemplateId.toString, someParty),
DbDto.IdFilterCreateNonStakeholderInformee(5L, someParty),
dtoTransactionMeta(
offset(3),
event_sequential_id_first = 5L,
event_sequential_id_last = 5L,
),
dtoCompletion(offset(3)),
// 4: transaction with exercise node and retroactive divulgence
dtoExercise(offset(4), 6L, false, hashCid("#201")),
DbDto.IdFilterNonConsumingInformee(6L, someParty),
dtoExercise(offset(4), 7L, true, hashCid("#202")),
DbDto.IdFilterConsumingStakeholder(7L, someTemplateId.toString, someParty),
DbDto.IdFilterConsumingNonStakeholderInformee(7L, someParty),
dtoDivulgence(Some(offset(4)), 8L, hashCid("#201")),
dtoTransactionMeta(
offset(4),
event_sequential_id_first = 6L,
event_sequential_id_last = 8L,
),
dtoCompletion(offset(4)),
),
lastOffset2 = 10L,
lastEventSeqId2 = 6L,
checkContentsBefore = () => {
val contract101 =
executeSql(backend.contract.activeContractWithoutArgument(readers, hashCid("#101")))
val contract202 =
executeSql(backend.contract.activeContractWithoutArgument(readers, hashCid("#201")))
// TODO etq: Add assertion for the remaining filter tables & transaction_meta table
val idsCreateStakeholder = executeSql(
backend.event.activeContractEventIds(
partyFilter = someParty,
templateIdFilter = None,
startExclusive = 0,
endInclusive = 1000,
limit = 1000,
)
)
contract101 should not be empty
contract202 shouldBe None
idsCreateStakeholder shouldBe List(
1L,
5L,
) // since ledger-end does not limit the range query
},
checkContentsAfter = () => {
val contract101 = executeSql(
backend.contract.activeContractWithoutArgument(readers, hashCid("#101"))
)
val contract202 = executeSql(
backend.contract.activeContractWithoutArgument(readers, hashCid("#201"))
)
// TODO etq: Add assertion for the remaining filter tables & transaction_meta table
val idsCreateStakeholder = executeSql(
backend.event.activeContractEventIds(
partyFilter = someParty,
templateIdFilter = None,
startExclusive = 0,
endInclusive = 1000,
limit = 1000,
)
)
contract101 should not be empty
contract202 shouldBe None
idsCreateStakeholder shouldBe List(1L)
},
)
}
it should "delete overspill entries written before first ledger end update - events, transaction meta, completions" in {
fixtureOverspillBeforeFirstLedgerEndUpdate(
dtos = dtos,
lastOffset = 2,
lastEventSeqId = 3L,
checkContentsAfter = () => {
val contract101 = executeSql(
backend.contract.activeContractWithoutArgument(readers, hashCid("#101"))
)
// TODO etq: Add assertion for the remaining filter tables & transaction_meta table
val idsCreateStakeholder = executeSql(
backend.event.activeContractEventIds(
partyFilter = someParty,
templateIdFilter = None,
startExclusive = 0,
endInclusive = 1000,
limit = 1000,
)
)
contract101 shouldBe None
idsCreateStakeholder shouldBe empty
},
)
}
}
private def fixture(
dtos1: Vector[DbDto],
lastOffset1: Long,
lastEventSeqId1: Long,
dtos2: Vector[DbDto],
lastOffset2: Long,
lastEventSeqId2: Long,
checkContentsBefore: () => Assertion,
checkContentsAfter: () => Assertion,
): Assertion = {
// Initialize
executeSql(backend.parameter.initializeParameters(someIdentityParams))
executeSql(backend.meteringParameter.initializeLedgerMeteringEnd(someLedgerMeteringEnd))
// Start the indexer (a no-op in this case)
val end1 = executeSql(backend.parameter.ledgerEnd)
executeSql(backend.ingestion.deletePartiallyIngestedData(end1))
// Fully insert first batch of updates
executeSql(ingest(dtos1, _))
executeSql(updateLedgerEnd(ledgerEnd(5, 3L)))
executeSql(updateLedgerEnd(ledgerEnd(lastOffset1, lastEventSeqId1)))
// Partially insert second batch of updates (indexer crashes before updating ledger end)
executeSql(ingest(dtos2, _))
// Check the contents
val parties1 = executeSql(backend.party.knownParties)
val config1 = executeSql(backend.configuration.ledgerConfiguration)
val packages1 = executeSql(backend.packageBackend.lfPackages)
val contract41 = executeSql(
backend.contract.activeContractWithoutArgument(
readers,
hashCid("#4"),
)
)
val contract91 = executeSql(
backend.contract.activeContractWithoutArgument(
readers,
hashCid("#9"),
)
)
val filterIds1 = executeSql(
backend.event.activeContractEventIds(
partyFilter = someParty,
templateIdFilter = None,
startExclusive = 0,
endInclusive = 1000,
limit = 1000,
)
)
val metering1 =
executeSql(backend.metering.read.reportData(Timestamp.Epoch, None, None))
checkContentsBefore()
// Restart the indexer - should delete data from the partial insert above
val end2 = executeSql(backend.parameter.ledgerEnd)
executeSql(backend.ingestion.deletePartiallyIngestedData(end2))
// Move the ledger end so that any non-deleted data would become visible
executeSql(updateLedgerEnd(ledgerEnd(10, 6L)))
executeSql(updateLedgerEnd(ledgerEnd(lastOffset2 + 1, lastEventSeqId2 + 1)))
// Check the contents
val parties2 = executeSql(backend.party.knownParties)
val config2 = executeSql(backend.configuration.ledgerConfiguration)
val packages2 = executeSql(backend.packageBackend.lfPackages)
val contract42 = executeSql(
backend.contract.activeContractWithoutArgument(
readers,
hashCid("#4"),
)
)
val contract92 = executeSql(
backend.contract.activeContractWithoutArgument(
readers,
hashCid("#9"),
)
)
val filterIds2 = executeSql(
backend.event.activeContractEventIds(
partyFilter = someParty,
templateIdFilter = None,
startExclusive = 0,
endInclusive = 1000,
limit = 1000,
)
)
val metering2 =
executeSql(backend.metering.read.reportData(Timestamp.Epoch, None, None))
parties1 should have length 1
packages1 should have size 1
config1 shouldBe Some(offset(1) -> someConfiguration)
contract41 should not be empty
contract91 shouldBe None
filterIds1 shouldBe List(1L, 4L) // since ledger-end does not limit the range query
// Metering report can include partially ingested data in non-final reports
metering1.applicationData should have size 3
metering1.isFinal shouldBe false
parties2 should have length 1
packages2 should have size 1
config2 shouldBe Some(offset(1) -> someConfiguration)
contract42 should not be empty
contract92 shouldBe None
filterIds2 shouldBe List(1L)
metering2.applicationData should have size 2 // Partially ingested data removed
checkContentsAfter()
}
it should "delete overspill entries written before first ledger end update" in {
val signatory = Ref.Party.assertFromString("signatory")
val dtos1: Vector[DbDto] = Vector(
// 1: config change
dtoConfiguration(offset(1), someConfiguration),
// 2: party allocation
dtoPartyEntry(offset(2), "party1"),
// 3: package upload
dtoPackage(offset(3)),
dtoPackageEntry(offset(3)),
// 4: transaction with create node
dtoCreate(offset(4), 1L, hashCid("#4"), signatory = signatory),
DbDto.CreateFilter(1L, someTemplateId.toString, someParty.toString),
dtoCompletion(offset(4)),
// 5: transaction with exercise node and retroactive divulgence
dtoExercise(offset(5), 2L, false, hashCid("#4")),
dtoDivulgence(Some(offset(5)), 3L, hashCid("#4")),
dtoCompletion(offset(5)),
// Transaction Metering
dtoMetering("AppA", offset(1)),
dtoMetering("AppB", offset(4)),
)
val readers = Set(signatory)
private def fixtureOverspillBeforeFirstLedgerEndUpdate(
dtos: Vector[DbDto],
lastOffset: Long,
lastEventSeqId: Long,
checkContentsAfter: () => Assertion,
): Assertion = {
// Initialize
executeSql(backend.parameter.initializeParameters(someIdentityParams))
executeSql(backend.meteringParameter.initializeLedgerMeteringEnd(someLedgerMeteringEnd))
// Start the indexer (a no-op in this case)
val end1 = executeSql(backend.parameter.ledgerEnd)
executeSql(backend.ingestion.deletePartiallyIngestedData(end1))
// Insert first batch of updates, but crash before writing the first ledger end
executeSql(ingest(dtos1, _))
executeSql(ingest(dtos, _))
// Restart the indexer - should delete data from the partial insert above
val end2 = executeSql(backend.parameter.ledgerEnd)
executeSql(backend.ingestion.deletePartiallyIngestedData(end2))
// Move the ledger end so that any non-deleted data would become visible
executeSql(updateLedgerEnd(ledgerEnd(10, 6L)))
// Check the contents
val parties2 = executeSql(backend.party.knownParties)
val config2 = executeSql(backend.configuration.ledgerConfiguration)
val packages2 = executeSql(backend.packageBackend.lfPackages)
val contract42 = executeSql(
backend.contract.activeContractWithoutArgument(
readers,
hashCid("#4"),
)
)
val contract92 = executeSql(
backend.contract.activeContractWithoutArgument(
readers,
hashCid("#9"),
)
)
val filterIds2 = executeSql(
backend.event.activeContractEventIds(
partyFilter = someParty,
templateIdFilter = None,
startExclusive = 0,
endInclusive = 1000,
limit = 1000,
)
)
val metering2 =
executeSql(backend.metering.read.reportData(Timestamp.Epoch, None, None))
parties2 shouldBe empty
packages2 shouldBe empty
config2 shouldBe empty
contract42 shouldBe None
contract92 shouldBe None
filterIds2 shouldBe empty
metering2.applicationData shouldBe empty
executeSql(updateLedgerEnd(ledgerEnd(lastOffset + 1, lastEventSeqId + 1)))
checkContentsAfter()
}
}

View File

@ -85,8 +85,8 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
contractId = hashCid("#1"),
signatory = someParty,
)
val createFilter1 = DbDto.CreateFilter(1L, someTemplateId.toString, "signatory")
val createFilter2 = DbDto.CreateFilter(1L, someTemplateId.toString, "observer")
val createFilter1 = DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, "signatory")
val createFilter2 = DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, "observer")
val createTransactionId = dtoTransactionId(create)
val archive = dtoExercise(
offset = offset(2),
@ -170,8 +170,8 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
contractId = hashCid("#1"),
signatory = someParty,
)
val createFilter1 = DbDto.CreateFilter(1L, someTemplateId.toString, "signatory")
val createFilter2 = DbDto.CreateFilter(1L, someTemplateId.toString, "observer")
val createFilter1 = DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, "signatory")
val createFilter2 = DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, "observer")
val createTransactionId = dtoTransactionId(create)
val range = RangeParams(0L, 1L, None, None)
val filter = FilterParams(Set(someParty), Set.empty)

View File

@ -55,7 +55,7 @@ private[backend] trait StorageBackendTestsReset extends Matchers with StorageBac
dtoPackageEntry(offset(3)),
// 4: transaction with create node
dtoCreate(offset(4), 1L, hashCid("#4")),
DbDto.CreateFilter(1L, someTemplateId.toString, someParty.toString),
DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, someParty.toString),
dtoCompletion(offset(4)),
// 5: transaction with exercise node and retroactive divulgence
dtoExercise(offset(5), 2L, true, hashCid("#4")),

View File

@ -263,11 +263,11 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
it should "assign sequence ids correctly, and populate string-interning entries correctly in happy path case" in {
val result = ParallelIndexerSubscription.seqMapper(
_.zipWithIndex.map(x => x._2 -> x._2.toString).take(2),
internize = _.zipWithIndex.map(x => x._2 -> x._2.toString).take(2),
metrics,
)(
ParallelIndexerSubscription.seqMapperZero(15, 26),
Batch(
previous = ParallelIndexerSubscription.seqMapperZero(15, 26),
current = Batch(
lastOffset = offset("02"),
lastSeqEventId = 0,
lastStringInterningId = 0,
@ -277,27 +277,53 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
someEventDivulgence,
someParty,
someEventCreated,
DbDto.CreateFilter(0L, "", ""),
DbDto.CreateFilter(0L, "", ""),
DbDto.IdFilterCreateStakeholder(0L, "", ""),
DbDto.IdFilterCreateNonStakeholderInformee(0L, ""),
DbDto.IdFilterConsumingStakeholder(0L, "", ""),
DbDto.IdFilterConsumingNonStakeholderInformee(0L, ""),
DbDto.IdFilterNonConsumingInformee(0L, ""),
someEventCreated,
someEventCreated,
DbDto.TransactionMeta("", "", 0L, 0L),
someParty,
someEventExercise,
DbDto.TransactionMeta("", "", 0L, 0L),
someParty,
),
batchSize = 3,
offsetsUpdates = offsetsAndUpdates,
),
)
result.lastSeqEventId shouldBe 18
import scala.util.chaining._
result.lastSeqEventId shouldBe 20
result.lastStringInterningId shouldBe 1
result.batch(1).asInstanceOf[DbDto.EventDivulgence].event_sequential_id shouldBe 16
result.batch(3).asInstanceOf[DbDto.EventCreate].event_sequential_id shouldBe 17
result.batch(4).asInstanceOf[DbDto.CreateFilter].event_sequential_id shouldBe 17
result.batch(5).asInstanceOf[DbDto.CreateFilter].event_sequential_id shouldBe 17
result.batch(7).asInstanceOf[DbDto.EventExercise].event_sequential_id shouldBe 18
result.batch(9).asInstanceOf[DbDto.StringInterningDto].internalId shouldBe 0
result.batch(9).asInstanceOf[DbDto.StringInterningDto].externalString shouldBe "0"
result.batch(10).asInstanceOf[DbDto.StringInterningDto].internalId shouldBe 1
result.batch(10).asInstanceOf[DbDto.StringInterningDto].externalString shouldBe "1"
result.batch(4).asInstanceOf[DbDto.IdFilterCreateStakeholder].event_sequential_id shouldBe 17
result
.batch(5)
.asInstanceOf[DbDto.IdFilterCreateNonStakeholderInformee]
.event_sequential_id shouldBe 17
result.batch(6).asInstanceOf[DbDto.IdFilterConsumingStakeholder].event_sequential_id shouldBe 17
result
.batch(7)
.asInstanceOf[DbDto.IdFilterConsumingNonStakeholderInformee]
.event_sequential_id shouldBe 17
result.batch(8).asInstanceOf[DbDto.IdFilterNonConsumingInformee].event_sequential_id shouldBe 17
result.batch(11).asInstanceOf[DbDto.TransactionMeta].tap { transactionMeta =>
transactionMeta.event_sequential_id_first shouldBe 16L
transactionMeta.event_sequential_id_last shouldBe 19L
}
result.batch(13).asInstanceOf[DbDto.EventExercise].event_sequential_id shouldBe 20
result.batch(14).asInstanceOf[DbDto.TransactionMeta].tap { transactionMeta =>
transactionMeta.event_sequential_id_first shouldBe 20L
transactionMeta.event_sequential_id_last shouldBe 20L
}
result.batch(16).asInstanceOf[DbDto.StringInterningDto].internalId shouldBe 0
result.batch(16).asInstanceOf[DbDto.StringInterningDto].externalString shouldBe "0"
result.batch(17).asInstanceOf[DbDto.StringInterningDto].internalId shouldBe 1
result.batch(17).asInstanceOf[DbDto.StringInterningDto].externalString shouldBe "1"
}
it should "preserve sequence id if nothing to assign" in {

View File

@ -269,6 +269,8 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
)
}
val transactionId = Ref.TransactionId.assertFromString("TransactionId")
"handle TransactionAccepted (single create node)" in {
val completionInfo = someCompletionInfo
val transactionMeta = someTransactionMeta
@ -290,7 +292,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optCompletionInfo = Some(completionInfo),
transactionMeta = transactionMeta,
transaction = transaction,
transactionId = Ref.TransactionId.assertFromString("TransactionId"),
transactionId = transactionId,
recordTime = someRecordTime,
divulgedContracts = List.empty,
blindingInfo = None,
@ -341,11 +343,17 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
deduplication_duration_seconds = None,
deduplication_start = None,
)
Set(dtos(1), dtos(2)) should contain theSameElementsAs Set(
DbDto.CreateFilter(0L, createNode.templateId.toString, "signatory"),
DbDto.CreateFilter(0L, createNode.templateId.toString, "observer"),
dtos(4) shouldEqual DbDto.TransactionMeta(
transaction_id = transactionId,
event_offset = someOffset.toHexString,
event_sequential_id_first = 0,
event_sequential_id_last = 0,
)
dtos.size shouldEqual 4
Set(dtos(1), dtos(2)) should contain theSameElementsAs Set(
DbDto.IdFilterCreateStakeholder(0L, createNode.templateId.toString, "signatory"),
DbDto.IdFilterCreateStakeholder(0L, createNode.templateId.toString, "observer"),
)
dtos.size shouldEqual 5
}
"handle TransactionAccepted (single consuming exercise node)" in {
@ -378,7 +386,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optCompletionInfo = Some(completionInfo),
transactionMeta = transactionMeta,
transaction = transaction,
transactionId = Ref.TransactionId.assertFromString("TransactionId"),
transactionId = transactionId,
recordTime = someRecordTime,
divulgedContracts = List.empty,
blindingInfo = None,
@ -415,6 +423,16 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
exercise_result_compression = compressionAlgorithmId,
event_sequential_id = 0,
),
DbDto.IdFilterConsumingStakeholder(
event_sequential_id = 0,
template_id = exerciseNode.templateId.toString,
party_id = "signatory",
),
DbDto.IdFilterConsumingStakeholder(
event_sequential_id = 0,
template_id = exerciseNode.templateId.toString,
party_id = "observer",
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
record_time = update.recordTime.micros,
@ -431,6 +449,12 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
deduplication_duration_seconds = None,
deduplication_start = None,
),
DbDto.TransactionMeta(
transaction_id = transactionId,
event_offset = someOffset.toHexString,
event_sequential_id_first = 0,
event_sequential_id_last = 0,
),
)
}
@ -464,7 +488,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optCompletionInfo = Some(completionInfo),
transactionMeta = transactionMeta,
transaction = transaction,
transactionId = Ref.TransactionId.assertFromString("TransactionId"),
transactionId = transactionId,
recordTime = someRecordTime,
divulgedContracts = List.empty,
blindingInfo = None,
@ -501,6 +525,10 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
exercise_result_compression = compressionAlgorithmId,
event_sequential_id = 0,
),
DbDto.IdFilterNonConsumingInformee(
event_sequential_id = 0,
party_id = "signatory",
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
record_time = update.recordTime.micros,
@ -517,6 +545,12 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
deduplication_duration_seconds = None,
deduplication_start = None,
),
DbDto.TransactionMeta(
transaction_id = transactionId,
event_offset = someOffset.toHexString,
event_sequential_id_first = 0,
event_sequential_id_last = 0,
),
)
}
@ -576,7 +610,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optCompletionInfo = Some(completionInfo),
transactionMeta = transactionMeta,
transaction = transaction,
transactionId = Ref.TransactionId.assertFromString("TransactionId"),
transactionId = transactionId,
recordTime = someRecordTime,
divulgedContracts = List.empty,
blindingInfo = None,
@ -618,6 +652,10 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
exercise_result_compression = compressionAlgorithmId,
event_sequential_id = 0,
),
DbDto.IdFilterNonConsumingInformee(
event_sequential_id = 0,
party_id = "signatory",
),
DbDto.EventExercise(
consuming = false,
event_offset = Some(someOffset.toHexString),
@ -644,6 +682,10 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
exercise_result_compression = compressionAlgorithmId,
event_sequential_id = 0,
),
DbDto.IdFilterNonConsumingInformee(
event_sequential_id = 0,
party_id = "signatory",
),
DbDto.EventExercise(
consuming = false,
event_offset = Some(someOffset.toHexString),
@ -670,6 +712,10 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
exercise_result_compression = compressionAlgorithmId,
event_sequential_id = 0,
),
DbDto.IdFilterNonConsumingInformee(
event_sequential_id = 0,
party_id = "signatory",
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
record_time = update.recordTime.micros,
@ -686,6 +732,12 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
deduplication_duration_seconds = None,
deduplication_start = None,
),
DbDto.TransactionMeta(
transaction_id = transactionId,
event_offset = someOffset.toHexString,
event_sequential_id_first = 0,
event_sequential_id_last = 0,
),
)
}
@ -727,7 +779,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optCompletionInfo = Some(completionInfo),
transactionMeta = transactionMeta,
transaction = transaction,
transactionId = Ref.TransactionId.assertFromString("TransactionId"),
transactionId = transactionId,
recordTime = someRecordTime,
divulgedContracts = List.empty,
blindingInfo = None,
@ -754,7 +806,13 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
deduplication_duration_nanos = None,
deduplication_duration_seconds = None,
deduplication_start = None,
)
),
DbDto.TransactionMeta(
transaction_id = transactionId,
event_offset = someOffset.toHexString,
event_sequential_id_first = 0,
event_sequential_id_last = 0,
),
)
}
@ -790,7 +848,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optCompletionInfo = Some(completionInfo),
transactionMeta = transactionMeta,
transaction = transaction,
transactionId = Ref.TransactionId.assertFromString("TransactionId"),
transactionId = transactionId,
recordTime = someRecordTime,
divulgedContracts = List.empty,
blindingInfo = None,
@ -827,6 +885,20 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
exercise_result_compression = compressionAlgorithmId,
event_sequential_id = 0,
),
DbDto.IdFilterConsumingStakeholder(
event_sequential_id = 0,
template_id = exerciseNode.templateId.toString,
party_id = "signatory",
),
DbDto.IdFilterConsumingStakeholder(
event_sequential_id = 0,
template_id = exerciseNode.templateId.toString,
party_id = "observer",
),
DbDto.IdFilterConsumingNonStakeholderInformee(
event_sequential_id = 0,
party_id = "divulgee",
),
DbDto.EventDivulgence(
event_offset = Some(someOffset.toHexString),
command_id = Some(completionInfo.commandId),
@ -858,6 +930,12 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
deduplication_duration_seconds = None,
deduplication_start = None,
),
DbDto.TransactionMeta(
transaction_id = transactionId,
event_offset = someOffset.toHexString,
event_sequential_id_first = 0,
event_sequential_id_last = 0,
),
)
}
@ -894,7 +972,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optCompletionInfo = Some(completionInfo),
transactionMeta = transactionMeta,
transaction = transaction,
transactionId = Ref.TransactionId.assertFromString("TransactionId"),
transactionId = transactionId,
recordTime = someRecordTime,
divulgedContracts = List.empty,
blindingInfo = None,
@ -930,8 +1008,8 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
driver_metadata = Some(someContractDriverMetadata.toByteArray),
)
Set(dtos(1), dtos(2)) should contain theSameElementsAs Set(
DbDto.CreateFilter(0L, createNode.templateId.toString, "signatory"),
DbDto.CreateFilter(0L, createNode.templateId.toString, "observer"),
DbDto.IdFilterCreateStakeholder(0L, createNode.templateId.toString, "signatory"),
DbDto.IdFilterCreateStakeholder(0L, createNode.templateId.toString, "observer"),
)
dtos(3) shouldEqual DbDto.EventExercise(
consuming = true,
@ -959,7 +1037,21 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
exercise_result_compression = compressionAlgorithmId,
event_sequential_id = 0,
)
dtos(4) shouldEqual DbDto.EventDivulgence(
dtos(4) shouldEqual DbDto.IdFilterConsumingStakeholder(
event_sequential_id = 0,
template_id = exerciseNode.templateId.toString,
party_id = "signatory",
)
dtos(5) shouldEqual DbDto.IdFilterConsumingStakeholder(
event_sequential_id = 0,
template_id = exerciseNode.templateId.toString,
party_id = "observer",
)
dtos(6) shouldEqual DbDto.IdFilterConsumingNonStakeholderInformee(
event_sequential_id = 0,
party_id = "divulgee",
)
dtos(7) shouldEqual DbDto.EventDivulgence(
event_offset = Some(someOffset.toHexString),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
@ -974,7 +1066,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
create_argument_compression = compressionAlgorithmId,
event_sequential_id = 0,
)
dtos(5) shouldEqual DbDto.CommandCompletion(
dtos(8) shouldEqual DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
record_time = update.recordTime.micros,
application_id = completionInfo.applicationId,
@ -990,7 +1082,13 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
deduplication_duration_seconds = None,
deduplication_start = None,
)
dtos.size shouldEqual 6
dtos(9) shouldEqual DbDto.TransactionMeta(
transaction_id = transactionId,
event_offset = someOffset.toHexString,
event_sequential_id_first = 0,
event_sequential_id_last = 0,
)
dtos.size shouldEqual 10
}
"handle TransactionAccepted (explicit blinding info)" in {
@ -1021,7 +1119,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optCompletionInfo = Some(completionInfo),
transactionMeta = transactionMeta,
transaction = transaction,
transactionId = Ref.TransactionId.assertFromString("TransactionId"),
transactionId = transactionId,
recordTime = someRecordTime,
divulgedContracts =
List(state.DivulgedContract(createNode.coid, createNode.versionedCoinst)),
@ -1037,64 +1135,82 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
someOffset
)(update).toList
dtos should contain theSameElementsInOrderAs List(
DbDto.EventExercise(
consuming = true,
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
submitters = Some(completionInfo.actAs.toSet),
node_index = Some(exerciseNodeId.index),
event_id = Some(EventId(update.transactionId, exerciseNodeId).toLedgerString),
contract_id = exerciseNode.targetCoid.coid,
template_id = Some(exerciseNode.templateId.toString),
flat_event_witnesses = Set("signatory", "observer"),
tree_event_witnesses = Set("disclosee"), // taken from explicit blinding info
create_key_value = None,
exercise_choice = Some(exerciseNode.choiceId),
exercise_argument = Some(emptyArray),
exercise_result = Some(emptyArray),
exercise_actors = Some(Set("signatory")),
exercise_child_event_ids = Some(Vector.empty),
create_key_value_compression = compressionAlgorithmId,
exercise_argument_compression = compressionAlgorithmId,
exercise_result_compression = compressionAlgorithmId,
event_sequential_id = 0,
),
DbDto.EventDivulgence(
event_offset = Some(someOffset.toHexString),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
submitters = Some(completionInfo.actAs.toSet),
contract_id = exerciseNode.targetCoid.coid,
template_id =
Some(createNode.templateId.toString), // taken from explicit divulgedContracts
tree_event_witnesses = Set("divulgee"), // taken from explicit blinding info
create_argument = Some(emptyArray), // taken from explicit divulgedContracts
create_argument_compression = compressionAlgorithmId,
event_sequential_id = 0,
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
record_time = update.recordTime.micros,
application_id = completionInfo.applicationId,
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
submission_id = completionInfo.submissionId,
deduplication_offset = None,
deduplication_duration_nanos = None,
deduplication_duration_seconds = None,
deduplication_start = None,
),
)
dtos(0) shouldEqual DbDto.EventExercise(
consuming = true,
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
submitters = Some(completionInfo.actAs.toSet),
node_index = Some(exerciseNodeId.index),
event_id = Some(EventId(update.transactionId, exerciseNodeId).toLedgerString),
contract_id = exerciseNode.targetCoid.coid,
template_id = Some(exerciseNode.templateId.toString),
flat_event_witnesses = Set("signatory", "observer"),
tree_event_witnesses = Set("disclosee"), // taken from explicit blinding info
create_key_value = None,
exercise_choice = Some(exerciseNode.choiceId),
exercise_argument = Some(emptyArray),
exercise_result = Some(emptyArray),
exercise_actors = Some(Set("signatory")),
exercise_child_event_ids = Some(Vector.empty),
create_key_value_compression = compressionAlgorithmId,
exercise_argument_compression = compressionAlgorithmId,
exercise_result_compression = compressionAlgorithmId,
event_sequential_id = 0,
)
dtos(1) shouldEqual DbDto.IdFilterConsumingStakeholder(
event_sequential_id = 0,
template_id = exerciseNode.templateId.toString,
party_id = "signatory",
)
dtos(2) shouldEqual DbDto.IdFilterConsumingStakeholder(
event_sequential_id = 0,
template_id = exerciseNode.templateId.toString,
party_id = "observer",
)
dtos(3) shouldEqual DbDto.IdFilterConsumingNonStakeholderInformee(
event_sequential_id = 0,
party_id = "disclosee",
)
dtos(4) shouldEqual DbDto.EventDivulgence(
event_offset = Some(someOffset.toHexString),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
submitters = Some(completionInfo.actAs.toSet),
contract_id = exerciseNode.targetCoid.coid,
template_id = Some(createNode.templateId.toString), // taken from explicit divulgedContracts
tree_event_witnesses = Set("divulgee"), // taken from explicit blinding info
create_argument = Some(emptyArray), // taken from explicit divulgedContracts
create_argument_compression = compressionAlgorithmId,
event_sequential_id = 0,
)
dtos(5) shouldEqual DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
record_time = update.recordTime.micros,
application_id = completionInfo.applicationId,
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
submission_id = completionInfo.submissionId,
deduplication_offset = None,
deduplication_duration_nanos = None,
deduplication_duration_seconds = None,
deduplication_start = None,
)
dtos(6) shouldEqual DbDto.TransactionMeta(
transaction_id = transactionId,
event_offset = someOffset.toHexString,
event_sequential_id_first = 0,
event_sequential_id_last = 0,
)
dtos should have length 7
}
"handle TransactionAccepted (rollback node)" in {
@@ -1134,7 +1250,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optCompletionInfo = Some(completionInfo),
transactionMeta = transactionMeta,
transaction = transaction,
transactionId = Ref.TransactionId.assertFromString("TransactionId"),
transactionId = transactionId,
recordTime = someRecordTime,
divulgedContracts = List.empty,
blindingInfo = None,
@@ -1177,6 +1293,12 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
deduplication_duration_seconds = None,
deduplication_start = None,
),
DbDto.TransactionMeta(
transaction_id = transactionId,
event_offset = someOffset.toHexString,
event_sequential_id_first = 0,
event_sequential_id_last = 0,
),
)
}
@@ -1236,10 +1358,10 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
driver_metadata = Some(someContractDriverMetadata.toByteArray),
)
Set(dtos(1), dtos(2)) should contain theSameElementsAs Set(
DbDto.CreateFilter(0L, createNode.templateId.toString, "signatory"),
DbDto.CreateFilter(0L, createNode.templateId.toString, "observer"),
DbDto.IdFilterCreateStakeholder(0L, createNode.templateId.toString, "signatory"),
DbDto.IdFilterCreateStakeholder(0L, createNode.templateId.toString, "observer"),
)
dtos.size shouldEqual 3
dtos.size shouldEqual 4
}
"handle TransactionAccepted (no contract metadata)" in {
@@ -1263,7 +1385,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optCompletionInfo = None,
transactionMeta = transactionMeta,
transaction = transaction,
transactionId = Ref.TransactionId.assertFromString("TransactionId"),
transactionId = transactionId,
recordTime = someRecordTime,
divulgedContracts = List.empty,
blindingInfo = None,
@@ -1298,11 +1420,17 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
event_sequential_id = 0,
driver_metadata = None,
)
Set(dtos(1), dtos(2)) should contain theSameElementsAs Set(
DbDto.CreateFilter(0L, createNode.templateId.toString, "signatory"),
DbDto.CreateFilter(0L, createNode.templateId.toString, "observer"),
dtos(3) shouldEqual DbDto.TransactionMeta(
transaction_id = transactionId,
event_offset = someOffset.toHexString,
event_sequential_id_first = 0,
event_sequential_id_last = 0,
)
dtos.size shouldEqual 3
Set(dtos(1), dtos(2)) should contain theSameElementsAs Set(
DbDto.IdFilterCreateStakeholder(0L, createNode.templateId.toString, "signatory"),
DbDto.IdFilterCreateStakeholder(0L, createNode.templateId.toString, "observer"),
)
dtos.size shouldEqual 4
}
val deduplicationPeriods = Table(
@@ -1394,7 +1522,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optCompletionInfo = Some(completionInfo),
transactionMeta = transactionMeta,
transaction = transaction,
transactionId = Ref.TransactionId.assertFromString("TransactionId"),
transactionId = transactionId,
recordTime = someRecordTime,
divulgedContracts = List.empty,
blindingInfo = None,
@@ -1430,8 +1558,8 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
driver_metadata = Some(someContractDriverMetadata.toByteArray),
)
Set(dtos(1), dtos(2)) should contain theSameElementsAs Set(
DbDto.CreateFilter(0L, createNode.templateId.toString, "signatory"),
DbDto.CreateFilter(0L, createNode.templateId.toString, "observer"),
DbDto.IdFilterCreateStakeholder(0L, createNode.templateId.toString, "signatory"),
DbDto.IdFilterCreateStakeholder(0L, createNode.templateId.toString, "observer"),
)
dtos(3) shouldEqual DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
@@ -1449,7 +1577,13 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
deduplication_duration_nanos = expectedDeduplicationDurationNanos,
deduplication_start = None,
)
dtos.size shouldEqual 4
dtos(4) shouldEqual DbDto.TransactionMeta(
transaction_id = transactionId,
event_offset = someOffset.toHexString,
event_sequential_id_first = 0,
event_sequential_id_last = 0,
)
dtos.size shouldEqual 5
}
}
}

View File

@@ -52,8 +52,14 @@ class SequentialWriteDaoSpec extends AnyFlatSpec with Matchers {
storageBackendCaptor.captured(0) shouldBe someParty
storageBackendCaptor.captured(1) shouldBe LedgerEnd(offset("01"), 5, 1)
storageBackendCaptor.captured(2).asInstanceOf[DbDto.EventCreate].event_sequential_id shouldBe 6
storageBackendCaptor.captured(3).asInstanceOf[DbDto.CreateFilter].event_sequential_id shouldBe 6
storageBackendCaptor.captured(4).asInstanceOf[DbDto.CreateFilter].event_sequential_id shouldBe 6
storageBackendCaptor
.captured(3)
.asInstanceOf[DbDto.IdFilterCreateStakeholder]
.event_sequential_id shouldBe 6
storageBackendCaptor
.captured(4)
.asInstanceOf[DbDto.IdFilterCreateStakeholder]
.event_sequential_id shouldBe 6
storageBackendCaptor
.captured(5)
.asInstanceOf[DbDto.EventExercise]
@@ -271,8 +277,8 @@ object SequentialWriteDaoSpec {
partyAndCreateFixture.get.rejectionReason -> List(someParty, someEventCreated),
allEventsFixture.get.rejectionReason -> List(
someEventCreated,
DbDto.CreateFilter(0L, "", ""),
DbDto.CreateFilter(0L, "", ""),
DbDto.IdFilterCreateStakeholder(0L, "", ""),
DbDto.IdFilterCreateStakeholder(0L, "", ""),
someEventExercise,
someEventDivulgence,
),

View File

@@ -4,13 +4,10 @@
package com.daml.platform.store.migration.oracle
import com.daml.platform.store.migration.tests.MigrationEtqTests
import org.scalatest.Ignore
// TODO etq: Enable when data migration is added
@Ignore
class OracleMigrationFrom015To016EtqTest
class OracleMigrationFrom017To020EtqTest
extends MigrationEtqTests
with OracleAroundEachForMigrations {
override def srcMigration: String = "16"
override def dstMigration: String = "17"
override def srcMigration: String = "17"
override def dstMigration: String = "20"
}

View File

@@ -5,13 +5,10 @@ package com.daml
package platform.store.migration.postgres
import com.daml.platform.store.migration.tests.MigrationEtqTests
import org.scalatest.Ignore
// TODO etq: Enable when data migration is added
@Ignore
class PostgresMigrationFrom125To126EtqTest
class PostgresMigrationFrom127To130EtqTest
extends MigrationEtqTests
with PostgresAroundEachForMigrations {
override def srcMigration: String = "126"
override def dstMigration: String = "127"
override def srcMigration: String = "127"
override def dstMigration: String = "130"
}