Split CommonStorageBackend (#10871)

changelog_begin
changelog_end
This commit is contained in:
Robert Autenrieth 2021-09-20 15:22:49 +02:00 committed by GitHub
parent dc71a6aea8
commit 02c8a9dfb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 834 additions and 744 deletions

View File

@ -47,8 +47,11 @@ private[platform] case class InitializeParallelIngestion(
)
)
)
ledgerEnd <- dbDispatcher.executeSql(metrics.daml.parallelIndexer.initialization)(
storageBackend.initializeIngestion
ledgerEnd <- dbDispatcher.executeSql(metrics.daml.index.db.getLedgerEnd)(
storageBackend.ledgerEnd
)
_ <- dbDispatcher.executeSql(metrics.daml.parallelIndexer.initialization)(
storageBackend.deletePartiallyIngestedData(ledgerEnd)
)
} yield InitializeParallelIngestion.Initialized(
initialEventSeqId = ledgerEnd.map(_.lastEventSeqId).getOrElse(EventSequentialId.beforeBegin),

View File

@ -84,14 +84,15 @@ trait IngestionStorageBackend[DB_BATCH] {
*/
def insertBatch(connection: Connection, batch: DB_BATCH): Unit
/** Custom initialization code before the start of an ingestion.
* This method is responsible for the recovery after a possibly non-graceful stop of previous indexing.
/** Deletes all partially ingested data, written during a non-graceful stop of previous indexing.
* No significant CPU load, mostly blocking JDBC communication with the database backend.
*
* @param connection to be used when initializing
* @return the LedgerEnd, which should be the basis for further indexing.
* @param ledgerEnd the current ledger end, or None if no ledger end exists
* @param connection to be used when inserting the batch
*/
def initializeIngestion(connection: Connection): Option[ParameterStorageBackend.LedgerEnd]
def deletePartiallyIngestedData(ledgerEnd: Option[ParameterStorageBackend.LedgerEnd])(
connection: Connection
): Unit
}
trait ParameterStorageBackend {

View File

@ -1,711 +0,0 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.common
import java.sql.Connection
import java.time.Instant
import anorm.SqlParser.{array, binaryStream, byteArray, flatten, get, int, long, str}
import anorm.{Macro, Row, RowParser, SQL, SimpleSql, SqlParser, SqlQuery, ~}
import com.daml.ledger.api.domain.{LedgerId, ParticipantId}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.platform.store.Conversions.{
contractId,
eventId,
identifier,
instantFromMicros,
ledgerString,
offset,
}
import com.daml.lf.data.Ref.PackageId
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.common.MismatchException
import com.daml.platform.store.Conversions
import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf
import com.daml.platform.store.appendonlydao.JdbcLedgerDao.{acceptType, rejectType}
import com.daml.platform.store.backend.{ParameterStorageBackend, StorageBackend}
import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry}
import com.daml.scalautil.Statement.discard
import scalaz.syntax.tag._
private[backend] trait CommonStorageBackend[DB_BATCH] extends StorageBackend[DB_BATCH] {
private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
// Ingestion
private val SQL_DELETE_OVERSPILL_ENTRIES: List[SqlQuery] =
List(
SQL("DELETE FROM configuration_entries WHERE ledger_offset > {ledger_offset}"),
SQL("DELETE FROM package_entries WHERE ledger_offset > {ledger_offset}"),
SQL("DELETE FROM packages WHERE ledger_offset > {ledger_offset}"),
SQL("DELETE FROM participant_command_completions WHERE completion_offset > {ledger_offset}"),
SQL("DELETE FROM participant_events_divulgence WHERE event_offset > {ledger_offset}"),
SQL("DELETE FROM participant_events_create WHERE event_offset > {ledger_offset}"),
SQL("DELETE FROM participant_events_consuming_exercise WHERE event_offset > {ledger_offset}"),
SQL(
"DELETE FROM participant_events_non_consuming_exercise WHERE event_offset > {ledger_offset}"
),
SQL("DELETE FROM party_entries WHERE ledger_offset > {ledger_offset}"),
)
def queryStrategy: QueryStrategy
override def initializeIngestion(
connection: Connection
): Option[ParameterStorageBackend.LedgerEnd] = {
val result = ledgerEnd(connection)
result.foreach { existingLedgerEnd =>
SQL_DELETE_OVERSPILL_ENTRIES.foreach { query =>
import com.daml.platform.store.Conversions.OffsetToStatement
query
.on("ledger_offset" -> existingLedgerEnd.lastOffset)
.execute()(connection)
()
}
}
result
}
// Parameters
private val SQL_UPDATE_LEDGER_END = SQL(
"""
|UPDATE
| parameters
|SET
| ledger_end = {ledger_end},
| ledger_end_sequential_id = {ledger_end_sequential_id}
|""".stripMargin
)
override def updateLedgerEnd(
ledgerEnd: ParameterStorageBackend.LedgerEnd
)(connection: Connection): Unit = {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL_UPDATE_LEDGER_END
.on("ledger_end" -> ledgerEnd.lastOffset)
.on("ledger_end_sequential_id" -> ledgerEnd.lastEventSeqId)
.execute()(connection)
()
}
private val SQL_GET_LEDGER_END = SQL(
"""
|SELECT
| ledger_end,
| ledger_end_sequential_id
|FROM
| parameters
|
|""".stripMargin
)
override def ledgerEnd(connection: Connection): Option[ParameterStorageBackend.LedgerEnd] =
SQL_GET_LEDGER_END.as(LedgerEndParser.singleOpt)(connection).flatten
private val TableName: String = "parameters"
private val LedgerIdColumnName: String = "ledger_id"
private val ParticipantIdColumnName: String = "participant_id"
private val LedgerEndColumnName: String = "ledger_end"
private val LedgerEndSequentialIdColumnName: String = "ledger_end_sequential_id"
private val LedgerIdParser: RowParser[LedgerId] =
ledgerString(LedgerIdColumnName).map(LedgerId(_))
private val ParticipantIdParser: RowParser[ParticipantId] =
Conversions.participantId(ParticipantIdColumnName).map(ParticipantId(_))
private val LedgerEndOffsetParser: RowParser[Option[Offset]] =
offset(LedgerEndColumnName).?
private val LedgerEndSequentialIdParser: RowParser[Option[Long]] =
long(LedgerEndSequentialIdColumnName).?
private val LedgerIdentityParser: RowParser[ParameterStorageBackend.IdentityParams] =
LedgerIdParser ~ ParticipantIdParser map { case ledgerId ~ participantId =>
ParameterStorageBackend.IdentityParams(ledgerId, participantId)
}
private val LedgerEndParser: RowParser[Option[ParameterStorageBackend.LedgerEnd]] =
LedgerEndOffsetParser ~ LedgerEndSequentialIdParser map {
case Some(lastOffset) ~ Some(lastEventSequentialId) =>
Some(ParameterStorageBackend.LedgerEnd(lastOffset, lastEventSequentialId))
case _ =>
// Note: offset and event sequential id are always written together.
// Cases where only one of them is defined are not handled here.
None
}
override def initializeParameters(
params: ParameterStorageBackend.IdentityParams
)(connection: Connection)(implicit loggingContext: LoggingContext): Unit = {
// Note: this method is the only one that inserts a row into the parameters table
val previous = ledgerIdentity(connection)
val ledgerId = params.ledgerId
val participantId = params.participantId
previous match {
case None =>
logger.info(
s"Initializing new database for ledgerId '${params.ledgerId}' and participantId '${params.participantId}'"
)
discard(
SQL"insert into #$TableName(#$LedgerIdColumnName, #$ParticipantIdColumnName) values(${ledgerId.unwrap}, ${participantId.unwrap: String})"
.execute()(connection)
)
case Some(ParameterStorageBackend.IdentityParams(`ledgerId`, `participantId`)) =>
logger.info(
s"Found existing database for ledgerId '${params.ledgerId}' and participantId '${params.participantId}'"
)
case Some(ParameterStorageBackend.IdentityParams(existing, _))
if existing != params.ledgerId =>
logger.error(
s"Found existing database with mismatching ledgerId: existing '$existing', provided '${params.ledgerId}'"
)
throw MismatchException.LedgerId(
existing = existing,
provided = params.ledgerId,
)
case Some(ParameterStorageBackend.IdentityParams(_, existing)) =>
logger.error(
s"Found existing database with mismatching participantId: existing '$existing', provided '${params.participantId}'"
)
throw MismatchException.ParticipantId(
existing = existing,
provided = params.participantId,
)
}
}
override def ledgerIdentity(
connection: Connection
): Option[ParameterStorageBackend.IdentityParams] =
SQL"select #$LedgerIdColumnName, #$ParticipantIdColumnName from #$TableName"
.as(LedgerIdentityParser.singleOpt)(connection)
private val SQL_UPDATE_MOST_RECENT_PRUNING =
SQL("""
|update parameters set participant_pruned_up_to_inclusive={pruned_up_to_inclusive}
|where participant_pruned_up_to_inclusive < {pruned_up_to_inclusive} or participant_pruned_up_to_inclusive is null
|""".stripMargin)
private val SQL_UPDATE_MOST_RECENT_PRUNING_INCLUDING_ALL_DIVULGED_CONTRACTS =
SQL("""
|update parameters set participant_all_divulged_contracts_pruned_up_to_inclusive={prune_all_divulged_contracts_up_to_inclusive}
|where participant_pruned_up_to_inclusive < {prune_all_divulged_contracts_up_to_inclusive} or participant_all_divulged_contracts_pruned_up_to_inclusive is null
|""".stripMargin)
def updatePrunedUptoInclusive(prunedUpToInclusive: Offset)(connection: Connection): Unit = {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL_UPDATE_MOST_RECENT_PRUNING
.on("pruned_up_to_inclusive" -> prunedUpToInclusive)
.execute()(connection)
()
}
def updatePrunedAllDivulgedContractsUpToInclusive(
prunedUpToInclusive: Offset
)(connection: Connection): Unit = {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL_UPDATE_MOST_RECENT_PRUNING_INCLUDING_ALL_DIVULGED_CONTRACTS
.on("prune_all_divulged_contracts_up_to_inclusive" -> prunedUpToInclusive)
.execute()(connection)
()
}
private val SQL_SELECT_MOST_RECENT_PRUNING = SQL(
"select participant_pruned_up_to_inclusive from parameters"
)
def prunedUptoInclusive(connection: Connection): Option[Offset] =
SQL_SELECT_MOST_RECENT_PRUNING
.as(offset("participant_pruned_up_to_inclusive").?.single)(connection)
// Configurations
private val SQL_GET_CONFIGURATION_ENTRIES = SQL(
"""select
| configuration_entries.ledger_offset,
| configuration_entries.recorded_at,
| configuration_entries.submission_id,
| configuration_entries.typ,
| configuration_entries.configuration,
| configuration_entries.rejection_reason
| from
| configuration_entries,
| parameters
| where
| ({startExclusive} is null or ledger_offset>{startExclusive}) and
| ledger_offset <= {endInclusive} and
| parameters.ledger_end >= ledger_offset
| order by ledger_offset asc
| offset {queryOffset} rows
| fetch next {pageSize} rows only
| """.stripMargin
)
private val SQL_GET_LATEST_CONFIGURATION_ENTRY = SQL(
s"""select
| configuration_entries.ledger_offset,
| configuration_entries.recorded_at,
| configuration_entries.submission_id,
| configuration_entries.typ,
| configuration_entries.configuration,
| configuration_entries.rejection_reason
| from
| configuration_entries,
| parameters
| where
| configuration_entries.typ = '$acceptType' and
| parameters.ledger_end >= ledger_offset
| order by ledger_offset desc
| fetch next 1 row only""".stripMargin
)
private val configurationEntryParser: RowParser[(Offset, ConfigurationEntry)] =
(offset("ledger_offset") ~
str("typ") ~
str("submission_id") ~
str("rejection_reason").map(s => if (s.isEmpty) null else s).? ~
byteArray("configuration"))
.map(flatten)
.map { case (offset, typ, submissionId, rejectionReason, configBytes) =>
val config = Configuration
.decode(configBytes)
.fold(err => sys.error(s"Failed to decode configuration: $err"), identity)
offset ->
(typ match {
case `acceptType` =>
ConfigurationEntry.Accepted(
submissionId = submissionId,
configuration = config,
)
case `rejectType` =>
ConfigurationEntry.Rejected(
submissionId = submissionId,
rejectionReason = rejectionReason.getOrElse("<missing reason>"),
proposedConfiguration = config,
)
case _ =>
sys.error(s"getConfigurationEntries: Unknown configuration entry type: $typ")
})
}
def ledgerConfiguration(connection: Connection): Option[(Offset, Configuration)] =
SQL_GET_LATEST_CONFIGURATION_ENTRY
.on()
.asVectorOf(configurationEntryParser)(connection)
.collectFirst { case (offset, ConfigurationEntry.Accepted(_, configuration)) =>
offset -> configuration
}
def configurationEntries(
startExclusive: Offset,
endInclusive: Offset,
pageSize: Int,
queryOffset: Long,
)(connection: Connection): Vector[(Offset, ConfigurationEntry)] = {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL_GET_CONFIGURATION_ENTRIES
.on(
"startExclusive" -> startExclusive,
"endInclusive" -> endInclusive,
"pageSize" -> pageSize,
"queryOffset" -> queryOffset,
)
.asVectorOf(configurationEntryParser)(connection)
}
// Packages
private val SQL_SELECT_PACKAGES =
SQL(
"""select packages.package_id, packages.source_description, packages.known_since, packages.package_size
|from packages, parameters
|where packages.ledger_offset <= parameters.ledger_end
|""".stripMargin
)
private case class ParsedPackageData(
packageId: String,
sourceDescription: Option[String],
size: Long,
knownSince: Long,
)
private val PackageDataParser: RowParser[ParsedPackageData] =
Macro.parser[ParsedPackageData](
"package_id",
"source_description",
"package_size",
"known_since",
)
def lfPackages(connection: Connection): Map[PackageId, PackageDetails] =
SQL_SELECT_PACKAGES
.as(PackageDataParser.*)(connection)
.map(d =>
PackageId.assertFromString(d.packageId) -> PackageDetails(
d.size,
instantFromMicros(d.knownSince),
d.sourceDescription,
)
)
.toMap
private val SQL_SELECT_PACKAGE =
SQL("""select packages.package
|from packages, parameters
|where package_id = {package_id}
|and packages.ledger_offset <= parameters.ledger_end
|""".stripMargin)
def lfArchive(packageId: PackageId)(connection: Connection): Option[Array[Byte]] = {
import com.daml.platform.store.Conversions.packageIdToStatement
SQL_SELECT_PACKAGE
.on(
"package_id" -> packageId
)
.as[Option[Array[Byte]]](SqlParser.byteArray("package").singleOpt)(connection)
}
private val SQL_GET_PACKAGE_ENTRIES = SQL(
"""select * from package_entries
|where ({startExclusive} is null or ledger_offset>{startExclusive})
|and ledger_offset<={endInclusive}
|order by ledger_offset asc
|offset {queryOffset} rows
|fetch next {pageSize} rows only""".stripMargin
)
private val packageEntryParser: RowParser[(Offset, PackageLedgerEntry)] =
(offset("ledger_offset") ~
instantFromMicros("recorded_at") ~
ledgerString("submission_id").? ~
str("typ") ~
str("rejection_reason").?)
.map(flatten)
.map {
case (offset, recordTime, Some(submissionId), `acceptType`, None) =>
offset ->
PackageLedgerEntry.PackageUploadAccepted(submissionId, recordTime)
case (offset, recordTime, Some(submissionId), `rejectType`, Some(reason)) =>
offset ->
PackageLedgerEntry.PackageUploadRejected(submissionId, recordTime, reason)
case invalidRow =>
sys.error(s"packageEntryParser: invalid party entry row: $invalidRow")
}
def packageEntries(
startExclusive: Offset,
endInclusive: Offset,
pageSize: Int,
queryOffset: Long,
)(connection: Connection): Vector[(Offset, PackageLedgerEntry)] = {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL_GET_PACKAGE_ENTRIES
.on(
"startExclusive" -> startExclusive,
"endInclusive" -> endInclusive,
"pageSize" -> pageSize,
"queryOffset" -> queryOffset,
)
.asVectorOf(packageEntryParser)(connection)
}
// Deduplication
private val SQL_SELECT_COMMAND = SQL("""
|select deduplicate_until
|from participant_command_submissions
|where deduplication_key = {deduplicationKey}
""".stripMargin)
private case class ParsedCommandData(deduplicateUntil: Instant)
private val CommandDataParser: RowParser[ParsedCommandData] =
instantFromMicros("deduplicate_until")
.map(ParsedCommandData)
def deduplicatedUntil(deduplicationKey: String)(connection: Connection): Instant =
SQL_SELECT_COMMAND
.on("deduplicationKey" -> deduplicationKey)
.as(CommandDataParser.single)(connection)
.deduplicateUntil
private val SQL_DELETE_EXPIRED_COMMANDS = SQL("""
|delete from participant_command_submissions
|where deduplicate_until < {currentTime}
""".stripMargin)
def removeExpiredDeduplicationData(currentTime: Instant)(connection: Connection): Unit = {
SQL_DELETE_EXPIRED_COMMANDS
.on("currentTime" -> Timestamp.instantToMicros(currentTime))
.execute()(connection)
()
}
private val SQL_DELETE_COMMAND = SQL("""
|delete from participant_command_submissions
|where deduplication_key = {deduplicationKey}
""".stripMargin)
def stopDeduplicatingCommand(deduplicationKey: String)(connection: Connection): Unit = {
SQL_DELETE_COMMAND
.on("deduplicationKey" -> deduplicationKey)
.execute()(connection)
()
}
// Completions
def pruneCompletions(
pruneUpToInclusive: Offset
)(connection: Connection, loggingContext: LoggingContext): Unit = {
pruneWithLogging(queryDescription = "Command completions pruning") {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL"delete from participant_command_completions where completion_offset <= $pruneUpToInclusive"
}(connection, loggingContext)
}
// Events
// TODO Cleanup: Extract to [[EventStorageBackendTemplate]]
def pruneEvents(
pruneUpToInclusive: Offset,
pruneAllDivulgedContracts: Boolean,
)(connection: Connection, loggingContext: LoggingContext): Unit = {
import com.daml.platform.store.Conversions.OffsetToStatement
if (pruneAllDivulgedContracts) {
pruneWithLogging(queryDescription = "All retroactive divulgence events pruning") {
SQL"""
-- Retroactive divulgence events
delete from participant_events_divulgence delete_events
where delete_events.event_offset <= $pruneUpToInclusive
or delete_events.event_offset is null
"""
}(connection, loggingContext)
} else {
pruneWithLogging(queryDescription = "Archived retroactive divulgence events pruning") {
SQL"""
-- Retroactive divulgence events (only for contracts archived before the specified offset)
delete from participant_events_divulgence delete_events
where
delete_events.event_offset <= $pruneUpToInclusive
and exists (
select 1 from participant_events_consuming_exercise archive_events
where
archive_events.event_offset <= $pruneUpToInclusive and
archive_events.contract_id = delete_events.contract_id
)"""
}(connection, loggingContext)
}
pruneWithLogging(queryDescription = "Create events pruning") {
SQL"""
-- Create events (only for contracts archived before the specified offset)
delete from participant_events_create delete_events
where
delete_events.event_offset <= $pruneUpToInclusive and
exists (
SELECT 1 FROM participant_events_consuming_exercise archive_events
WHERE
archive_events.event_offset <= $pruneUpToInclusive AND
archive_events.contract_id = delete_events.contract_id
)"""
}(connection, loggingContext)
if (pruneAllDivulgedContracts) {
val pruneAfterClause = {
// We need to distinguish between the two cases since lexicographical comparison
// in Oracle doesn't work with '' (empty strings are treated as NULLs) as one of the operands
participantAllDivulgedContractsPrunedUpToInclusive(connection) match {
case Some(pruneAfter) => cSQL"and event_offset > $pruneAfter"
case None => cSQL""
}
}
pruneWithLogging(queryDescription = "Immediate divulgence events pruning") {
SQL"""
-- Immediate divulgence pruning
delete from participant_events_create c
where event_offset <= $pruneUpToInclusive
-- Only prune create events which did not have a locally hosted party before their creation offset
and not exists (
select 1
from party_entries p
where p.typ = 'accept'
and p.ledger_offset <= c.event_offset
and #${queryStrategy.isTrue("p.is_local")}
and #${queryStrategy.arrayContains("c.flat_event_witnesses", "p.party")}
)
$pruneAfterClause
"""
}(connection, loggingContext)
}
pruneWithLogging(queryDescription = "Exercise (consuming) events pruning") {
SQL"""
-- Exercise events (consuming)
delete from participant_events_consuming_exercise delete_events
where
delete_events.event_offset <= $pruneUpToInclusive"""
}(connection, loggingContext)
pruneWithLogging(queryDescription = "Exercise (non-consuming) events pruning") {
SQL"""
-- Exercise events (non-consuming)
delete from participant_events_non_consuming_exercise delete_events
where
delete_events.event_offset <= $pruneUpToInclusive"""
}(connection, loggingContext)
}
private def participantAllDivulgedContractsPrunedUpToInclusive(
connection: Connection
): Option[Offset] =
SQL"select participant_all_divulged_contracts_pruned_up_to_inclusive from parameters"
.as(offset("participant_all_divulged_contracts_pruned_up_to_inclusive").?.single)(
connection
)
private def pruneWithLogging(queryDescription: String)(query: SimpleSql[Row])(
connection: Connection,
loggingContext: LoggingContext,
): Unit = {
val deletedRows = query.executeUpdate()(connection)
logger.info(s"$queryDescription finished: deleted $deletedRows rows.")(loggingContext)
}
private val rawTransactionEventParser: RowParser[RawTransactionEvent] = {
import com.daml.platform.store.Conversions.ArrayColumnToStringArray.arrayColumnToStringArray
(int("event_kind") ~
str("transaction_id") ~
int("node_index") ~
str("command_id").? ~
str("workflow_id").? ~
eventId("event_id") ~
contractId("contract_id") ~
identifier("template_id").? ~
instantFromMicros("ledger_effective_time").? ~
array[String]("create_signatories").? ~
array[String]("create_observers").? ~
str("create_agreement_text").? ~
binaryStream("create_key_value").? ~
int("create_key_value_compression").? ~
binaryStream("create_argument").? ~
int("create_argument_compression").? ~
array[String]("tree_event_witnesses") ~
array[String]("flat_event_witnesses") ~
array[String]("submitters").? ~
str("exercise_choice").? ~
binaryStream("exercise_argument").? ~
int("exercise_argument_compression").? ~
binaryStream("exercise_result").? ~
int("exercise_result_compression").? ~
array[String]("exercise_actors").? ~
array[String]("exercise_child_event_ids").? ~
long("event_sequential_id") ~
offset("event_offset")).map {
case eventKind ~ transactionId ~ nodeIndex ~ commandId ~ workflowId ~ eventId ~ contractId ~ templateId ~ ledgerEffectiveTime ~ createSignatories ~
createObservers ~ createAgreementText ~ createKeyValue ~ createKeyCompression ~
createArgument ~ createArgumentCompression ~ treeEventWitnesses ~ flatEventWitnesses ~ submitters ~ exerciseChoice ~
exerciseArgument ~ exerciseArgumentCompression ~ exerciseResult ~ exerciseResultCompression ~ exerciseActors ~
exerciseChildEventIds ~ eventSequentialId ~ offset =>
RawTransactionEvent(
eventKind,
transactionId,
nodeIndex,
commandId,
workflowId,
eventId,
contractId,
templateId,
ledgerEffectiveTime,
createSignatories,
createObservers,
createAgreementText,
createKeyValue,
createKeyCompression,
createArgument,
createArgumentCompression,
treeEventWitnesses.toSet,
flatEventWitnesses.toSet,
submitters.map(_.toSet).getOrElse(Set.empty),
exerciseChoice,
exerciseArgument,
exerciseArgumentCompression,
exerciseResult,
exerciseResultCompression,
exerciseActors,
exerciseChildEventIds,
eventSequentialId,
offset,
)
}
}
def rawEvents(startExclusive: Long, endInclusive: Long)(
connection: Connection
): Vector[RawTransactionEvent] =
SQL"""
SELECT
event_kind,
transaction_id,
node_index,
command_id,
workflow_id,
event_id,
contract_id,
template_id,
ledger_effective_time,
create_signatories,
create_observers,
create_agreement_text,
create_key_value,
create_key_value_compression,
create_argument,
create_argument_compression,
tree_event_witnesses,
flat_event_witnesses,
submitters,
exercise_choice,
exercise_argument,
exercise_argument_compression,
exercise_result,
exercise_result_compression,
exercise_actors,
exercise_child_event_ids,
event_sequential_id,
event_offset
FROM
participant_events
WHERE
event_sequential_id > $startExclusive
and event_sequential_id <= $endInclusive
and event_kind != 0
ORDER BY event_sequential_id ASC"""
.asVectorOf(rawTransactionEventParser)(connection)
protected def exe(statement: String): Connection => Unit = { connection =>
val stmnt = connection.createStatement()
try {
stmnt.execute(statement)
()
} finally {
stmnt.close()
}
}
override def checkDatabaseAvailable(connection: Connection): Unit =
assert(SQL"SELECT 1".as(get[Int](1).single)(connection) == 1)
}

View File

@ -8,20 +8,24 @@ import java.sql.Connection
import java.time.Instant
import anorm.SqlParser.{binaryStream, int, long, str}
import anorm.{~, RowParser}
import anorm.{Row, RowParser, SimpleSql, ~}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions.{instantFromMicros, offset}
import com.daml.platform.store.backend.CompletionStorageBackend
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.google.protobuf.any
import com.google.rpc.status.{Status => StatusProto}
trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
def queryStrategy: QueryStrategy
override def commandCompletions(
@ -150,4 +154,21 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
rejectionStatusDetails
.map(stream => StatusDetails.parseFrom(stream).details)
.getOrElse(Seq.empty)
def pruneCompletions(
pruneUpToInclusive: Offset
)(connection: Connection, loggingContext: LoggingContext): Unit = {
pruneWithLogging(queryDescription = "Command completions pruning") {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL"delete from participant_command_completions where completion_offset <= $pruneUpToInclusive"
}(connection, loggingContext)
}
private def pruneWithLogging(queryDescription: String)(query: SimpleSql[Row])(
connection: Connection,
loggingContext: LoggingContext,
): Unit = {
val deletedRows = query.executeUpdate()(connection)
logger.info(s"$queryDescription finished: deleted $deletedRows rows.")(loggingContext)
}
}

View File

@ -0,0 +1,113 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.common
import java.sql.Connection
import anorm.SqlParser.{byteArray, flatten, str}
import anorm.{RowParser, SQL}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.platform.store.Conversions.offset
import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf
import com.daml.platform.store.appendonlydao.JdbcLedgerDao.{acceptType, rejectType}
import com.daml.platform.store.backend.ConfigurationStorageBackend
import com.daml.platform.store.entries.ConfigurationEntry
private[backend] trait ConfigurationStorageBackendTemplate extends ConfigurationStorageBackend {
private val SQL_GET_CONFIGURATION_ENTRIES = SQL(
"""select
| configuration_entries.ledger_offset,
| configuration_entries.recorded_at,
| configuration_entries.submission_id,
| configuration_entries.typ,
| configuration_entries.configuration,
| configuration_entries.rejection_reason
| from
| configuration_entries,
| parameters
| where
| ({startExclusive} is null or ledger_offset>{startExclusive}) and
| ledger_offset <= {endInclusive} and
| parameters.ledger_end >= ledger_offset
| order by ledger_offset asc
| offset {queryOffset} rows
| fetch next {pageSize} rows only
| """.stripMargin
)
private val SQL_GET_LATEST_CONFIGURATION_ENTRY = SQL(
s"""select
| configuration_entries.ledger_offset,
| configuration_entries.recorded_at,
| configuration_entries.submission_id,
| configuration_entries.typ,
| configuration_entries.configuration,
| configuration_entries.rejection_reason
| from
| configuration_entries,
| parameters
| where
| configuration_entries.typ = '$acceptType' and
| parameters.ledger_end >= ledger_offset
| order by ledger_offset desc
| fetch next 1 row only""".stripMargin
)
private val configurationEntryParser: RowParser[(Offset, ConfigurationEntry)] =
(offset("ledger_offset") ~
str("typ") ~
str("submission_id") ~
str("rejection_reason").map(s => if (s.isEmpty) null else s).? ~
byteArray("configuration"))
.map(flatten)
.map { case (offset, typ, submissionId, rejectionReason, configBytes) =>
val config = Configuration
.decode(configBytes)
.fold(err => sys.error(s"Failed to decode configuration: $err"), identity)
offset ->
(typ match {
case `acceptType` =>
ConfigurationEntry.Accepted(
submissionId = submissionId,
configuration = config,
)
case `rejectType` =>
ConfigurationEntry.Rejected(
submissionId = submissionId,
rejectionReason = rejectionReason.getOrElse("<missing reason>"),
proposedConfiguration = config,
)
case _ =>
sys.error(s"getConfigurationEntries: Unknown configuration entry type: $typ")
})
}
def ledgerConfiguration(connection: Connection): Option[(Offset, Configuration)] =
SQL_GET_LATEST_CONFIGURATION_ENTRY
.on()
.asVectorOf(configurationEntryParser)(connection)
.collectFirst { case (offset, ConfigurationEntry.Accepted(_, configuration)) =>
offset -> configuration
}
def configurationEntries(
startExclusive: Offset,
endInclusive: Offset,
pageSize: Int,
queryOffset: Long,
)(connection: Connection): Vector[(Offset, ConfigurationEntry)] = {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL_GET_CONFIGURATION_ENTRIES
.on(
"startExclusive" -> startExclusive,
"endInclusive" -> endInclusive,
"pageSize" -> pageSize,
"queryOffset" -> queryOffset,
)
.asVectorOf(configurationEntryParser)(connection)
}
}

View File

@ -0,0 +1,26 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.common
import java.sql.Connection
import anorm.SqlParser.get
import com.daml.platform.store.backend.DataSourceStorageBackend
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
private[backend] trait DataSourceStorageBackendTemplate extends DataSourceStorageBackend {
protected def exe(statement: String): Connection => Unit = { connection =>
val stmnt = connection.createStatement()
try {
stmnt.execute(statement)
()
} finally {
stmnt.close()
}
}
override def checkDatabaseAvailable(connection: Connection): Unit =
assert(SQL"SELECT 1".as(get[Int](1).single)(connection) == 1)
}

View File

@ -0,0 +1,57 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.common
import java.sql.Connection
import java.time.Instant
import anorm.{RowParser, SQL}
import com.daml.platform.store.Conversions.instantFromMicros
import com.daml.platform.store.backend.DeduplicationStorageBackend
private[backend] trait DeduplicationStorageBackendTemplate extends DeduplicationStorageBackend {
private val SQL_SELECT_COMMAND = SQL("""
|select deduplicate_until
|from participant_command_submissions
|where deduplication_key = {deduplicationKey}
""".stripMargin)
private case class ParsedCommandData(deduplicateUntil: Instant)
private val CommandDataParser: RowParser[ParsedCommandData] =
instantFromMicros("deduplicate_until")
.map(ParsedCommandData)
def deduplicatedUntil(deduplicationKey: String)(connection: Connection): Instant =
SQL_SELECT_COMMAND
.on("deduplicationKey" -> deduplicationKey)
.as(CommandDataParser.single)(connection)
.deduplicateUntil
private val SQL_DELETE_EXPIRED_COMMANDS = SQL("""
|delete from participant_command_submissions
|where deduplicate_until < {currentTime}
""".stripMargin)
def removeExpiredDeduplicationData(currentTime: Instant)(connection: Connection): Unit = {
SQL_DELETE_EXPIRED_COMMANDS
.on("currentTime" -> Timestamp.instantToMicros(currentTime))
.execute()(connection)
()
}
private val SQL_DELETE_COMMAND = SQL("""
|delete from participant_command_submissions
|where deduplication_key = {deduplicationKey}
""".stripMargin)
def stopDeduplicatingCommand(deduplicationKey: String)(connection: Connection): Unit = {
SQL_DELETE_COMMAND
.on("deduplicationKey" -> deduplicationKey)
.execute()(connection)
()
}
}

View File

@ -8,14 +8,22 @@ import java.sql.Connection
import java.time.Instant
import anorm.SqlParser.{array, binaryStream, bool, int, long, str}
import anorm.{RowParser, ~}
import anorm.{Row, RowParser, SimpleSql, ~}
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.platform.store.Conversions.{identifier, instantFromMicros, offset}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.store.Conversions.{
contractId,
eventId,
identifier,
instantFromMicros,
offset,
}
import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf
import com.daml.platform.store.appendonlydao.events.{EventsTable, Identifier, Raw}
import com.daml.platform.store.backend.EventStorageBackend
import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams}
import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
import scala.collection.compat.immutable.ArraySeq
@ -23,6 +31,8 @@ import scala.collection.compat.immutable.ArraySeq
trait EventStorageBackendTemplate extends EventStorageBackend {
import com.daml.platform.store.Conversions.ArrayColumnToStringArray.arrayColumnToStringArray
private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
def eventStrategy: EventStrategy
def queryStrategy: QueryStrategy
@ -392,6 +402,222 @@ trait EventStorageBackendTemplate extends EventStorageBackend {
)(connection)
}
override def pruneEvents(
pruneUpToInclusive: Offset,
pruneAllDivulgedContracts: Boolean,
)(connection: Connection, loggingContext: LoggingContext): Unit = {
import com.daml.platform.store.Conversions.OffsetToStatement
if (pruneAllDivulgedContracts) {
pruneWithLogging(queryDescription = "All retroactive divulgence events pruning") {
SQL"""
-- Retroactive divulgence events
delete from participant_events_divulgence delete_events
where delete_events.event_offset <= $pruneUpToInclusive
or delete_events.event_offset is null
"""
}(connection, loggingContext)
} else {
pruneWithLogging(queryDescription = "Archived retroactive divulgence events pruning") {
SQL"""
-- Retroactive divulgence events (only for contracts archived before the specified offset)
delete from participant_events_divulgence delete_events
where
delete_events.event_offset <= $pruneUpToInclusive
and exists (
select 1 from participant_events_consuming_exercise archive_events
where
archive_events.event_offset <= $pruneUpToInclusive and
archive_events.contract_id = delete_events.contract_id
)"""
}(connection, loggingContext)
}
pruneWithLogging(queryDescription = "Create events pruning") {
SQL"""
-- Create events (only for contracts archived before the specified offset)
delete from participant_events_create delete_events
where
delete_events.event_offset <= $pruneUpToInclusive and
exists (
SELECT 1 FROM participant_events_consuming_exercise archive_events
WHERE
archive_events.event_offset <= $pruneUpToInclusive AND
archive_events.contract_id = delete_events.contract_id
)"""
}(connection, loggingContext)
if (pruneAllDivulgedContracts) {
val pruneAfterClause = {
// We need to distinguish between the two cases since lexicographical comparison
// in Oracle doesn't work with '' (empty strings are treated as NULLs) as one of the operands
participantAllDivulgedContractsPrunedUpToInclusive(connection) match {
case Some(pruneAfter) => cSQL"and event_offset > $pruneAfter"
case None => cSQL""
}
}
pruneWithLogging(queryDescription = "Immediate divulgence events pruning") {
SQL"""
-- Immediate divulgence pruning
delete from participant_events_create c
where event_offset <= $pruneUpToInclusive
-- Only prune create events which did not have a locally hosted party before their creation offset
and not exists (
select 1
from party_entries p
where p.typ = 'accept'
and p.ledger_offset <= c.event_offset
and #${queryStrategy.isTrue("p.is_local")}
and #${queryStrategy.arrayContains("c.flat_event_witnesses", "p.party")}
)
$pruneAfterClause
"""
}(connection, loggingContext)
}
pruneWithLogging(queryDescription = "Exercise (consuming) events pruning") {
SQL"""
-- Exercise events (consuming)
delete from participant_events_consuming_exercise delete_events
where
delete_events.event_offset <= $pruneUpToInclusive"""
}(connection, loggingContext)
pruneWithLogging(queryDescription = "Exercise (non-consuming) events pruning") {
SQL"""
-- Exercise events (non-consuming)
delete from participant_events_non_consuming_exercise delete_events
where
delete_events.event_offset <= $pruneUpToInclusive"""
}(connection, loggingContext)
}
private def participantAllDivulgedContractsPrunedUpToInclusive(
connection: Connection
): Option[Offset] =
SQL"select participant_all_divulged_contracts_pruned_up_to_inclusive from parameters"
.as(offset("participant_all_divulged_contracts_pruned_up_to_inclusive").?.single)(
connection
)
private def pruneWithLogging(queryDescription: String)(query: SimpleSql[Row])(
connection: Connection,
loggingContext: LoggingContext,
): Unit = {
val deletedRows = query.executeUpdate()(connection)
logger.info(s"$queryDescription finished: deleted $deletedRows rows.")(loggingContext)
}
private val rawTransactionEventParser: RowParser[RawTransactionEvent] = {
import com.daml.platform.store.Conversions.ArrayColumnToStringArray.arrayColumnToStringArray
(int("event_kind") ~
str("transaction_id") ~
int("node_index") ~
str("command_id").? ~
str("workflow_id").? ~
eventId("event_id") ~
contractId("contract_id") ~
identifier("template_id").? ~
instantFromMicros("ledger_effective_time").? ~
array[String]("create_signatories").? ~
array[String]("create_observers").? ~
str("create_agreement_text").? ~
binaryStream("create_key_value").? ~
int("create_key_value_compression").? ~
binaryStream("create_argument").? ~
int("create_argument_compression").? ~
array[String]("tree_event_witnesses") ~
array[String]("flat_event_witnesses") ~
array[String]("submitters").? ~
str("exercise_choice").? ~
binaryStream("exercise_argument").? ~
int("exercise_argument_compression").? ~
binaryStream("exercise_result").? ~
int("exercise_result_compression").? ~
array[String]("exercise_actors").? ~
array[String]("exercise_child_event_ids").? ~
long("event_sequential_id") ~
offset("event_offset")).map {
case eventKind ~ transactionId ~ nodeIndex ~ commandId ~ workflowId ~ eventId ~ contractId ~ templateId ~ ledgerEffectiveTime ~ createSignatories ~
createObservers ~ createAgreementText ~ createKeyValue ~ createKeyCompression ~
createArgument ~ createArgumentCompression ~ treeEventWitnesses ~ flatEventWitnesses ~ submitters ~ exerciseChoice ~
exerciseArgument ~ exerciseArgumentCompression ~ exerciseResult ~ exerciseResultCompression ~ exerciseActors ~
exerciseChildEventIds ~ eventSequentialId ~ offset =>
RawTransactionEvent(
eventKind,
transactionId,
nodeIndex,
commandId,
workflowId,
eventId,
contractId,
templateId,
ledgerEffectiveTime,
createSignatories,
createObservers,
createAgreementText,
createKeyValue,
createKeyCompression,
createArgument,
createArgumentCompression,
treeEventWitnesses.toSet,
flatEventWitnesses.toSet,
submitters.map(_.toSet).getOrElse(Set.empty),
exerciseChoice,
exerciseArgument,
exerciseArgumentCompression,
exerciseResult,
exerciseResultCompression,
exerciseActors,
exerciseChildEventIds,
eventSequentialId,
offset,
)
}
}
override def rawEvents(startExclusive: Long, endInclusive: Long)(
connection: Connection
): Vector[RawTransactionEvent] =
SQL"""
SELECT
event_kind,
transaction_id,
node_index,
command_id,
workflow_id,
event_id,
contract_id,
template_id,
ledger_effective_time,
create_signatories,
create_observers,
create_agreement_text,
create_key_value,
create_key_value_compression,
create_argument,
create_argument_compression,
tree_event_witnesses,
flat_event_witnesses,
submitters,
exercise_choice,
exercise_argument,
exercise_argument_compression,
exercise_result,
exercise_result_compression,
exercise_actors,
exercise_child_event_ids,
event_sequential_id,
event_offset
FROM
participant_events
WHERE
event_sequential_id > $startExclusive
and event_sequential_id <= $endInclusive
and event_kind != 0
ORDER BY event_sequential_id ASC"""
.asVectorOf(rawTransactionEventParser)(connection)
}
/** This encapsulates the moving part as composing various Events queries.

View File

@ -0,0 +1,42 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.common
import java.sql.Connection
import anorm.{SQL, SqlQuery}
import com.daml.platform.store.backend.{IngestionStorageBackend, ParameterStorageBackend}
private[backend] trait IngestionStorageBackendTemplate[DB_BATCH]
extends IngestionStorageBackend[DB_BATCH] {
private val SQL_DELETE_OVERSPILL_ENTRIES: List[SqlQuery] =
List(
SQL("DELETE FROM configuration_entries WHERE ledger_offset > {ledger_offset}"),
SQL("DELETE FROM package_entries WHERE ledger_offset > {ledger_offset}"),
SQL("DELETE FROM packages WHERE ledger_offset > {ledger_offset}"),
SQL("DELETE FROM participant_command_completions WHERE completion_offset > {ledger_offset}"),
SQL("DELETE FROM participant_events_divulgence WHERE event_offset > {ledger_offset}"),
SQL("DELETE FROM participant_events_create WHERE event_offset > {ledger_offset}"),
SQL("DELETE FROM participant_events_consuming_exercise WHERE event_offset > {ledger_offset}"),
SQL(
"DELETE FROM participant_events_non_consuming_exercise WHERE event_offset > {ledger_offset}"
),
SQL("DELETE FROM party_entries WHERE ledger_offset > {ledger_offset}"),
)
override def deletePartiallyIngestedData(
ledgerEnd: Option[ParameterStorageBackend.LedgerEnd]
)(connection: Connection): Unit = {
ledgerEnd.foreach { existingLedgerEnd =>
SQL_DELETE_OVERSPILL_ENTRIES.foreach { query =>
import com.daml.platform.store.Conversions.OffsetToStatement
query
.on("ledger_offset" -> existingLedgerEnd.lastOffset)
.execute()(connection)
()
}
}
}
}

View File

@ -0,0 +1,116 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.common
import java.sql.Connection
import anorm.SqlParser.{flatten, str}
import anorm.{Macro, RowParser, SQL, SqlParser}
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.platform.store.Conversions.{instantFromMicros, ledgerString, offset}
import com.daml.lf.data.Ref.PackageId
import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf
import com.daml.platform.store.appendonlydao.JdbcLedgerDao.{acceptType, rejectType}
import com.daml.platform.store.backend.PackageStorageBackend
import com.daml.platform.store.entries.PackageLedgerEntry
private[backend] trait PackageStorageBackendTemplate extends PackageStorageBackend {
private val SQL_SELECT_PACKAGES =
SQL(
"""select packages.package_id, packages.source_description, packages.known_since, packages.package_size
|from packages, parameters
|where packages.ledger_offset <= parameters.ledger_end
|""".stripMargin
)
private case class ParsedPackageData(
packageId: String,
sourceDescription: Option[String],
size: Long,
knownSince: Long,
)
private val PackageDataParser: RowParser[ParsedPackageData] =
Macro.parser[ParsedPackageData](
"package_id",
"source_description",
"package_size",
"known_since",
)
def lfPackages(connection: Connection): Map[PackageId, PackageDetails] =
SQL_SELECT_PACKAGES
.as(PackageDataParser.*)(connection)
.map(d =>
PackageId.assertFromString(d.packageId) -> PackageDetails(
d.size,
instantFromMicros(d.knownSince),
d.sourceDescription,
)
)
.toMap
private val SQL_SELECT_PACKAGE =
SQL("""select packages.package
|from packages, parameters
|where package_id = {package_id}
|and packages.ledger_offset <= parameters.ledger_end
|""".stripMargin)
def lfArchive(packageId: PackageId)(connection: Connection): Option[Array[Byte]] = {
import com.daml.platform.store.Conversions.packageIdToStatement
SQL_SELECT_PACKAGE
.on(
"package_id" -> packageId
)
.as[Option[Array[Byte]]](SqlParser.byteArray("package").singleOpt)(connection)
}
private val SQL_GET_PACKAGE_ENTRIES = SQL(
"""select * from package_entries
|where ({startExclusive} is null or ledger_offset>{startExclusive})
|and ledger_offset<={endInclusive}
|order by ledger_offset asc
|offset {queryOffset} rows
|fetch next {pageSize} rows only""".stripMargin
)
private val packageEntryParser: RowParser[(Offset, PackageLedgerEntry)] =
(offset("ledger_offset") ~
instantFromMicros("recorded_at") ~
ledgerString("submission_id").? ~
str("typ") ~
str("rejection_reason").?)
.map(flatten)
.map {
case (offset, recordTime, Some(submissionId), `acceptType`, None) =>
offset ->
PackageLedgerEntry.PackageUploadAccepted(submissionId, recordTime)
case (offset, recordTime, Some(submissionId), `rejectType`, Some(reason)) =>
offset ->
PackageLedgerEntry.PackageUploadRejected(submissionId, recordTime, reason)
case invalidRow =>
sys.error(s"packageEntryParser: invalid party entry row: $invalidRow")
}
def packageEntries(
startExclusive: Offset,
endInclusive: Offset,
pageSize: Int,
queryOffset: Long,
)(connection: Connection): Vector[(Offset, PackageLedgerEntry)] = {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL_GET_PACKAGE_ENTRIES
.on(
"startExclusive" -> startExclusive,
"endInclusive" -> endInclusive,
"pageSize" -> pageSize,
"queryOffset" -> queryOffset,
)
.asVectorOf(packageEntryParser)(connection)
}
}

View File

@ -0,0 +1,177 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.common
import java.sql.Connection
import anorm.SqlParser.long
import anorm.{RowParser, SQL, ~}
import com.daml.ledger.api.domain.{LedgerId, ParticipantId}
import com.daml.ledger.offset.Offset
import com.daml.platform.store.Conversions.{ledgerString, offset}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.common.MismatchException
import com.daml.platform.store.Conversions
import com.daml.platform.store.backend.ParameterStorageBackend
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.scalautil.Statement.discard
import scalaz.syntax.tag._
private[backend] trait ParameterStorageBackendTemplate extends ParameterStorageBackend {
private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private val SQL_UPDATE_LEDGER_END = SQL(
"""
|UPDATE
| parameters
|SET
| ledger_end = {ledger_end},
| ledger_end_sequential_id = {ledger_end_sequential_id}
|""".stripMargin
)
override def updateLedgerEnd(
ledgerEnd: ParameterStorageBackend.LedgerEnd
)(connection: Connection): Unit = {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL_UPDATE_LEDGER_END
.on("ledger_end" -> ledgerEnd.lastOffset)
.on("ledger_end_sequential_id" -> ledgerEnd.lastEventSeqId)
.execute()(connection)
()
}
private val SQL_GET_LEDGER_END = SQL(
"""
|SELECT
| ledger_end,
| ledger_end_sequential_id
|FROM
| parameters
|
|""".stripMargin
)
override def ledgerEnd(connection: Connection): Option[ParameterStorageBackend.LedgerEnd] =
SQL_GET_LEDGER_END.as(LedgerEndParser.singleOpt)(connection).flatten
private val TableName: String = "parameters"
private val LedgerIdColumnName: String = "ledger_id"
private val ParticipantIdColumnName: String = "participant_id"
private val LedgerEndColumnName: String = "ledger_end"
private val LedgerEndSequentialIdColumnName: String = "ledger_end_sequential_id"
private val LedgerIdParser: RowParser[LedgerId] =
ledgerString(LedgerIdColumnName).map(LedgerId(_))
private val ParticipantIdParser: RowParser[ParticipantId] =
Conversions.participantId(ParticipantIdColumnName).map(ParticipantId(_))
private val LedgerEndOffsetParser: RowParser[Option[Offset]] =
offset(LedgerEndColumnName).?
private val LedgerEndSequentialIdParser: RowParser[Option[Long]] =
long(LedgerEndSequentialIdColumnName).?
private val LedgerIdentityParser: RowParser[ParameterStorageBackend.IdentityParams] =
LedgerIdParser ~ ParticipantIdParser map { case ledgerId ~ participantId =>
ParameterStorageBackend.IdentityParams(ledgerId, participantId)
}
private val LedgerEndParser: RowParser[Option[ParameterStorageBackend.LedgerEnd]] =
LedgerEndOffsetParser ~ LedgerEndSequentialIdParser map {
case Some(lastOffset) ~ Some(lastEventSequentialId) =>
Some(ParameterStorageBackend.LedgerEnd(lastOffset, lastEventSequentialId))
case _ =>
// Note: offset and event sequential id are always written together.
// Cases where only one of them is defined are not handled here.
None
}
override def initializeParameters(
params: ParameterStorageBackend.IdentityParams
)(connection: Connection)(implicit loggingContext: LoggingContext): Unit = {
// Note: this method is the only one that inserts a row into the parameters table
val previous = ledgerIdentity(connection)
val ledgerId = params.ledgerId
val participantId = params.participantId
previous match {
case None =>
logger.info(
s"Initializing new database for ledgerId '${params.ledgerId}' and participantId '${params.participantId}'"
)
discard(
SQL"insert into #$TableName(#$LedgerIdColumnName, #$ParticipantIdColumnName) values(${ledgerId.unwrap}, ${participantId.unwrap: String})"
.execute()(connection)
)
case Some(ParameterStorageBackend.IdentityParams(`ledgerId`, `participantId`)) =>
logger.info(
s"Found existing database for ledgerId '${params.ledgerId}' and participantId '${params.participantId}'"
)
case Some(ParameterStorageBackend.IdentityParams(existing, _))
if existing != params.ledgerId =>
logger.error(
s"Found existing database with mismatching ledgerId: existing '$existing', provided '${params.ledgerId}'"
)
throw MismatchException.LedgerId(
existing = existing,
provided = params.ledgerId,
)
case Some(ParameterStorageBackend.IdentityParams(_, existing)) =>
logger.error(
s"Found existing database with mismatching participantId: existing '$existing', provided '${params.participantId}'"
)
throw MismatchException.ParticipantId(
existing = existing,
provided = params.participantId,
)
}
}
override def ledgerIdentity(
connection: Connection
): Option[ParameterStorageBackend.IdentityParams] =
SQL"select #$LedgerIdColumnName, #$ParticipantIdColumnName from #$TableName"
.as(LedgerIdentityParser.singleOpt)(connection)
private val SQL_UPDATE_MOST_RECENT_PRUNING =
SQL("""
|update parameters set participant_pruned_up_to_inclusive={pruned_up_to_inclusive}
|where participant_pruned_up_to_inclusive < {pruned_up_to_inclusive} or participant_pruned_up_to_inclusive is null
|""".stripMargin)
private val SQL_UPDATE_MOST_RECENT_PRUNING_INCLUDING_ALL_DIVULGED_CONTRACTS =
SQL("""
|update parameters set participant_all_divulged_contracts_pruned_up_to_inclusive={prune_all_divulged_contracts_up_to_inclusive}
|where participant_pruned_up_to_inclusive < {prune_all_divulged_contracts_up_to_inclusive} or participant_all_divulged_contracts_pruned_up_to_inclusive is null
|""".stripMargin)
def updatePrunedUptoInclusive(prunedUpToInclusive: Offset)(connection: Connection): Unit = {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL_UPDATE_MOST_RECENT_PRUNING
.on("pruned_up_to_inclusive" -> prunedUpToInclusive)
.execute()(connection)
()
}
def updatePrunedAllDivulgedContractsUpToInclusive(
prunedUpToInclusive: Offset
)(connection: Connection): Unit = {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL_UPDATE_MOST_RECENT_PRUNING_INCLUDING_ALL_DIVULGED_CONTRACTS
.on("prune_all_divulged_contracts_up_to_inclusive" -> prunedUpToInclusive)
.execute()(connection)
()
}
private val SQL_SELECT_MOST_RECENT_PRUNING = SQL(
"select participant_pruned_up_to_inclusive from parameters"
)
def prunedUptoInclusive(connection: Connection): Option[Offset] =
SQL_SELECT_MOST_RECENT_PRUNING
.as(offset("participant_pruned_up_to_inclusive").?.single)(connection)
}

View File

@ -15,12 +15,17 @@ import com.daml.platform.store.backend.EventStorageBackend.FilterParams
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
import com.daml.platform.store.backend.common.{
AppendOnlySchema,
CommonStorageBackend,
CompletionStorageBackendTemplate,
ConfigurationStorageBackendTemplate,
ContractStorageBackendTemplate,
DataSourceStorageBackendTemplate,
DeduplicationStorageBackendTemplate,
EventStorageBackendTemplate,
EventStrategy,
IngestionStorageBackendTemplate,
InitHookDataSourceProxy,
PackageStorageBackendTemplate,
ParameterStorageBackendTemplate,
PartyStorageBackendTemplate,
QueryStrategy,
Timestamp,
@ -37,7 +42,12 @@ import javax.sql.DataSource
private[backend] object H2StorageBackend
extends StorageBackend[AppendOnlySchema.Batch]
with CommonStorageBackend[AppendOnlySchema.Batch]
with DataSourceStorageBackendTemplate
with IngestionStorageBackendTemplate[AppendOnlySchema.Batch]
with ParameterStorageBackendTemplate
with ConfigurationStorageBackendTemplate
with PackageStorageBackendTemplate
with DeduplicationStorageBackendTemplate
with EventStorageBackendTemplate
with ContractStorageBackendTemplate
with CompletionStorageBackendTemplate

View File

@ -8,12 +8,17 @@ import anorm.SQL
import com.daml.lf.data.Ref
import com.daml.platform.store.backend.common.{
AppendOnlySchema,
CommonStorageBackend,
CompletionStorageBackendTemplate,
ConfigurationStorageBackendTemplate,
ContractStorageBackendTemplate,
DataSourceStorageBackendTemplate,
DeduplicationStorageBackendTemplate,
EventStorageBackendTemplate,
EventStrategy,
IngestionStorageBackendTemplate,
InitHookDataSourceProxy,
PackageStorageBackendTemplate,
ParameterStorageBackendTemplate,
PartyStorageBackendTemplate,
QueryStrategy,
Timestamp,
@ -36,7 +41,12 @@ import javax.sql.DataSource
private[backend] object OracleStorageBackend
extends StorageBackend[AppendOnlySchema.Batch]
with CommonStorageBackend[AppendOnlySchema.Batch]
with DataSourceStorageBackendTemplate
with IngestionStorageBackendTemplate[AppendOnlySchema.Batch]
with ParameterStorageBackendTemplate
with ConfigurationStorageBackendTemplate
with PackageStorageBackendTemplate
with DeduplicationStorageBackendTemplate
with EventStorageBackendTemplate
with ContractStorageBackendTemplate
with CompletionStorageBackendTemplate

View File

@ -17,12 +17,17 @@ import com.daml.platform.store.backend.EventStorageBackend.FilterParams
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
import com.daml.platform.store.backend.common.{
AppendOnlySchema,
CommonStorageBackend,
CompletionStorageBackendTemplate,
ConfigurationStorageBackendTemplate,
ContractStorageBackendTemplate,
DataSourceStorageBackendTemplate,
DeduplicationStorageBackendTemplate,
EventStorageBackendTemplate,
EventStrategy,
IngestionStorageBackendTemplate,
InitHookDataSourceProxy,
PackageStorageBackendTemplate,
ParameterStorageBackendTemplate,
PartyStorageBackendTemplate,
QueryStrategy,
Timestamp,
@ -39,7 +44,12 @@ import org.postgresql.ds.PGSimpleDataSource
private[backend] object PostgresStorageBackend
extends StorageBackend[AppendOnlySchema.Batch]
with CommonStorageBackend[AppendOnlySchema.Batch]
with DataSourceStorageBackendTemplate
with IngestionStorageBackendTemplate[AppendOnlySchema.Batch]
with ParameterStorageBackendTemplate
with ConfigurationStorageBackendTemplate
with PackageStorageBackendTemplate
with DeduplicationStorageBackendTemplate
with EventStorageBackendTemplate
with ContractStorageBackendTemplate
with CompletionStorageBackendTemplate

View File

@ -19,19 +19,6 @@ private[backend] trait StorageBackendTestsInitializeIngestion
import StorageBackendTestValues._
it should "report the correct ledger end" in {
val someLedgerEnd = ParameterStorageBackend.LedgerEnd(offset(1), 1L)
for {
_ <- executeSql(backend.initializeParameters(someIdentityParams))
endBefore <- executeSql(backend.initializeIngestion)
_ <- executeSql(backend.updateLedgerEnd(someLedgerEnd))
endAfter <- executeSql(backend.initializeIngestion)
} yield {
endBefore shouldBe empty
endAfter shouldBe Some(someLedgerEnd)
}
}
it should "delete overspill entries" in {
val dtos1: Vector[DbDto] = Vector(
// 1: config change
@ -75,7 +62,8 @@ private[backend] trait StorageBackendTestsInitializeIngestion
_ <- executeSql(backend.initializeParameters(someIdentityParams))
// Start the indexer (a no-op in this case)
_ <- executeSql(backend.initializeIngestion)
end1 <- executeSql(backend.ledgerEnd)
_ <- executeSql(backend.deletePartiallyIngestedData(end1))
// Fully insert first batch of updates
_ <- executeSql(ingest(dtos1, _))
@ -96,7 +84,8 @@ private[backend] trait StorageBackendTestsInitializeIngestion
)
// Restart the indexer - should delete data from the partial insert above
_ <- executeSql(backend.initializeIngestion)
end2 <- executeSql(backend.ledgerEnd)
_ <- executeSql(backend.deletePartiallyIngestedData(end2))
// Move the ledger end so that any non-deleted data would become visible
_ <- executeSql(backend.updateLedgerEnd(ledgerEnd(10, 6L)))

View File

@ -81,9 +81,9 @@ class SequentialWriteDaoSpec extends AnyFlatSpec with Matchers {
captured = captured ++ batch
}
override def initializeIngestion(
override def deletePartiallyIngestedData(ledgerEnd: Option[ParameterStorageBackend.LedgerEnd])(
connection: Connection
): Option[ParameterStorageBackend.LedgerEnd] =
): Unit =
throw new UnsupportedOperationException
override def updateLedgerEnd(