Sandbox on H2 - performance improvements for the append-only schema [DPP-600] (#10888)

* Added the --database-connection-pool-size parameter to the sandbox CLI (a wiring sketch follows the changelog below)

* Improved maximum ledger time lookup query

* Improved active contract lookup query

* Improved max event sequential id query

CHANGELOG_BEGIN
- [Sandbox] - Added a CLI parameter, --database-connection-pool-size, for configuring the number of connections in the database connection pool used for serving ledger API requests.
CHANGELOG_END
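
The flag itself is added in the Cli and Runner hunks further down. As a minimal sketch, assuming a SandboxConfig case class whose databaseConnectionPoolSize field defaults to the previously hard-coded value of 16 (an inference from this diff, not from the full sources), the option is wired roughly like this:

import scopt.OptionParser

object SandboxCliSketch {

  // Assumed configuration shape: only the field relevant to this commit is shown.
  final case class SandboxConfig(databaseConnectionPoolSize: Int = 16)

  val parser: OptionParser[SandboxConfig] = new OptionParser[SandboxConfig]("sandbox") {
    opt[Int]("database-connection-pool-size")
      .optional()
      .text(
        "The number of connections in the database connection pool used for serving ledger API requests."
      )
      .action((poolSize, config) => config.copy(databaseConnectionPoolSize = poolSize))
  }

  // The Runner then forwards the parsed value instead of the former constant:
  //   databaseConnectionPoolSize = config.databaseConnectionPoolSize
}

Starting the sandbox with, say, --database-connection-pool-size 32 (an arbitrary example value) then overrides the pool size that was previously fixed at 16.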
Kamil Bozek 2021-09-15 12:21:19 +02:00 committed by GitHub
parent 9a1a1015f2
commit a939594025
8 changed files with 181 additions and 47 deletions


@@ -58,7 +58,7 @@ private[appendonlydao] final class TransactionsReader(
private val dbMetrics = metrics.daml.index.db
private val eventSeqIdReader =
new EventsRange.EventSeqIdReader(storageBackend.maxEventSeqIdForOffset)
new EventsRange.EventSeqIdReader(storageBackend.maxEventSequentialIdOfAnObservableEvent)
private val getTransactions =
new EventsTableFlatEventsRangeQueries.GetTransactions(storageBackend)
private val getActiveContracts =


@@ -268,7 +268,9 @@ trait EventStorageBackend {
transactionId: Ref.TransactionId,
filterParams: FilterParams,
)(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]]
def maxEventSeqIdForOffset(offset: Offset)(connection: Connection): Option[Long]
/** Max event sequential id of observable (create, consuming and nonconsuming exercise) events. */
def maxEventSequentialIdOfAnObservableEvent(offset: Offset)(connection: Connection): Option[Long]
def rawEvents(startExclusive: Long, endInclusive: Long)(
connection: Connection
): Vector[RawTransactionEvent]
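
Seen from the caller's side (cf. the TransactionsReader hunk above), the renamed lookup resolves an inclusive end offset to the sequential id of the last observable event at or before it. A minimal usage sketch; the helper name and the 0L fallback for a ledger prefix without observable events are illustrative assumptions, not taken from this diff:

import java.sql.Connection

import com.daml.ledger.offset.Offset
import com.daml.platform.store.backend.EventStorageBackend

object EventRangeSketch {

  // Hypothetical helper: map an inclusive end offset to the highest sequential id of an
  // observable (create, consuming or non-consuming exercise) event at or before it.
  def resolveEndSequentialId(backend: EventStorageBackend, endInclusive: Offset)(
      connection: Connection
  ): Long =
    backend
      .maxEventSequentialIdOfAnObservableEvent(endInclusive)(connection)
      .getOrElse(0L) // assumed default when no observable event precedes the offset
}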


@@ -5,9 +5,8 @@ package com.daml.platform.store.backend.common
import java.sql.Connection
import java.time.Instant
import anorm.SqlParser.{binaryStream, int, long, str}
import anorm.{ResultSetParser, RowParser, SqlParser, ~}
import anorm.{ResultSetParser, Row, RowParser, SimpleSql, SqlParser, ~}
import com.daml.lf.data.Ref
import com.daml.platform.store.Conversions.{
contractId,
@@ -52,17 +51,9 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
s"The following contracts have not been found: ${missingContractIds.map(_.coid).mkString(", ")}"
)
// TODO append-only: revisit this approach when doing cleanup, so we can decide if it is enough or not.
// TODO append-only: consider pulling up traversal logic to upper layer
override def maximumLedgerTime(
ids: Set[ContractId]
)(connection: Connection): Try[Option[Instant]] = {
if (ids.isEmpty) {
Failure(emptyContractIds)
} else {
def lookup(id: ContractId): Option[Option[Instant]] = {
import com.daml.platform.store.Conversions.ContractIdToStatement
SQL"""
protected def maximumLedgerTimeSqlLiteral(id: ContractId): SimpleSql[Row] = {
import com.daml.platform.store.Conversions.ContractIdToStatement
SQL"""
WITH archival_event AS (
SELECT participant_events.*
FROM participant_events, parameters
@@ -99,8 +90,20 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
FROM create_and_divulged_contracts
WHERE NOT EXISTS (SELECT 1 FROM archival_event)
FETCH NEXT 1 ROW ONLY"""
.as(instantFromMicros("ledger_effective_time").?.singleOpt)(connection)
}
}
// TODO append-only: revisit this approach when doing cleanup, so we can decide if it is enough or not.
// TODO append-only: consider pulling up traversal logic to upper layer
override def maximumLedgerTime(
ids: Set[ContractId]
)(connection: Connection): Try[Option[Instant]] = {
if (ids.isEmpty) {
Failure(emptyContractIds)
} else {
def lookup(id: ContractId): Option[Option[Instant]] =
maximumLedgerTimeSqlLiteral(id).as(instantFromMicros("ledger_effective_time").?.singleOpt)(
connection
)
val queriedIds: List[(ContractId, Option[Option[Instant]])] = ids.toList
.map(id => id -> lookup(id))
@@ -223,24 +226,13 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
.map(SqlParser.flatten)
.map(StorageBackend.RawContract.tupled)
private def activeContract[T](
resultSetParser: ResultSetParser[T],
resultColumns: List[String],
)(
readers: Set[Ref.Party],
protected def activeContractSqlLiteral(
contractId: ContractId,
)(connection: Connection): T = {
treeEventWitnessesClause: CompositeSql,
resultColumns: List[String],
coalescedColumns: String,
): SimpleSql[Row] = {
import com.daml.platform.store.Conversions.ContractIdToStatement
val treeEventWitnessesClause =
queryStrategy.arrayIntersectionNonEmptyClause(
columnName = "tree_event_witnesses",
parties = readers,
)
val coalescedColumns = resultColumns
.map(columnName =>
s"COALESCE(divulgence_events.$columnName, create_event_unrestricted.$columnName)"
)
.mkString(", ")
SQL""" WITH archival_event AS (
SELECT participant_events.*
FROM participant_events, parameters
@@ -296,6 +288,26 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
FROM create_and_divulged_contracts
WHERE NOT EXISTS (SELECT 1 FROM archival_event)
FETCH NEXT 1 ROW ONLY"""
}
private def activeContract[T](
resultSetParser: ResultSetParser[T],
resultColumns: List[String],
)(
readers: Set[Ref.Party],
contractId: ContractId,
)(connection: Connection): T = {
val treeEventWitnessesClause: CompositeSql =
queryStrategy.arrayIntersectionNonEmptyClause(
columnName = "tree_event_witnesses",
parties = readers,
)
val coalescedColumns: String = resultColumns
.map(columnName =>
s"COALESCE(divulgence_events.$columnName, create_event_unrestricted.$columnName)"
)
.mkString(", ")
activeContractSqlLiteral(contractId, treeEventWitnessesClause, resultColumns, coalescedColumns)
.as(resultSetParser)(connection)
}
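
Taken together, the hunks above amount to a small template-method refactoring: the SQL literals for the maximum-ledger-time and active-contract lookups become protected, overridable methods, while parameter assembly and result parsing stay in the shared template. A condensed sketch of the resulting shape, with signatures copied from the hunks, bodies elided, and a hypothetical trait name so as not to restate the real one:

import anorm.{Row, SimpleSql}

import com.daml.platform.store.appendonlydao.events.ContractId
import com.daml.platform.store.backend.common.ComposableQuery.CompositeSql

// Condensed view, not the full trait: the default literals query the generic
// participant_events view; a backend may override them with dialect- or
// schema-specific SQL without touching the surrounding parsing logic.
trait ContractQueryLiteralsSketch {
  protected def maximumLedgerTimeSqlLiteral(id: ContractId): SimpleSql[Row]
  protected def activeContractSqlLiteral(
      contractId: ContractId,
      treeEventWitnessesClause: CompositeSql,
      resultColumns: List[String],
      coalescedColumns: String,
  ): SimpleSql[Row]
}

The H2 backend in the next file overrides both literals with queries against the per-event-type tables of the append-only schema (participant_events_create, participant_events_consuming_exercise, participant_events_divulgence) instead of the participant_events view.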


@@ -5,12 +5,12 @@ package com.daml.platform.store.backend.h2
import java.sql.Connection
import java.time.Instant
import anorm.SQL
import anorm.{Row, SQL, SimpleSql}
import anorm.SqlParser.get
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import com.daml.platform.store.appendonlydao.events.ContractId
import com.daml.platform.store.backend.EventStorageBackend.FilterParams
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
import com.daml.platform.store.backend.common.{
@@ -32,6 +32,7 @@ import com.daml.platform.store.backend.{
StorageBackend,
common,
}
import javax.sql.DataSource
private[backend] object H2StorageBackend
@@ -105,14 +106,21 @@ private[backend] object H2StorageBackend
override def insertBatch(connection: Connection, batch: AppendOnlySchema.Batch): Unit =
H2Schema.schema.executeUpdate(batch, connection)
// TODO FIXME: this is for postgres not for H2
def maxEventSeqIdForOffset(offset: Offset)(connection: Connection): Option[Long] = {
def maxEventSequentialIdOfAnObservableEvent(
offset: Offset
)(connection: Connection): Option[Long] = {
import com.daml.platform.store.Conversions.OffsetToStatement
// This query could be: "select max(event_sequential_id) from participant_events where event_offset <= ${range.endInclusive}"
// however tests using PostgreSQL 12 with tens of millions of events have shown that the index
// on `event_offset` is not used unless we _hint_ at it by specifying `order by event_offset`
SQL"select max(event_sequential_id) from participant_events where event_offset <= $offset group by event_offset order by event_offset desc limit 1"
.as(get[Long](1).singleOpt)(connection)
SQL"""
SELECT max_esi FROM (
(SELECT max(event_sequential_id) AS max_esi FROM participant_events_consuming_exercise WHERE event_offset <= $offset GROUP BY event_offset ORDER BY event_offset DESC FETCH NEXT 1 ROW ONLY)
UNION ALL
(SELECT max(event_sequential_id) AS max_esi FROM participant_events_non_consuming_exercise WHERE event_offset <= $offset GROUP BY event_offset ORDER BY event_offset DESC FETCH NEXT 1 ROW ONLY)
UNION ALL
(SELECT max(event_sequential_id) AS max_esi FROM participant_events_create WHERE event_offset <= $offset GROUP BY event_offset ORDER BY event_offset DESC FETCH NEXT 1 ROW ONLY)
) AS t
ORDER BY max_esi DESC
FETCH NEXT 1 ROW ONLY;
""".as(get[Long](1).singleOpt)(connection)
}
object H2QueryStrategy extends QueryStrategy {
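
The comment above about hinting the planner with ORDER BY carries over from the PostgreSQL measurements; whether H2 actually picks the event_offset index for the per-table branches can be eyeballed by EXPLAINing one branch of the UNION. A hedged sketch, assuming the table and column names from the query above and using a placeholder offset literal:

import java.sql.Connection

import anorm.SQL
import anorm.SqlParser.get

object PlanCheckSketch {

  // Illustrative check only: fetch H2's plan for one branch of the UNION above and
  // inspect it manually for an index scan on event_offset.
  def explainCreateBranch(connection: Connection): List[String] =
    SQL(
      """EXPLAIN
         SELECT max(event_sequential_id)
         FROM participant_events_create
         WHERE event_offset <= '00'
         GROUP BY event_offset
         ORDER BY event_offset DESC
         FETCH NEXT 1 ROW ONLY"""
    ).as(get[String](1).*)(connection)
}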
@@ -238,4 +246,103 @@ private[backend] object H2StorageBackend
pruneAllDivulgedContracts: Boolean,
connection: Connection,
): Unit = ()
override def maximumLedgerTimeSqlLiteral(id: ContractId): SimpleSql[Row] = {
import com.daml.platform.store.Conversions.ContractIdToStatement
SQL"""
WITH archival_event AS (
SELECT 1
FROM participant_events_consuming_exercise, parameters
WHERE contract_id = $id
AND event_sequential_id <= parameters.ledger_end_sequential_id
FETCH NEXT 1 ROW ONLY
),
create_event AS (
SELECT ledger_effective_time
FROM participant_events_create, parameters
WHERE contract_id = $id
AND event_sequential_id <= parameters.ledger_end_sequential_id
FETCH NEXT 1 ROW ONLY -- limit here to guide planner wrt expected number of results
),
divulged_contract AS (
SELECT NULL::BIGINT
FROM participant_events_divulgence, parameters
WHERE contract_id = $id
AND event_sequential_id <= parameters.ledger_end_sequential_id
ORDER BY event_sequential_id
-- prudent engineering: make results more stable by preferring earlier divulgence events
-- Results might still change due to pruning.
FETCH NEXT 1 ROW ONLY
),
create_and_divulged_contracts AS (
(SELECT * FROM create_event) -- prefer create over divulgence events
UNION ALL
(SELECT * FROM divulged_contract)
)
SELECT ledger_effective_time
FROM create_and_divulged_contracts
WHERE NOT EXISTS (SELECT 1 FROM archival_event)
FETCH NEXT 1 ROW ONLY"""
}
override def activeContractSqlLiteral(
contractId: ContractId,
treeEventWitnessesClause: CompositeSql,
resultColumns: List[String],
coalescedColumns: String,
): SimpleSql[Row] = {
import com.daml.platform.store.Conversions.ContractIdToStatement
SQL""" WITH archival_event AS (
SELECT 1
FROM participant_events_consuming_exercise, parameters
WHERE contract_id = $contractId
AND event_sequential_id <= parameters.ledger_end_sequential_id
AND $treeEventWitnessesClause -- only use visible archivals
FETCH NEXT 1 ROW ONLY
),
create_event AS (
SELECT contract_id, #${resultColumns.mkString(", ")}
FROM participant_events_create, parameters
WHERE contract_id = $contractId
AND event_sequential_id <= parameters.ledger_end_sequential_id
AND $treeEventWitnessesClause
FETCH NEXT 1 ROW ONLY -- limit here to guide planner wrt expected number of results
),
-- no visibility check, as it is used to backfill missing template_id and create_arguments for divulged contracts
create_event_unrestricted AS (
SELECT contract_id, #${resultColumns.mkString(", ")}
FROM participant_events_create, parameters
WHERE contract_id = $contractId
AND event_sequential_id <= parameters.ledger_end_sequential_id
FETCH NEXT 1 ROW ONLY -- limit here to guide planner wrt expected number of results
),
divulged_contract AS (
SELECT divulgence_events.contract_id,
-- Note: the divulgence_event.template_id can be NULL
-- for certain integrations. For example, the KV integration exploits that
-- every participant node knows about all create events. The integration
-- therefore only communicates the change in visibility to the IndexDB, but
-- does not include a full divulgence event.
#$coalescedColumns
FROM participant_events_divulgence divulgence_events LEFT OUTER JOIN create_event_unrestricted ON (divulgence_events.contract_id = create_event_unrestricted.contract_id),
parameters
WHERE divulgence_events.contract_id = $contractId -- restrict to aid query planner
AND divulgence_events.event_sequential_id <= parameters.ledger_end_sequential_id
AND $treeEventWitnessesClause
ORDER BY divulgence_events.event_sequential_id
-- prudent engineering: make results more stable by preferring earlier divulgence events
-- Results might still change due to pruning.
FETCH NEXT 1 ROW ONLY
),
create_and_divulged_contracts AS (
(SELECT * FROM create_event) -- prefer create over divulgence events
UNION ALL
(SELECT * FROM divulged_contract)
)
SELECT contract_id, #${resultColumns.mkString(", ")}
FROM create_and_divulged_contracts
WHERE NOT EXISTS (SELECT 1 FROM archival_event)
FETCH NEXT 1 ROW ONLY"""
}
}


@@ -170,8 +170,10 @@ private[backend] object OracleStorageBackend
override def eventStrategy: common.EventStrategy = OracleEventStrategy
// TODO FIXME: confirm this works for oracle
def maxEventSeqIdForOffset(offset: Offset)(connection: Connection): Option[Long] = {
// TODO FIXME: Use tables directly instead of the participant_events view.
def maxEventSequentialIdOfAnObservableEvent(
offset: Offset
)(connection: Connection): Option[Long] = {
import com.daml.platform.store.Conversions.OffsetToStatement
// This query could be: "select max(event_sequential_id) from participant_events where event_offset <= ${range.endInclusive}"
// however tests using PostgreSQL 12 with tens of millions of events have shown that the index


@@ -202,7 +202,10 @@ private[backend] object PostgresStorageBackend
override def eventStrategy: common.EventStrategy = PostgresEventStrategy
override def maxEventSeqIdForOffset(offset: Offset)(connection: Connection): Option[Long] = {
// TODO FIXME: Use tables directly instead of the participant_events view.
override def maxEventSequentialIdOfAnObservableEvent(
offset: Offset
)(connection: Connection): Option[Long] = {
import com.daml.platform.store.Conversions.OffsetToStatement
// This query could be: "select max(event_sequential_id) from participant_events where event_offset <= ${range.endInclusive}"
// however tests using PostgreSQL 12 with tens of millions of events have shown that the index


@@ -36,6 +36,15 @@ private[sandboxnext] object Cli extends SandboxCli {
s"Deprecated: Use the Daml Driver for PostgreSQL if you need persistence.\nThe JDBC connection URL to a Postgres database containing the username and password as well. If present, $Name will use the database to persist its data."
)
.action((url, config) => config.copy(jdbcUrl = Some(url)))
parser
.opt[Int]("database-connection-pool-size")
.optional()
.text(
s"The number of connections in the database connection pool used for serving ledger API requests."
)
.action((poolSize, config) => config.copy(databaseConnectionPoolSize = poolSize))
parser
}


@@ -240,8 +240,7 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
port = currentPort.getOrElse(config.port),
address = config.address,
jdbcUrl = indexJdbcUrl,
// 16 DB connections has been shown to be sufficient for applications running on the sandbox
databaseConnectionPoolSize = 16,
databaseConnectionPoolSize = config.databaseConnectionPoolSize,
databaseConnectionTimeout = config.databaseConnectionTimeout,
tlsConfig = config.tlsConfig,
maxInboundMessageSize = config.maxInboundMessageSize,