mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Parallel Ingestion renames (#9894)
* DBDTOV1 -> DbDto * UpdateToDBDTOV1 -> UpdateToDbDto changelog_begin changelog_end
This commit is contained in:
parent
6e48034edc
commit
3c83b77dd8
@ -19,7 +19,7 @@ import com.daml.platform.indexer.{IndexFeedHandle, Indexer}
|
||||
import com.daml.platform.store.{DbType, backend}
|
||||
import com.daml.platform.store.appendonlydao.DbDispatcher
|
||||
import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation}
|
||||
import com.daml.platform.store.backend.{DBDTOV1, StorageBackend}
|
||||
import com.daml.platform.store.backend.{DbDto, StorageBackend}
|
||||
import com.daml.resources
|
||||
|
||||
import scala.concurrent.Future
|
||||
@ -69,7 +69,7 @@ object ParallelIndexerFactory {
|
||||
metrics = metrics,
|
||||
connectionAsyncCommitMode = DbType.AsynchronousCommit,
|
||||
)
|
||||
toDbDto = backend.UpdateToDBDTOV1(
|
||||
toDbDto = backend.UpdateToDbDto(
|
||||
participantId = participantId,
|
||||
translation = translation,
|
||||
compressionStrategy = compressionStrategy,
|
||||
@ -136,10 +136,10 @@ object ParallelIndexerFactory {
|
||||
|
||||
def inputMapper(
|
||||
metrics: Metrics,
|
||||
toDbDto: Offset => Update => Iterator[DBDTOV1],
|
||||
toDbDto: Offset => Update => Iterator[DbDto],
|
||||
)(implicit
|
||||
loggingContext: LoggingContext
|
||||
): Iterable[((Offset, Update), Long)] => Batch[Vector[DBDTOV1]] = { input =>
|
||||
): Iterable[((Offset, Update), Long)] => Batch[Vector[DbDto]] = { input =>
|
||||
metrics.daml.parallelIndexer.inputMapping.batchSize.update(input.size)
|
||||
input.foreach { case ((offset, update), _) =>
|
||||
LoggingContext.withEnrichedLoggingContext(
|
||||
@ -162,7 +162,7 @@ object ParallelIndexerFactory {
|
||||
)
|
||||
}
|
||||
|
||||
def seqMapperZero(initialSeqId: Long): Batch[Vector[DBDTOV1]] =
|
||||
def seqMapperZero(initialSeqId: Long): Batch[Vector[DbDto]] =
|
||||
Batch(
|
||||
lastOffset = null,
|
||||
lastSeqEventId = initialSeqId, // this is the only property of interest in the zero element
|
||||
@ -174,20 +174,20 @@ object ParallelIndexerFactory {
|
||||
)
|
||||
|
||||
def seqMapper(metrics: Metrics)(
|
||||
previous: Batch[Vector[DBDTOV1]],
|
||||
current: Batch[Vector[DBDTOV1]],
|
||||
): Batch[Vector[DBDTOV1]] = {
|
||||
previous: Batch[Vector[DbDto]],
|
||||
current: Batch[Vector[DbDto]],
|
||||
): Batch[Vector[DbDto]] = {
|
||||
var eventSeqId = previous.lastSeqEventId
|
||||
val batchWithSeqIds = current.batch.map {
|
||||
case dbDto: DBDTOV1.EventCreate =>
|
||||
case dbDto: DbDto.EventCreate =>
|
||||
eventSeqId += 1
|
||||
dbDto.copy(event_sequential_id = eventSeqId)
|
||||
|
||||
case dbDto: DBDTOV1.EventExercise =>
|
||||
case dbDto: DbDto.EventExercise =>
|
||||
eventSeqId += 1
|
||||
dbDto.copy(event_sequential_id = eventSeqId)
|
||||
|
||||
case dbDto: DBDTOV1.EventDivulgence =>
|
||||
case dbDto: DbDto.EventDivulgence =>
|
||||
eventSeqId += 1
|
||||
dbDto.copy(event_sequential_id = eventSeqId)
|
||||
|
||||
@ -206,9 +206,9 @@ object ParallelIndexerFactory {
|
||||
}
|
||||
|
||||
def batcher[DB_BATCH](
|
||||
batchF: Vector[DBDTOV1] => DB_BATCH,
|
||||
batchF: Vector[DbDto] => DB_BATCH,
|
||||
metrics: Metrics,
|
||||
): Batch[Vector[DBDTOV1]] => Batch[DB_BATCH] = { inBatch =>
|
||||
): Batch[Vector[DbDto]] => Batch[DB_BATCH] = { inBatch =>
|
||||
val dbBatch = batchF(inBatch.batch)
|
||||
val nowNanos = System.nanoTime()
|
||||
metrics.daml.parallelIndexer.batching.duration.update(
|
||||
|
@ -46,7 +46,7 @@ import com.daml.platform.store.appendonlydao.events.{
|
||||
PostCommitValidation,
|
||||
TransactionsReader,
|
||||
}
|
||||
import com.daml.platform.store.backend.{StorageBackend, UpdateToDBDTOV1}
|
||||
import com.daml.platform.store.backend.{StorageBackend, UpdateToDbDto}
|
||||
import com.daml.platform.store.dao.ParametersTable.LedgerEndUpdateError
|
||||
import com.daml.platform.store.dao.events.TransactionsWriter.PreparedInsert
|
||||
import com.daml.platform.store.dao.{
|
||||
@ -298,7 +298,7 @@ private class JdbcLedgerDao(
|
||||
recordTime = Time.Timestamp.assertFromInstant(recordedAt),
|
||||
submissionId = SubmissionId.assertFromString(submissionId),
|
||||
participantId =
|
||||
v1.ParticipantId.assertFromString("1"), // not used for DBDTO generation
|
||||
v1.ParticipantId.assertFromString("1"), // not used for DbDto generation
|
||||
newConfiguration = configuration,
|
||||
)
|
||||
|
||||
@ -307,7 +307,7 @@ private class JdbcLedgerDao(
|
||||
recordTime = Time.Timestamp.assertFromInstant(recordedAt),
|
||||
submissionId = SubmissionId.assertFromString(submissionId),
|
||||
participantId =
|
||||
v1.ParticipantId.assertFromString("1"), // not used for DBDTO generation
|
||||
v1.ParticipantId.assertFromString("1"), // not used for DbDto generation
|
||||
proposedConfiguration = configuration,
|
||||
rejectionReason = reason,
|
||||
)
|
||||
@ -574,11 +574,11 @@ private class JdbcLedgerDao(
|
||||
ledgerEffectiveTime =
|
||||
Time.Timestamp.assertFromInstant(tx.ledgerEffectiveTime),
|
||||
workflowId = tx.workflowId,
|
||||
submissionTime = null, // not used for DBDTO generation
|
||||
submissionSeed = null, // not used for DBDTO generation
|
||||
optUsedPackages = None, // not used for DBDTO generation
|
||||
optNodeSeeds = None, // not used for DBDTO generation
|
||||
optByKeyNodes = None, // not used for DBDTO generation
|
||||
submissionTime = null, // not used for DbDto generation
|
||||
submissionSeed = null, // not used for DbDto generation
|
||||
optUsedPackages = None, // not used for DbDto generation
|
||||
optNodeSeeds = None, // not used for DbDto generation
|
||||
optByKeyNodes = None, // not used for DbDto generation
|
||||
),
|
||||
transaction = tx.transaction,
|
||||
transactionId = tx.transactionId,
|
||||
@ -960,11 +960,11 @@ private class JdbcLedgerDao(
|
||||
transactionMeta = TransactionMeta(
|
||||
ledgerEffectiveTime = Time.Timestamp.assertFromInstant(ledgerEffectiveTime),
|
||||
workflowId = workflowId,
|
||||
submissionTime = null, // not used for DBDTO generation
|
||||
submissionSeed = null, // not used for DBDTO generation
|
||||
optUsedPackages = None, // not used for DBDTO generation
|
||||
optNodeSeeds = None, // not used for DBDTO generation
|
||||
optByKeyNodes = None, // not used for DBDTO generation
|
||||
submissionTime = null, // not used for DbDto generation
|
||||
submissionSeed = null, // not used for DbDto generation
|
||||
optUsedPackages = None, // not used for DbDto generation
|
||||
optNodeSeeds = None, // not used for DbDto generation
|
||||
optByKeyNodes = None, // not used for DbDto generation
|
||||
),
|
||||
transaction = transaction,
|
||||
transactionId = transactionId,
|
||||
@ -1116,7 +1116,7 @@ private[platform] object JdbcLedgerDao {
|
||||
): SequentialWriteDao =
|
||||
SequentialWriteDaoImpl(
|
||||
storageBackend = StorageBackend.of(dbType),
|
||||
updateToDbDtos = UpdateToDBDTOV1(
|
||||
updateToDbDtos = UpdateToDbDto(
|
||||
participantId = participantId,
|
||||
translation = new LfValueTranslation(
|
||||
cache = lfValueTranslationCache,
|
||||
|
@ -6,7 +6,7 @@ package com.daml.platform.store.appendonlydao
|
||||
import java.sql.Connection
|
||||
|
||||
import com.daml.ledger.participant.state.v1.{Offset, Update}
|
||||
import com.daml.platform.store.backend.{DBDTOV1, StorageBackend}
|
||||
import com.daml.platform.store.backend.{DbDto, StorageBackend}
|
||||
|
||||
import scala.util.chaining.scalaUtilChainingOps
|
||||
|
||||
@ -16,7 +16,7 @@ trait SequentialWriteDao {
|
||||
|
||||
case class SequentialWriteDaoImpl[DB_BATCH](
|
||||
storageBackend: StorageBackend[DB_BATCH],
|
||||
updateToDbDtos: Offset => Update => Iterator[DBDTOV1],
|
||||
updateToDbDtos: Offset => Update => Iterator[DbDto],
|
||||
) extends SequentialWriteDao {
|
||||
|
||||
private var lastEventSeqId: Long = _
|
||||
@ -33,11 +33,11 @@ case class SequentialWriteDaoImpl[DB_BATCH](
|
||||
lastEventSeqId
|
||||
}
|
||||
|
||||
private def adaptEventSeqIds(dbDtos: Iterator[DBDTOV1]): Vector[DBDTOV1] =
|
||||
private def adaptEventSeqIds(dbDtos: Iterator[DbDto]): Vector[DbDto] =
|
||||
dbDtos.map {
|
||||
case e: DBDTOV1.EventCreate => e.copy(event_sequential_id = nextEventSeqId)
|
||||
case e: DBDTOV1.EventDivulgence => e.copy(event_sequential_id = nextEventSeqId)
|
||||
case e: DBDTOV1.EventExercise => e.copy(event_sequential_id = nextEventSeqId)
|
||||
case e: DbDto.EventCreate => e.copy(event_sequential_id = nextEventSeqId)
|
||||
case e: DbDto.EventDivulgence => e.copy(event_sequential_id = nextEventSeqId)
|
||||
case e: DbDto.EventExercise => e.copy(event_sequential_id = nextEventSeqId)
|
||||
case notEvent => notEvent
|
||||
}.toVector
|
||||
|
||||
|
@ -7,12 +7,12 @@ import java.time.Instant
|
||||
|
||||
import com.daml.scalautil.NeverEqualsOverride
|
||||
|
||||
sealed trait DBDTOV1
|
||||
sealed trait DbDto
|
||||
extends NeverEqualsOverride
|
||||
with Product
|
||||
with Serializable // to aid type inference for case class implementors
|
||||
|
||||
object DBDTOV1 {
|
||||
object DbDto {
|
||||
|
||||
final case class EventDivulgence(
|
||||
event_offset: Option[String],
|
||||
@ -26,7 +26,7 @@ object DBDTOV1 {
|
||||
create_argument: Option[Array[Byte]],
|
||||
create_argument_compression: Option[Int],
|
||||
event_sequential_id: Long,
|
||||
) extends DBDTOV1
|
||||
) extends DbDto
|
||||
|
||||
final case class EventCreate(
|
||||
event_offset: Option[String],
|
||||
@ -51,7 +51,7 @@ object DBDTOV1 {
|
||||
create_argument_compression: Option[Int],
|
||||
create_key_value_compression: Option[Int],
|
||||
event_sequential_id: Long,
|
||||
) extends DBDTOV1
|
||||
) extends DbDto
|
||||
|
||||
final case class EventExercise(
|
||||
consuming: Boolean,
|
||||
@ -78,7 +78,7 @@ object DBDTOV1 {
|
||||
exercise_argument_compression: Option[Int],
|
||||
exercise_result_compression: Option[Int],
|
||||
event_sequential_id: Long,
|
||||
) extends DBDTOV1
|
||||
) extends DbDto
|
||||
|
||||
final case class ConfigurationEntry(
|
||||
ledger_offset: String,
|
||||
@ -87,7 +87,7 @@ object DBDTOV1 {
|
||||
typ: String,
|
||||
configuration: Array[Byte],
|
||||
rejection_reason: Option[String],
|
||||
) extends DBDTOV1
|
||||
) extends DbDto
|
||||
|
||||
final case class PackageEntry(
|
||||
ledger_offset: String,
|
||||
@ -95,7 +95,7 @@ object DBDTOV1 {
|
||||
submission_id: Option[String],
|
||||
typ: String,
|
||||
rejection_reason: Option[String],
|
||||
) extends DBDTOV1
|
||||
) extends DbDto
|
||||
|
||||
final case class Package(
|
||||
package_id: String,
|
||||
@ -105,7 +105,7 @@ object DBDTOV1 {
|
||||
known_since: Instant,
|
||||
ledger_offset: String,
|
||||
_package: Array[Byte],
|
||||
) extends DBDTOV1
|
||||
) extends DbDto
|
||||
|
||||
final case class PartyEntry(
|
||||
ledger_offset: String,
|
||||
@ -116,7 +116,7 @@ object DBDTOV1 {
|
||||
typ: String,
|
||||
rejection_reason: Option[String],
|
||||
is_local: Option[Boolean],
|
||||
) extends DBDTOV1
|
||||
) extends DbDto
|
||||
|
||||
final case class Party(
|
||||
party: String,
|
||||
@ -124,7 +124,7 @@ object DBDTOV1 {
|
||||
explicit: Boolean,
|
||||
ledger_offset: Option[String],
|
||||
is_local: Boolean,
|
||||
) extends DBDTOV1
|
||||
) extends DbDto
|
||||
|
||||
final case class CommandCompletion(
|
||||
completion_offset: String,
|
||||
@ -135,8 +135,8 @@ object DBDTOV1 {
|
||||
transaction_id: Option[String],
|
||||
status_code: Option[Int],
|
||||
status_message: Option[String],
|
||||
) extends DBDTOV1
|
||||
) extends DbDto
|
||||
|
||||
final case class CommandDeduplication(deduplication_key: String) extends DBDTOV1
|
||||
final case class CommandDeduplication(deduplication_key: String) extends DbDto
|
||||
|
||||
}
|
@ -18,10 +18,10 @@ trait StorageBackend[DB_BATCH] {
|
||||
/** The CPU intensive batching operation hides the batching logic, and the mapping to the database specific representation of the inserted data.
|
||||
* This should be pure CPU logic without IO.
|
||||
*
|
||||
* @param dbDtos is a collection of DBDTOV1 from which the batch is formed
|
||||
* @param dbDtos is a collection of DbDto from which the batch is formed
|
||||
* @return the database-specific batch DTO, which can be inserted via insertBatch
|
||||
*/
|
||||
def batch(dbDtos: Vector[DBDTOV1]): DB_BATCH
|
||||
def batch(dbDtos: Vector[DbDto]): DB_BATCH
|
||||
|
||||
/** Using a JDBC connection, a batch will be inserted into the database.
|
||||
* No significant CPU load, mostly blocking JDBC communication with the database backend.
|
||||
|
@ -13,17 +13,17 @@ import com.daml.platform.store.appendonlydao.JdbcLedgerDao
|
||||
import com.daml.platform.store.appendonlydao.events._
|
||||
import com.daml.platform.store.dao.DeduplicationKeyMaker
|
||||
|
||||
object UpdateToDBDTOV1 {
|
||||
object UpdateToDbDto {
|
||||
|
||||
def apply(
|
||||
participantId: ParticipantId,
|
||||
translation: LfValueTranslation,
|
||||
compressionStrategy: CompressionStrategy,
|
||||
): Offset => Update => Iterator[DBDTOV1] = { offset =>
|
||||
): Offset => Update => Iterator[DbDto] = { offset =>
|
||||
{
|
||||
case u: Update.CommandRejected =>
|
||||
Iterator(
|
||||
DBDTOV1.CommandCompletion(
|
||||
DbDto.CommandCompletion(
|
||||
completion_offset = offset.toHexString,
|
||||
record_time = u.recordTime.toInstant,
|
||||
application_id = u.submitterInfo.applicationId,
|
||||
@ -33,7 +33,7 @@ object UpdateToDBDTOV1 {
|
||||
status_code = Some(u.reason.code.value()),
|
||||
status_message = Some(u.reason.description),
|
||||
),
|
||||
DBDTOV1.CommandDeduplication(
|
||||
DbDto.CommandDeduplication(
|
||||
DeduplicationKeyMaker.make(
|
||||
domain.CommandId(u.submitterInfo.commandId),
|
||||
u.submitterInfo.actAs,
|
||||
@ -43,7 +43,7 @@ object UpdateToDBDTOV1 {
|
||||
|
||||
case u: Update.ConfigurationChanged =>
|
||||
Iterator(
|
||||
DBDTOV1.ConfigurationEntry(
|
||||
DbDto.ConfigurationEntry(
|
||||
ledger_offset = offset.toHexString,
|
||||
recorded_at = u.recordTime.toInstant,
|
||||
submission_id = u.submissionId,
|
||||
@ -55,7 +55,7 @@ object UpdateToDBDTOV1 {
|
||||
|
||||
case u: Update.ConfigurationChangeRejected =>
|
||||
Iterator(
|
||||
DBDTOV1.ConfigurationEntry(
|
||||
DbDto.ConfigurationEntry(
|
||||
ledger_offset = offset.toHexString,
|
||||
recorded_at = u.recordTime.toInstant,
|
||||
submission_id = u.submissionId,
|
||||
@ -67,7 +67,7 @@ object UpdateToDBDTOV1 {
|
||||
|
||||
case u: Update.PartyAddedToParticipant =>
|
||||
Iterator(
|
||||
DBDTOV1.PartyEntry(
|
||||
DbDto.PartyEntry(
|
||||
ledger_offset = offset.toHexString,
|
||||
recorded_at = u.recordTime.toInstant,
|
||||
submission_id = u.submissionId,
|
||||
@ -77,7 +77,7 @@ object UpdateToDBDTOV1 {
|
||||
rejection_reason = None,
|
||||
is_local = Some(u.participantId == participantId),
|
||||
),
|
||||
DBDTOV1.Party(
|
||||
DbDto.Party(
|
||||
party = u.party,
|
||||
display_name = Some(u.displayName),
|
||||
explicit = true,
|
||||
@ -88,7 +88,7 @@ object UpdateToDBDTOV1 {
|
||||
|
||||
case u: Update.PartyAllocationRejected =>
|
||||
Iterator(
|
||||
DBDTOV1.PartyEntry(
|
||||
DbDto.PartyEntry(
|
||||
ledger_offset = offset.toHexString,
|
||||
recorded_at = u.recordTime.toInstant,
|
||||
submission_id = Some(u.submissionId),
|
||||
@ -103,7 +103,7 @@ object UpdateToDBDTOV1 {
|
||||
case u: Update.PublicPackageUpload =>
|
||||
val uploadId = u.submissionId.getOrElse(UUID.randomUUID().toString)
|
||||
val packages = u.archives.iterator.map { archive =>
|
||||
DBDTOV1.Package(
|
||||
DbDto.Package(
|
||||
package_id = archive.getHash,
|
||||
upload_id = uploadId,
|
||||
source_description = u.sourceDescription,
|
||||
@ -114,7 +114,7 @@ object UpdateToDBDTOV1 {
|
||||
)
|
||||
}
|
||||
val packageEntries = u.submissionId.iterator.map(submissionId =>
|
||||
DBDTOV1.PackageEntry(
|
||||
DbDto.PackageEntry(
|
||||
ledger_offset = offset.toHexString,
|
||||
recorded_at = u.recordTime.toInstant,
|
||||
submission_id = Some(submissionId),
|
||||
@ -126,7 +126,7 @@ object UpdateToDBDTOV1 {
|
||||
|
||||
case u: Update.PublicPackageUploadRejected =>
|
||||
Iterator(
|
||||
DBDTOV1.PackageEntry(
|
||||
DbDto.PackageEntry(
|
||||
ledger_offset = offset.toHexString,
|
||||
recorded_at = u.recordTime.toInstant,
|
||||
submission_id = Some(u.submissionId),
|
||||
@ -152,12 +152,12 @@ object UpdateToDBDTOV1 {
|
||||
)
|
||||
.reverse
|
||||
|
||||
val events: Iterator[DBDTOV1] = preorderTraversal.iterator
|
||||
val events: Iterator[DbDto] = preorderTraversal.iterator
|
||||
.collect { // It is okay to collect: blinding info is already there, we are free at hand to filter out the fetch and lookup nodes here already
|
||||
case (nodeId, create: Create) =>
|
||||
val eventId = EventId(u.transactionId, nodeId)
|
||||
val (createArgument, createKeyValue) = translation.serialize(eventId, create)
|
||||
DBDTOV1.EventCreate(
|
||||
DbDto.EventCreate(
|
||||
event_offset = Some(offset.toHexString),
|
||||
transaction_id = Some(u.transactionId),
|
||||
ledger_effective_time = Some(u.transactionMeta.ledgerEffectiveTime.toInstant),
|
||||
@ -195,7 +195,7 @@ object UpdateToDBDTOV1 {
|
||||
val eventId = EventId(u.transactionId, nodeId)
|
||||
val (exerciseArgument, exerciseResult, createKeyValue) =
|
||||
translation.serialize(eventId, exercise)
|
||||
DBDTOV1.EventExercise(
|
||||
DbDto.EventExercise(
|
||||
consuming = exercise.consuming,
|
||||
event_offset = Some(offset.toHexString),
|
||||
transaction_id = Some(u.transactionId),
|
||||
@ -239,7 +239,7 @@ object UpdateToDBDTOV1 {
|
||||
// only store divulgence events, which are divulging to parties
|
||||
case (contractId, visibleToParties) if visibleToParties.nonEmpty =>
|
||||
val contractInst = divulgedContractIndex.get(contractId).map(_.contractInst)
|
||||
DBDTOV1.EventDivulgence(
|
||||
DbDto.EventDivulgence(
|
||||
event_offset = Some(offset.toHexString),
|
||||
command_id = u.optSubmitterInfo.map(_.commandId),
|
||||
workflow_id = u.transactionMeta.workflowId,
|
||||
@ -258,7 +258,7 @@ object UpdateToDBDTOV1 {
|
||||
}
|
||||
|
||||
val completions = u.optSubmitterInfo.iterator.map { submitterInfo =>
|
||||
DBDTOV1.CommandCompletion(
|
||||
DbDto.CommandCompletion(
|
||||
completion_offset = offset.toHexString,
|
||||
record_time = u.recordTime.toInstant,
|
||||
application_id = submitterInfo.applicationId,
|
@ -3,10 +3,10 @@
|
||||
|
||||
package com.daml.platform.store.backend.postgresql
|
||||
|
||||
import com.daml.platform.store.backend.DBDTOV1
|
||||
import com.daml.platform.store.backend.DbDto
|
||||
|
||||
private[postgresql] object PGSchema {
|
||||
val eventsDivulgence: PGTable[DBDTOV1.EventDivulgence] = PGTable("participant_events_divulgence")(
|
||||
val eventsDivulgence: PGTable[DbDto.EventDivulgence] = PGTable("participant_events_divulgence")(
|
||||
"event_offset" -> PGString(_.event_offset.orNull),
|
||||
"command_id" -> PGString(_.command_id.orNull),
|
||||
"workflow_id" -> PGString(_.workflow_id.orNull),
|
||||
@ -20,7 +20,7 @@ private[postgresql] object PGSchema {
|
||||
"create_argument_compression" -> PGSmallintOptional(_.create_argument_compression),
|
||||
)
|
||||
|
||||
val eventsCreate: PGTable[DBDTOV1.EventCreate] = PGTable("participant_events_create")(
|
||||
val eventsCreate: PGTable[DbDto.EventCreate] = PGTable("participant_events_create")(
|
||||
"event_offset" -> PGString(_.event_offset.orNull),
|
||||
"transaction_id" -> PGString(_.transaction_id.orNull),
|
||||
"ledger_effective_time" -> PGTimestamp(_.ledger_effective_time.orNull),
|
||||
@ -45,8 +45,8 @@ private[postgresql] object PGSchema {
|
||||
"create_key_value_compression" -> PGSmallintOptional(_.create_key_value_compression),
|
||||
)
|
||||
|
||||
val exerciseFields: Vector[(String, PGField[DBDTOV1.EventExercise, _, _])] =
|
||||
Vector[(String, PGField[DBDTOV1.EventExercise, _, _])](
|
||||
val exerciseFields: Vector[(String, PGField[DbDto.EventExercise, _, _])] =
|
||||
Vector[(String, PGField[DbDto.EventExercise, _, _])](
|
||||
"event_id" -> PGString(_.event_id.orNull),
|
||||
"event_offset" -> PGString(_.event_offset.orNull),
|
||||
"contract_id" -> PGString(_.contract_id),
|
||||
@ -72,13 +72,13 @@ private[postgresql] object PGSchema {
|
||||
"exercise_result_compression" -> PGSmallintOptional(_.exercise_result_compression),
|
||||
)
|
||||
|
||||
val eventsConsumingExercise: PGTable[DBDTOV1.EventExercise] =
|
||||
val eventsConsumingExercise: PGTable[DbDto.EventExercise] =
|
||||
PGTable(tableName = "participant_events_consuming_exercise", exerciseFields)
|
||||
|
||||
val eventsNonConsumingExercise: PGTable[DBDTOV1.EventExercise] =
|
||||
val eventsNonConsumingExercise: PGTable[DbDto.EventExercise] =
|
||||
PGTable(tableName = "participant_events_non_consuming_exercise", exerciseFields)
|
||||
|
||||
val configurationEntries: PGTable[DBDTOV1.ConfigurationEntry] = PGTable("configuration_entries")(
|
||||
val configurationEntries: PGTable[DbDto.ConfigurationEntry] = PGTable("configuration_entries")(
|
||||
"ledger_offset" -> PGString(_.ledger_offset),
|
||||
"recorded_at" -> PGTimestamp(_.recorded_at),
|
||||
"submission_id" -> PGString(_.submission_id),
|
||||
@ -87,7 +87,7 @@ private[postgresql] object PGSchema {
|
||||
"rejection_reason" -> PGString(_.rejection_reason.orNull),
|
||||
)
|
||||
|
||||
val packageEntries: PGTable[DBDTOV1.PackageEntry] = PGTable("package_entries")(
|
||||
val packageEntries: PGTable[DbDto.PackageEntry] = PGTable("package_entries")(
|
||||
"ledger_offset" -> PGString(_.ledger_offset),
|
||||
"recorded_at" -> PGTimestamp(_.recorded_at),
|
||||
"submission_id" -> PGString(_.submission_id.orNull),
|
||||
@ -95,10 +95,10 @@ private[postgresql] object PGSchema {
|
||||
"rejection_reason" -> PGString(_.rejection_reason.orNull),
|
||||
)
|
||||
|
||||
val packages: PGTable[DBDTOV1.Package] = PGTable(
|
||||
val packages: PGTable[DbDto.Package] = PGTable(
|
||||
tableName = "packages",
|
||||
insertSuffix = "on conflict (package_id) do nothing",
|
||||
fields = Vector[(String, PGField[DBDTOV1.Package, _, _])](
|
||||
fields = Vector[(String, PGField[DbDto.Package, _, _])](
|
||||
"package_id" -> PGString(_.package_id),
|
||||
"upload_id" -> PGString(_.upload_id),
|
||||
"source_description" -> PGString(_.source_description.orNull),
|
||||
@ -109,7 +109,7 @@ private[postgresql] object PGSchema {
|
||||
),
|
||||
)
|
||||
|
||||
val partyEntries: PGTable[DBDTOV1.PartyEntry] = PGTable("party_entries")(
|
||||
val partyEntries: PGTable[DbDto.PartyEntry] = PGTable("party_entries")(
|
||||
"ledger_offset" -> PGString(_.ledger_offset),
|
||||
"recorded_at" -> PGTimestamp(_.recorded_at),
|
||||
"submission_id" -> PGString(_.submission_id.orNull),
|
||||
@ -120,7 +120,7 @@ private[postgresql] object PGSchema {
|
||||
"is_local" -> PGBooleanOptional(_.is_local),
|
||||
)
|
||||
|
||||
val parties: PGTable[DBDTOV1.Party] = PGTable("parties")(
|
||||
val parties: PGTable[DbDto.Party] = PGTable("parties")(
|
||||
"party" -> PGString(_.party),
|
||||
"display_name" -> PGString(_.display_name.orNull),
|
||||
"explicit" -> PGBoolean(_.explicit),
|
||||
@ -128,7 +128,7 @@ private[postgresql] object PGSchema {
|
||||
"is_local" -> PGBoolean(_.is_local),
|
||||
)
|
||||
|
||||
val commandCompletions: PGTable[DBDTOV1.CommandCompletion] =
|
||||
val commandCompletions: PGTable[DbDto.CommandCompletion] =
|
||||
PGTable("participant_command_completions")(
|
||||
"completion_offset" -> PGString(_.completion_offset),
|
||||
"record_time" -> PGTimestamp(_.record_time),
|
||||
|
@ -3,7 +3,7 @@
|
||||
|
||||
package com.daml.platform.store.backend.postgresql
|
||||
|
||||
import com.daml.platform.store.backend.DBDTOV1
|
||||
import com.daml.platform.store.backend.DbDto
|
||||
import com.daml.scalautil.NeverEqualsOverride
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
@ -23,11 +23,11 @@ private[postgresql] case class PostgresDbBatch(
|
||||
) extends NeverEqualsOverride
|
||||
|
||||
private[postgresql] object PostgresDbBatch {
|
||||
def apply(dbDtos: Vector[DBDTOV1]): PostgresDbBatch = {
|
||||
def collectWithFilter[T <: DBDTOV1: ClassTag](filter: T => Boolean): Vector[T] =
|
||||
def apply(dbDtos: Vector[DbDto]): PostgresDbBatch = {
|
||||
def collectWithFilter[T <: DbDto: ClassTag](filter: T => Boolean): Vector[T] =
|
||||
dbDtos.collect { case dbDto: T if filter(dbDto) => dbDto }
|
||||
def collect[T <: DBDTOV1: ClassTag]: Vector[T] = collectWithFilter[T](_ => true)
|
||||
import DBDTOV1._
|
||||
def collect[T <: DbDto: ClassTag]: Vector[T] = collectWithFilter[T](_ => true)
|
||||
import DbDto._
|
||||
import PGSchema._
|
||||
PostgresDbBatch(
|
||||
eventsBatchDivulgence = eventsDivulgence.prepareData(collect[EventDivulgence]),
|
||||
|
@ -7,7 +7,7 @@ import java.sql.{Connection, PreparedStatement, ResultSet}
|
||||
|
||||
import com.daml.ledger.participant.state.v1.Offset
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.platform.store.backend.{DBDTOV1, StorageBackend}
|
||||
import com.daml.platform.store.backend.{DbDto, StorageBackend}
|
||||
|
||||
import scala.collection.mutable
|
||||
|
||||
@ -179,5 +179,5 @@ private[backend] object PostgresStorageBackend extends StorageBackend[PostgresDb
|
||||
buffer.toVector
|
||||
}
|
||||
|
||||
override def batch(dbDtos: Vector[DBDTOV1]): PostgresDbBatch = PostgresDbBatch(dbDtos)
|
||||
override def batch(dbDtos: Vector[DbDto]): PostgresDbBatch = PostgresDbBatch(dbDtos)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user