factor common structure in RawBatch.Event (#6216)

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Stephen Compall 2020-06-04 09:55:28 -04:00 committed by GitHub
parent 2c3efc6c4f
commit 8e3e296572
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 93 deletions

View File

@ -111,15 +111,16 @@ private[events] trait EventsTableInsert { this: EventsTable =>
}
private case class AccumulatingBatches(
creates: Vector[RawBatch.Event.Created],
exercises: Vector[RawBatch.Event.Exercised],
creates: Vector[RawBatch.Event[RawBatch.Event.Created]],
exercises: Vector[RawBatch.Event[RawBatch.Event.Exercised]],
archives: Vector[Vector[NamedParameter]],
) {
def add(create: RawBatch.Event.Created): AccumulatingBatches =
def add(create: RawBatch.Event[RawBatch.Event.Created]): AccumulatingBatches =
copy(creates = creates :+ create)
def add(exercise: RawBatch.Event.Exercised): AccumulatingBatches =
def add(exercise: RawBatch.Event[RawBatch.Event.Exercised])(
implicit dummy: DummyImplicit): AccumulatingBatches =
copy(exercises = exercises :+ exercise)
def add(archive: Vector[NamedParameter]): AccumulatingBatches =
@ -127,7 +128,7 @@ private[events] trait EventsTableInsert { this: EventsTable =>
private def prepareRawNonEmpty(
query: String,
params: Vector[RawBatch.Event],
params: Vector[RawBatch.Event[_]],
): Option[RawBatch] =
if (params.nonEmpty) Some(new RawBatch(query, params)) else None
@ -167,42 +168,29 @@ private[events] trait EventsTableInsert { this: EventsTable =>
transaction: GenTransaction.WithTxValue[Nid, ContractId],
flatWitnesses: WitnessRelation[Nid],
treeWitnesses: WitnessRelation[Nid],
): RawBatches =
): RawBatches = {
def event[Sp <: RawBatch.Event.Specific](nodeId: Nid, sp: Sp) =
new RawBatch.Event(
applicationId = submitterInfo.map(_.applicationId),
workflowId = workflowId,
commandId = submitterInfo.map(_.commandId),
transactionId = transactionId,
nodeId = nodeId,
submitter = submitterInfo.map(_.submitter),
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses getOrElse (nodeId, Set.empty),
treeWitnesses = treeWitnesses getOrElse (nodeId, Set.empty),
specific = sp,
)
transaction
.fold(AccumulatingBatches.empty) {
case (batches, (nodeId, node: Create)) =>
batches.add(
new RawBatch.Event.Created(
applicationId = submitterInfo.map(_.applicationId),
workflowId = workflowId,
commandId = submitterInfo.map(_.commandId),
transactionId = transactionId,
nodeId = nodeId,
submitter = submitterInfo.map(_.submitter),
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses getOrElse (nodeId, Set.empty),
treeWitnesses = treeWitnesses getOrElse (nodeId, Set.empty),
create = node,
)
)
batches.add(event(nodeId, new RawBatch.Event.Created(node)))
case (batches, (nodeId, node: Exercise)) =>
val batchWithExercises =
batches.add(
new RawBatch.Event.Exercised(
applicationId = submitterInfo.map(_.applicationId),
workflowId = workflowId,
commandId = submitterInfo.map(_.commandId),
transactionId = transactionId,
nodeId = nodeId,
submitter = submitterInfo.map(_.submitter),
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses getOrElse (nodeId, Set.empty),
treeWitnesses = treeWitnesses getOrElse (nodeId, Set.empty),
exercise = node,
)
)
batches.add(event(nodeId, new RawBatch.Event.Exercised(node)))
if (node.consuming) {
batchWithExercises.add(
archive(
@ -217,5 +205,6 @@ private[events] trait EventsTableInsert { this: EventsTable =>
batches // ignore any event which is neither a create nor an exercise
}
.prepare
}
}

View File

@ -6,6 +6,7 @@ package com.daml.platform.store.dao.events
import java.time.Instant
import anorm.NamedParameter
import com.daml.ledger.EventId
import com.daml.ledger.participant.state.v1.Offset
import com.daml.ledger.{ApplicationId, CommandId, TransactionId, WorkflowId}
import com.daml.platform.events.EventIdFormatter.fromTransactionId
@ -51,7 +52,10 @@ private[events] object RawBatch {
partial :+ lfValueTranslation.serialize(contractId, createArgument)
}
sealed abstract class Event(
// this unfortunate upper bound is here to get access to
// [[Event.Specific#applySerialization]]; we would do away with it if
// [[PartialParameters#applySerialization]] was statically determined
final case class Event[+Specific <: Event.Specific](
applicationId: Option[ApplicationId],
workflowId: Option[WorkflowId],
commandId: Option[CommandId],
@ -62,9 +66,10 @@ private[events] object RawBatch {
offset: Offset,
flatWitnesses: Set[Party],
treeWitnesses: Set[Party],
specific: Specific,
) extends PartialParameters {
final protected val eventId = fromTransactionId(transactionId, nodeId)
final protected val base: Vector[NamedParameter] =
private[this] val eventId = fromTransactionId(transactionId, nodeId)
private[this] val base: Vector[NamedParameter] =
Vector[NamedParameter](
"event_id" -> eventId,
"event_offset" -> offset,
@ -78,87 +83,60 @@ private[events] object RawBatch {
"flat_event_witnesses" -> Party.Array(flatWitnesses.toSeq: _*),
"tree_event_witnesses" -> Party.Array(treeWitnesses.toSeq: _*),
)
override def applySerialization(
lfValueTranslation: LfValueTranslation): Vector[NamedParameter] =
base ++ specific.applySerialization(transactionId, eventId, lfValueTranslation)
}
object Event {
sealed abstract class Specific {
private[Event] def applySerialization(
transactionId: TransactionId,
eventId: EventId,
lfValueTranslation: LfValueTranslation,
): Vector[NamedParameter]
}
final class Created(
applicationId: Option[ApplicationId],
workflowId: Option[WorkflowId],
commandId: Option[CommandId],
transactionId: TransactionId,
nodeId: NodeId,
submitter: Option[Party],
ledgerEffectiveTime: Instant,
offset: Offset,
flatWitnesses: Set[Party],
treeWitnesses: Set[Party],
create: Create,
) extends Event(
applicationId = applicationId,
workflowId = workflowId,
commandId = commandId,
transactionId = transactionId,
nodeId = nodeId,
submitter = submitter,
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses,
treeWitnesses = treeWitnesses,
) {
val partial: Vector[NamedParameter] =
base ++ Vector[NamedParameter](
) extends Specific {
private val partial: Vector[NamedParameter] =
Vector[NamedParameter](
"contract_id" -> create.coid.coid,
"template_id" -> create.coinst.template,
"create_signatories" -> create.signatories.toArray[String],
"create_observers" -> create.stakeholders.diff(create.signatories).toArray[String],
"create_agreement_text" -> Some(create.coinst.agreementText).filter(_.nonEmpty),
)
override def applySerialization(
override private[Event] def applySerialization(
transactionId: TransactionId,
eventId: EventId,
lfValueTranslation: LfValueTranslation,
): Vector[NamedParameter] =
partial ++ lfValueTranslation.serialize(eventId, create)
}
final class Exercised(
applicationId: Option[ApplicationId],
workflowId: Option[WorkflowId],
commandId: Option[CommandId],
transactionId: TransactionId,
nodeId: NodeId,
submitter: Option[Party],
ledgerEffectiveTime: Instant,
offset: Offset,
flatWitnesses: Set[Party],
treeWitnesses: Set[Party],
exercise: Exercise,
) extends Event(
applicationId = applicationId,
workflowId = workflowId,
commandId = commandId,
transactionId = transactionId,
nodeId = nodeId,
submitter = submitter,
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses,
treeWitnesses = treeWitnesses,
) {
val partial: Vector[NamedParameter] =
base ++ Vector[NamedParameter](
) extends Specific {
private val partial: Vector[NamedParameter] =
Vector[NamedParameter](
"contract_id" -> exercise.targetCoid,
"template_id" -> exercise.templateId,
"exercise_consuming" -> exercise.consuming,
"exercise_choice" -> exercise.choiceId,
"exercise_actors" -> exercise.actingParties.toArray[String],
"exercise_child_event_ids" -> exercise.children
.map(fromTransactionId(transactionId, _))
.toArray[String],
)
override def applySerialization(
override private[Event] def applySerialization(
transactionId: TransactionId,
eventId: EventId,
lfValueTranslation: LfValueTranslation,
): Vector[NamedParameter] =
partial ++ lfValueTranslation.serialize(eventId, exercise)
(partial :+ ("exercise_child_event_ids" -> exercise.children
.map(fromTransactionId(transactionId, _))
.toArray[String]: NamedParameter)) ++ lfValueTranslation.serialize(eventId, exercise)
}
}