ledger-on-sql: Migrate tables using Flyway. (#4232)

* ledger-on-sql: Pull out a superclass for the various integration tests.

* ledger-on-sql: Use Flyway to migrate tables in an idempotent manner.

CHANGELOG_BEGIN
CHANGELOG_END

* ledger-on-sql: Make it easy to run it with Bazel for experimentation.

* ledger-on-sql: Test that migrations will never change in the future.

* ledger-on-sql: Add a prefix of "ledger_" to the tables.

This is so we don't accidentally conflict with the index when the
schemas are shared.

I am letting myself modify the migrations because the existing
migrations haven't been merged into `master` yet.

* ledger-on-sql: Explain why we change the pool size after migration.

* ledger-api-test-tool: Tests now specify a timeout scale, not a timeout.

Makes it easier to change the default timeout in one place.

* ledger-api-test-tool: Increase timeouts on slow tests.

These tests produce a lot of volume and can make CI flaky.

* ledger-on-sql: Only tear down PostgreSQL in tests.

For other databases, we just create a new file for each test case.

* ledger-on-sql: Reduce the log output in tests.

* ledger-on-sql: Use a separate connection pool for Flyway when possible.

Apparently `setMaximumPoolSize` doesn't really have the desired effect
after the connection pool has already been used. The new test case will
be flaky if we process more than one commit in parallel.

For SQLite, it seems to be OK.

* ledger-on-sql: Use a separate connection pool for Flyway with SQLite.

Except in memory.

* ledger-on-sql: Use a separate PostgreSQL database for each test.

Because performance, innit. Don't have to tear them down.

* ledger-on-sql: Move the `Queries` values inside the `RDBMS` objects.

* ledger-on-sql: Go into even more detail about pool size hijinks.

Co-authored-by: Samir Talwar <samir.talwar@digitalasset.com>
This commit is contained in:
Samir Talwar 2020-01-28 17:16:23 +01:00 committed by mergify[bot]
parent 0d5ae27e04
commit c81cc2ab2a
32 changed files with 520 additions and 357 deletions

1
.gitignore vendored
View File

@ -58,7 +58,6 @@ da-daml-equities-library.txt
.flattened-pom.xml
*~
*.orig
sqlite
venv
trace.*.dat
node_modules/

View File

@ -6,13 +6,12 @@ package com.daml.ledger.api.testtool.infrastructure
import com.daml.ledger.api.testtool.infrastructure.Allocation.{ParticipantAllocation, Participants}
import com.digitalasset.daml.lf.data.Ref
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
final class LedgerTestCase(
val shortIdentifier: Ref.LedgerString,
val description: String,
val timeout: Duration,
val timeoutScale: Double,
participants: ParticipantAllocation,
runTestCase: Participants => Future[Unit],
) {

View File

@ -8,7 +8,6 @@ import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite._
import com.digitalasset.daml.lf.data.Ref
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.{Duration, DurationLong}
import scala.concurrent.{ExecutionContext, Future}
private[testtool] abstract class LedgerTestSuite(val session: LedgerSession) {
@ -24,11 +23,11 @@ private[testtool] abstract class LedgerTestSuite(val session: LedgerSession) {
shortIdentifier: String,
description: String,
participants: ParticipantAllocation,
timeout: Duration = 30.seconds,
timeoutScale: Double = 1.0,
)(testCase: Participants => Future[Unit]): Unit = {
val shortIdentifierRef = Ref.LedgerString.assertFromString(shortIdentifier)
testCaseBuffer.append(
new LedgerTestCase(shortIdentifierRef, description, timeout, participants, testCase),
new LedgerTestCase(shortIdentifierRef, description, timeoutScale, participants, testCase),
)
}

View File

@ -14,12 +14,14 @@ import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuiteRunner._
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantSessionManager
import org.slf4j.LoggerFactory
import scala.concurrent.duration.Duration
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.NonFatal
import scala.util.{Failure, Try}
object LedgerTestSuiteRunner {
private val DefaultTimeout = 30.seconds
private val timer = new Timer("ledger-test-suite-runner-timer", true)
private val logger = LoggerFactory.getLogger(classOf[LedgerTestSuiteRunner])
@ -35,12 +37,12 @@ final class LedgerTestSuiteRunner(
config: LedgerSessionConfiguration,
suiteConstructors: Vector[LedgerSession => LedgerTestSuite],
identifierSuffix: String,
timeoutScaleFactor: Double,
suiteTimeoutScale: Double,
concurrentTestRuns: Int,
) {
private[this] val verifyRequirements: Try[Unit] =
Try {
require(timeoutScaleFactor > 0, "The timeout scale factor must be strictly positive")
require(suiteTimeoutScale > 0, "The timeout scale factor must be strictly positive")
require(identifierSuffix.nonEmpty, "The identifier suffix cannot be an empty string")
}
@ -48,7 +50,7 @@ final class LedgerTestSuiteRunner(
implicit ec: ExecutionContext,
): Future[Duration] = {
val execution = Promise[Duration]
val scaledTimeout = test.timeout * timeoutScaleFactor
val scaledTimeout = DefaultTimeout * suiteTimeoutScale * test.timeoutScale
val startedTest =
session

View File

@ -13,7 +13,6 @@ import com.digitalasset.ledger.client.binding.Primitive.{ContractId, Party}
import com.digitalasset.ledger.test_stable.Test.WithObservers
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
final class LotsOfParties(session: LedgerSession) extends LedgerTestSuite(session) {
type Parties = Set[Party]
@ -27,13 +26,13 @@ final class LotsOfParties(session: LedgerSession) extends LedgerTestSuite(sessio
allocate(Parties(partyCount / 2), Parties(partyCount / 2))
// allocating parties seems to be really slow on a real database
private val timeout = 120.seconds
private val timeoutScale = 4.0
test(
"LOPseeTransactionsInMultipleSinglePartySubscriptions",
"Observers should see transactions in multiple single-party subscriptions",
allocation,
timeout,
timeoutScale,
) {
case TestParticipants(t) =>
for {
@ -58,7 +57,7 @@ final class LotsOfParties(session: LedgerSession) extends LedgerTestSuite(sessio
"LOPseeTransactionsInSingleMultiPartySubscription",
"Observers should see transactions in a single multi-party subscription",
allocation,
timeout,
timeoutScale,
) {
case TestParticipants(t) =>
for {
@ -83,7 +82,7 @@ final class LotsOfParties(session: LedgerSession) extends LedgerTestSuite(sessio
"LOPseeActiveContractsInMultipleSinglePartySubscriptions",
"Observers should see active contracts in multiple single-party subscriptions",
allocation,
timeout,
timeoutScale,
) {
case TestParticipants(t) =>
for {
@ -108,7 +107,7 @@ final class LotsOfParties(session: LedgerSession) extends LedgerTestSuite(sessio
"LOPseeActiveContractsInSingleMultiPartySubscription",
"Observers should see active contracts in a single multi-party subscription",
allocation,
timeout,
timeoutScale,
) {
case TestParticipants(t) =>
for {

View File

@ -212,6 +212,7 @@ final class SemanticTests(session: LedgerSession) extends LedgerTestSuite(sessio
"SemanticPrivacyProjections",
"Test visibility via contract fetches for the paint-offer flow",
allocate(TwoParties, SingleParty),
timeoutScale = 2.0,
) {
case Participants(Participant(alpha, bank, houseOwner), Participant(beta, painter)) =>
for {

View File

@ -9,6 +9,10 @@ import com.daml.ledger.api.testtool.infrastructure.Eventually.eventually
import com.daml.ledger.api.testtool.infrastructure.Synchronize.synchronize
import com.daml.ledger.api.testtool.infrastructure.TransactionHelpers._
import com.daml.ledger.api.testtool.infrastructure.{LedgerSession, LedgerTestSuite}
import com.daml.ledger.api.testtool.tests.TransactionService.{
comparableTransactionTrees,
comparableTransactions,
}
import com.digitalasset.ledger.api.v1.transaction.{Transaction, TransactionTree}
import com.digitalasset.ledger.client.binding.Primitive
import com.digitalasset.ledger.client.binding.Value.encode
@ -21,8 +25,6 @@ import com.digitalasset.ledger.test_stable.Test.DummyFactory._
import com.digitalasset.ledger.test_stable.Test.ParameterShowcase._
import com.digitalasset.ledger.test_stable.Test.TriProposal._
import com.digitalasset.ledger.test_stable.Test._
import TransactionService.{comparableTransactions, comparableTransactionTrees}
import io.grpc.Status
import scalaz.Tag
@ -1409,6 +1411,7 @@ class TransactionService(session: LedgerSession) extends LedgerTestSuite(session
"TXSingleSubscriptionInOrder",
"Archives should always come after creations when subscribing as a single party",
allocate(SingleParty),
timeoutScale = 2.0,
) {
case Participants(Participant(ledger, party)) =>
val contracts = 50
@ -1430,6 +1433,7 @@ class TransactionService(session: LedgerSession) extends LedgerTestSuite(session
"TXMultiSubscriptionInOrder",
"Archives should always come after creations when subscribing as more than on party",
allocate(TwoParties),
timeoutScale = 2.0,
) {
case Participants(Participant(ledger, alice, bob)) =>
val contracts = 50
@ -1450,6 +1454,7 @@ class TransactionService(session: LedgerSession) extends LedgerTestSuite(session
"TXFlatSubsetOfTrees",
"The event identifiers in the flat stream should be a subset of those in the trees stream",
allocate(SingleParty),
timeoutScale = 2.0,
) {
case Participants(Participant(ledger, party)) =>
val contracts = 50
@ -1481,6 +1486,7 @@ class TransactionService(session: LedgerSession) extends LedgerTestSuite(session
"TXFlatWitnessesSubsetOfTrees",
"The witnesses in the flat stream should be a subset of those in the trees stream",
allocate(SingleParty),
timeoutScale = 2.0,
) {
case Participants(Participant(ledger, party)) =>
val contracts = 50

View File

@ -71,6 +71,8 @@ supported_databases = [
},
]
all_database_runtime_deps = {dep: None for db in supported_databases for dep in db["runtime_deps"]}.keys()
da_scala_library(
name = "ledger-on-sql",
srcs = glob(["src/main/scala/**/*.scala"]),
@ -101,6 +103,17 @@ da_scala_library(
"@maven//:com_typesafe_play_anorm_2_12",
"@maven//:com_typesafe_play_anorm_tokenizer_2_12",
"@maven//:com_zaxxer_HikariCP",
"@maven//:org_flywaydb_flyway_core",
],
)
da_scala_binary(
name = "app",
main_class = "com.daml.ledger.on.sql.Main",
visibility = ["//visibility:public"],
runtime_deps = all_database_runtime_deps,
deps = [
":ledger-on-sql",
],
)
@ -113,14 +126,21 @@ da_scala_library(
deps = [
":ledger-on-sql",
"//daml-lf/data",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/ledger-api-health",
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils:kvutils-tests-lib",
"//ledger/participant-state/kvutils/app",
"//libs-scala/contextualized-logging",
"//libs-scala/postgresql-testing",
"//libs-scala/resources",
"@maven//:com_github_scopt_scopt_2_12",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:org_scalactic_scalactic_2_12",
"@maven//:org_scalatest_scalatest_2_12",
],
)
@ -132,13 +152,10 @@ da_scala_test_suite(
"//ledger/test-common:Test-stable.dar",
],
resources = glob(["src/test/resources/*"]),
runtime_deps = [
"@maven//:com_h2database_h2",
"@maven//:org_postgresql_postgresql",
"@maven//:org_xerial_sqlite_jdbc",
],
runtime_deps = all_database_runtime_deps,
deps = [
":ledger-on-sql",
":ledger-on-sql-test-lib",
"//daml-lf/data",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
@ -147,11 +164,11 @@ da_scala_test_suite(
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils:kvutils-tests-lib",
"//libs-scala/contextualized-logging",
"//libs-scala/postgresql-testing",
"//libs-scala/resources",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:org_flywaydb_flyway_core",
"@maven//:org_scalactic_scalactic_2_12",
"@maven//:org_scalatest_scalatest_2_12",
],

View File

@ -0,0 +1,15 @@
-- Copyright (c) 2020 The DAML Authors. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
CREATE TABLE ledger_log
(
sequence_no IDENTITY PRIMARY KEY NOT NULL,
entry_id VARBINARY(16384) NOT NULL,
envelope BLOB NOT NULL
);
CREATE TABLE ledger_state
(
key VARBINARY(16384) PRIMARY KEY NOT NULL,
value BLOB NOT NULL
);

View File

@ -0,0 +1 @@
e4ff23aa23a4375a603d94f6085b469a3abb44f5d5bcb7ec7158eed666781be4

View File

@ -0,0 +1,15 @@
-- Copyright (c) 2020 The DAML Authors. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
CREATE TABLE ledger_log
(
sequence_no SERIAL PRIMARY KEY,
entry_id BYTEA NOT NULL,
envelope BYTEA NOT NULL
);
CREATE TABLE ledger_state
(
key BYTEA PRIMARY KEY NOT NULL,
value BYTEA NOT NULL
);

View File

@ -0,0 +1 @@
623aa08159061ce95022154850e2e363070caa9f0e390f6b00fb481e9c798e3b

View File

@ -0,0 +1,15 @@
-- Copyright (c) 2020 The DAML Authors. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
CREATE TABLE ledger_log
(
sequence_no INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
entry_id VARBINARY(16384) NOT NULL,
envelope BLOB NOT NULL
);
CREATE TABLE ledger_state
(
key VARBINARY(16384) PRIMARY KEY NOT NULL,
value BLOB NOT NULL
);

View File

@ -0,0 +1 @@
4a9eed0646c9502d2ae25e1962b67886c18d9386ca1277d740472addfea862a6

View File

@ -9,6 +9,7 @@ import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.resources.ResourceOwner
import com.zaxxer.hikari.HikariDataSource
import javax.sql.DataSource
import org.flywaydb.core.Flyway
case class Database(
queries: Queries,
@ -27,50 +28,156 @@ object Database {
// entries missing.
private val MaximumWriterConnectionPoolSize: Int = 1
def owner(jdbcUrl: String)(implicit logCtx: LoggingContext): ResourceOwner[Database] =
def owner(jdbcUrl: String)(
implicit logCtx: LoggingContext,
): ResourceOwner[UninitializedDatabase] =
(jdbcUrl match {
case url if url.startsWith("jdbc:h2:") =>
MultipleReaderSingleWriterDatabase.owner(jdbcUrl, new H2Queries)
MultipleConnectionDatabase.owner(RDBMS.H2, jdbcUrl)
case url if url.startsWith("jdbc:postgresql:") =>
MultipleReaderSingleWriterDatabase.owner(jdbcUrl, new PostgresqlQueries)
MultipleConnectionDatabase.owner(RDBMS.PostgreSQL, jdbcUrl)
case url if url.startsWith("jdbc:sqlite::memory:") =>
SingleConnectionDatabase.owner(RDBMS.SQLite, jdbcUrl)
case url if url.startsWith("jdbc:sqlite:") =>
SingleConnectionDatabase.owner(jdbcUrl, new SqliteQueries)
SingleConnectionExceptAdminDatabase.owner(RDBMS.SQLite, jdbcUrl)
case _ => throw new InvalidDatabaseException(jdbcUrl)
}).map { database =>
logger.info(s"Connected to the ledger over JDBC: $jdbcUrl")
database
}
object MultipleReaderSingleWriterDatabase {
def owner(jdbcUrl: String, queries: Queries): ResourceOwner[Database] =
object MultipleConnectionDatabase {
def owner(
system: RDBMS,
jdbcUrl: String,
): ResourceOwner[UninitializedDatabase] =
for {
readerConnectionPool <- ResourceOwner.forCloseable(() =>
newHikariDataSource(jdbcUrl, readOnly = true))
newHikariDataSource(jdbcUrl, readOnly = true),
)
writerConnectionPool <- ResourceOwner.forCloseable(() =>
newHikariDataSource(jdbcUrl, maximumPoolSize = Some(MaximumWriterConnectionPoolSize)))
} yield new Database(queries, readerConnectionPool, writerConnectionPool)
newHikariDataSource(jdbcUrl, maxPoolSize = Some(MaximumWriterConnectionPoolSize)),
)
adminConnectionPool <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl))
} yield new UninitializedDatabase(
system,
readerConnectionPool,
writerConnectionPool,
adminConnectionPool,
)
}
object SingleConnectionDatabase {
def owner(jdbcUrl: String, queries: Queries): ResourceOwner[Database] =
object SingleConnectionExceptAdminDatabase {
def owner(
system: RDBMS,
jdbcUrl: String,
): ResourceOwner[UninitializedDatabase] =
for {
connectionPool <- ResourceOwner.forCloseable(() =>
newHikariDataSource(jdbcUrl, maximumPoolSize = Some(MaximumWriterConnectionPoolSize)))
} yield new Database(queries, connectionPool, connectionPool)
readerWriterConnectionPool <- ResourceOwner.forCloseable(() =>
newHikariDataSource(jdbcUrl, maxPoolSize = Some(MaximumWriterConnectionPoolSize)),
)
adminConnectionPool <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl))
} yield new UninitializedDatabase(
system,
readerWriterConnectionPool,
readerWriterConnectionPool,
adminConnectionPool,
)
}
// This is used when connecting to SQLite in-memory. Unlike file storage or H2 in-memory, each
// connection established will create a new, separate database. This means we can't create more
// than one connection pool, as each pool will create a new connection and therefore a new
// database.
//
// Because of this, Flyway needs to share the connection pool. However, Flyway _also_ requires
// a connection pool that allows for two concurrent connections. It uses one to lock the
// migrations table (to ensure we don't run migrations in parallel), and then the second to
// actually run the migrations. This is actually unnecessary in this case because it's impossible
// to have two connections, but it doesn't know that.
//
// To make Flyway happy, we create an unbounded connection pool and then drop it to 1 connection
// after migration.
object SingleConnectionDatabase {
def owner(
system: RDBMS,
jdbcUrl: String,
): ResourceOwner[UninitializedDatabase] =
for {
connectionPool <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl))
} yield new UninitializedDatabase(
system,
readerConnectionPool = connectionPool,
writerConnectionPool = connectionPool,
adminConnectionPool = connectionPool,
afterMigration = () => {
connectionPool.setMaximumPoolSize(MaximumWriterConnectionPoolSize)
},
)
}
private def newHikariDataSource(
jdbcUrl: String,
maximumPoolSize: Option[Int] = None,
readOnly: Boolean = false,
maxPoolSize: Option[Int] = None,
): HikariDataSource = {
val pool = new HikariDataSource()
pool.setAutoCommit(false)
pool.setJdbcUrl(jdbcUrl)
pool.setReadOnly(readOnly)
maximumPoolSize.foreach { maximumPoolSize =>
pool.setMaximumPoolSize(maximumPoolSize)
}
maxPoolSize.foreach(pool.setMaximumPoolSize)
pool
}
sealed trait RDBMS {
val name: String
val queries: Queries
}
object RDBMS {
object H2 extends RDBMS {
override val name: String = "h2"
override val queries: Queries = new H2Queries
}
object PostgreSQL extends RDBMS {
override val name: String = "postgresql"
override val queries: Queries = new PostgresqlQueries
}
object SQLite extends RDBMS {
override val name: String = "sqlite"
override val queries: Queries = new SqliteQueries
}
}
class UninitializedDatabase(
system: RDBMS,
readerConnectionPool: DataSource,
writerConnectionPool: DataSource,
adminConnectionPool: DataSource,
afterMigration: () => Unit = () => (),
) {
private val flyway: Flyway =
Flyway
.configure()
.dataSource(adminConnectionPool)
.locations(s"classpath:/com/daml/ledger/on/sql/migrations/${system.name}")
.load()
def migrate(): Database = {
flyway.migrate()
afterMigration()
Database(system.queries, readerConnectionPool, writerConnectionPool)
}
def clear(): this.type = {
flyway.clean()
this
}
}
}

View File

@ -27,8 +27,9 @@ object Main extends App {
.opt[String]("jdbc-url")
.required()
.text("The URL used to connect to the database.")
.action(
(jdbcUrl, config) => config.copy(extra = config.extra.copy(jdbcUrl = Some(jdbcUrl))))
.action((jdbcUrl, config) =>
config.copy(extra = config.extra.copy(jdbcUrl = Some(jdbcUrl))),
)
()
}

View File

@ -16,7 +16,7 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlLogEntryId,
DamlStateKey,
DamlStateValue,
DamlSubmission
DamlSubmission,
}
import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord, LedgerWriter}
import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting}
@ -77,7 +77,7 @@ class SqlLedgerReaderWriter(
} else {
Source(result)
}
})
}),
)
.map { case (_, record) => record }
@ -119,7 +119,8 @@ class SqlLedgerReaderWriter(
if (!(actualStateUpdates.keySet subsetOf expectedStateUpdates)) {
val unaccountedKeys = actualStateUpdates.keySet diff expectedStateUpdates
sys.error(
s"CommitActor: State updates not a subset of expected updates! Keys [$unaccountedKeys] are unaccounted for!")
s"CommitActor: State updates not a subset of expected updates! Keys [$unaccountedKeys] are unaccounted for!",
)
}
}
@ -142,14 +143,6 @@ class SqlLedgerReaderWriter(
.result()
}
private def migrate(): Unit = {
inDatabaseWriteTransaction("Migrating the database") { implicit connection =>
queries.createLogTable()
queries.createStateTable()
}
logger.info("Successfully migrated the database.")
}
private def inDatabaseReadTransaction[T](message: String)(
body: Connection => T,
)(implicit logCtx: LoggingContext): T = {
@ -210,18 +203,16 @@ object SqlLedgerReaderWriter {
logCtx: LoggingContext,
): ResourceOwner[SqlLedgerReaderWriter] =
for {
dispatcher <- ResourceOwner.forCloseable(
() =>
Dispatcher(
"sql-participant-state",
zeroIndex = StartIndex,
headAtInitialization = StartIndex,
))
database <- Database.owner(jdbcUrl)
dispatcher <- ResourceOwner.forCloseable(() =>
Dispatcher(
"sql-participant-state",
zeroIndex = StartIndex,
headAtInitialization = StartIndex,
),
)
uninitializedDatabase <- Database.owner(jdbcUrl)
} yield {
val participant =
new SqlLedgerReaderWriter(ledgerId, participantId, database, dispatcher)
participant.migrate()
participant
val database = uninitializedDatabase.migrate()
new SqlLedgerReaderWriter(ledgerId, participantId, database, dispatcher)
}
}

View File

@ -11,7 +11,7 @@ import com.daml.ledger.on.sql.queries.Queries._
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlLogEntryId,
DamlStateKey,
DamlStateValue
DamlStateValue,
}
import com.daml.ledger.participant.state.kvutils.api.LedgerRecord
import com.daml.ledger.participant.state.v1.Offset
@ -24,7 +24,7 @@ trait CommonQueries extends Queries {
start: Index,
end: Index,
)(implicit connection: Connection): immutable.Seq[(Index, LedgerRecord)] =
SQL"SELECT sequence_no, entry_id, envelope FROM log WHERE sequence_no >= $start AND sequence_no < $end"
SQL"SELECT sequence_no, entry_id, envelope FROM #$LogTable WHERE sequence_no >= $start AND sequence_no < $end"
.as(
(long("sequence_no") ~ byteArray("entry_id") ~ byteArray("envelope")).map {
case index ~ entryId ~ envelope =>
@ -36,13 +36,13 @@ trait CommonQueries extends Queries {
.build(),
envelope,
)
}.*
}.*,
)
override def selectStateByKeys(
keys: Iterable[DamlStateKey],
)(implicit connection: Connection): immutable.Seq[(DamlStateKey, Option[DamlStateValue])] =
SQL"SELECT key, value FROM state WHERE key IN (${keys.map(_.toByteArray).toSeq})"
SQL"SELECT key, value FROM #$StateTable WHERE key IN (${keys.map(_.toByteArray).toSeq})"
.as((byteArray("key") ~ byteArray("value")).map {
case key ~ value =>
DamlStateKey.parseFrom(key) -> Some(DamlStateValue.parseFrom(value))
@ -56,7 +56,7 @@ trait CommonQueries extends Queries {
stateUpdates.map {
case (key, value) =>
immutable.Seq[NamedParameter]("key" -> key.toByteArray, "value" -> value.toByteArray)
}
},
)
protected val updateStateQuery: String

View File

@ -7,33 +7,23 @@ import java.sql.Connection
import anorm.SqlParser._
import anorm._
import com.daml.ledger.on.sql.queries.Queries.Index
import com.daml.ledger.on.sql.queries.Queries._
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
import com.google.protobuf.ByteString
class H2Queries extends Queries with CommonQueries {
override def createLogTable()(implicit connection: Connection): Unit = {
SQL"CREATE TABLE IF NOT EXISTS log (sequence_no IDENTITY PRIMARY KEY NOT NULL, entry_id VARBINARY(16384) NOT NULL, envelope BLOB NOT NULL)"
.execute()
()
}
override def createStateTable()(implicit connection: Connection): Unit = {
SQL"CREATE TABLE IF NOT EXISTS state (key VARBINARY(16384) PRIMARY KEY NOT NULL, value BLOB NOT NULL)"
.execute()
()
}
override def insertIntoLog(
entry: DamlLogEntryId,
envelope: ByteString,
)(implicit connection: Connection): Index = {
SQL"INSERT INTO log (entry_id, envelope) VALUES (${entry.getEntryId.newInput()}, ${envelope.newInput()})"
val entryIdStream = entry.getEntryId.newInput()
val envelopeStream = envelope.newInput()
SQL"INSERT INTO #$LogTable (entry_id, envelope) VALUES ($entryIdStream, $envelopeStream)"
.executeInsert()
SQL"CALL IDENTITY()"
.as(long("IDENTITY()").single)
}
override protected val updateStateQuery: String =
"MERGE INTO state VALUES ({key}, {value})"
s"MERGE INTO $StateTable VALUES ({key}, {value})"
}

View File

@ -7,31 +7,21 @@ import java.sql.Connection
import anorm.SqlParser._
import anorm._
import com.daml.ledger.on.sql.queries.Queries.Index
import com.daml.ledger.on.sql.queries.Queries._
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
import com.google.protobuf.ByteString
class PostgresqlQueries extends Queries with CommonQueries {
override def createLogTable()(implicit connection: Connection): Unit = {
SQL"CREATE TABLE IF NOT EXISTS log (sequence_no SERIAL PRIMARY KEY, entry_id BYTEA NOT NULL, envelope BYTEA NOT NULL)"
.execute()
()
}
override def createStateTable()(implicit connection: Connection): Unit = {
SQL"CREATE TABLE IF NOT EXISTS state (key BYTEA PRIMARY KEY NOT NULL, value BYTEA NOT NULL)"
.execute()
()
}
override def insertIntoLog(
entry: DamlLogEntryId,
envelope: ByteString,
)(implicit connection: Connection): Index = {
SQL"INSERT INTO log (entry_id, envelope) VALUES (${entry.getEntryId.newInput()}, ${envelope.newInput()}) RETURNING sequence_no"
val entryIdStream = entry.getEntryId.newInput()
val envelopeStream = envelope.newInput()
SQL"INSERT INTO #$LogTable (entry_id, envelope) VALUES ($entryIdStream, $envelopeStream) RETURNING sequence_no"
.as(long("sequence_no").single)
}
override protected val updateStateQuery: String =
"INSERT INTO state VALUES ({key}, {value}) ON CONFLICT(key) DO UPDATE SET value = {value}"
s"INSERT INTO $StateTable VALUES ({key}, {value}) ON CONFLICT(key) DO UPDATE SET value = {value}"
}

View File

@ -14,10 +14,6 @@ import com.google.protobuf.ByteString
import scala.collection.immutable
trait Queries {
def createLogTable()(implicit connection: Connection): Unit
def createStateTable()(implicit connection: Connection): Unit
def selectFromLog(
start: Index,
end: Index,
@ -30,8 +26,9 @@ trait Queries {
def selectStateByKeys(
keys: Iterable[DamlKvutils.DamlStateKey],
)(implicit connection: Connection)
: immutable.Seq[(DamlKvutils.DamlStateKey, Option[DamlKvutils.DamlStateValue])]
)(
implicit connection: Connection,
): immutable.Seq[(DamlKvutils.DamlStateKey, Option[DamlKvutils.DamlStateValue])]
def updateState(
stateUpdates: Map[DamlKvutils.DamlStateKey, DamlKvutils.DamlStateValue],
@ -41,6 +38,10 @@ trait Queries {
object Queries {
type Index = Long
val TablePrefix = "ledger"
val LogTable = s"${TablePrefix}_log"
val StateTable = s"${TablePrefix}_state"
def executeBatchSql(
query: String,
params: Iterable[immutable.Seq[NamedParameter]],

View File

@ -7,33 +7,23 @@ import java.sql.Connection
import anorm.SqlParser._
import anorm._
import com.daml.ledger.on.sql.queries.Queries.Index
import com.daml.ledger.on.sql.queries.Queries._
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
import com.google.protobuf.ByteString
class SqliteQueries extends Queries with CommonQueries {
override def createLogTable()(implicit connection: Connection): Unit = {
SQL"CREATE TABLE IF NOT EXISTS log (sequence_no INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, entry_id VARBINARY(16384) NOT NULL, envelope BLOB NOT NULL)"
.execute()
()
}
override def createStateTable()(implicit connection: Connection): Unit = {
SQL"CREATE TABLE IF NOT EXISTS state (key VARBINARY(16384) PRIMARY KEY NOT NULL, value BLOB NOT NULL)"
.execute()
()
}
override def insertIntoLog(
entry: DamlLogEntryId,
envelope: ByteString,
)(implicit connection: Connection): Index = {
SQL"INSERT INTO log (entry_id, envelope) VALUES (${entry.getEntryId.toByteArray}, ${envelope.toByteArray})"
val entryIdArray = entry.getEntryId.toByteArray
val envelopeArray = envelope.toByteArray
SQL"INSERT INTO #$LogTable (entry_id, envelope) VALUES ($entryIdArray, $envelopeArray)"
.executeInsert()
SQL"SELECT LAST_INSERT_ROWID()"
.as(long("LAST_INSERT_ROWID()").single)
}
override protected val updateStateQuery: String =
"INSERT INTO state VALUES ({key}, {value}) ON CONFLICT(key) DO UPDATE SET value = {value}"
s"INSERT INTO $StateTable VALUES ({key}, {value}) ON CONFLICT(key) DO UPDATE SET value = {value}"
}

View File

@ -0,0 +1,41 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.on.sql
import java.time.Clock
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref.LedgerString
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.resources.ResourceOwner
import scala.concurrent.ExecutionContext
abstract class SqlLedgerReaderWriterIntegrationSpecBase(implementationName: String)
extends ParticipantStateIntegrationSpecBase(implementationName) {
protected final implicit val ec: ExecutionContext = ExecutionContext.global
protected def jdbcUrl: String
override final val startIndex: Long = SqlLedgerReaderWriter.StartIndex
override final def participantStateFactory(
participantId: ParticipantId,
ledgerId: LedgerString,
): ResourceOwner[ParticipantState] = {
val currentJdbcUrl = jdbcUrl
newLoggingContext { implicit logCtx =>
SqlLedgerReaderWriter
.owner(ledgerId, participantId, currentJdbcUrl)
.map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter))
}
}
override final def currentRecordTime(): Timestamp =
Timestamp.assertFromInstant(Clock.systemUTC().instant())
}

View File

@ -6,11 +6,11 @@
</encoder>
</appender>
<root level="TRACE">
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
<logger name="com.zaxxer.hikari" level="WARN">
<appender-ref ref="STDOUT"/>
</logger>
<logger name="org.flywaydb" level="WARN"/>
<logger name="com.zaxxer.hikari" level="WARN"/>
</configuration>

View File

@ -3,45 +3,11 @@
package com.daml.ledger.on.sql
import java.nio.file.{Files, Path}
import java.time.Clock
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref.LedgerString
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.resources.ResourceOwner
import scala.concurrent.ExecutionContext
import java.nio.file.Files
class H2FileSqlLedgerReaderWriterIntegrationSpec
extends ParticipantStateIntegrationSpecBase("SQL implementation using H2 with a file") {
private implicit val ec: ExecutionContext = ExecutionContext.global
extends SqlLedgerReaderWriterIntegrationSpecBase("SQL implementation using H2 with a file") {
override val startIndex: Long = SqlLedgerReaderWriter.StartIndex
private var directory: Path = _
override def beforeEach(): Unit = {
directory = Files.createTempDirectory(getClass.getSimpleName)
super.beforeEach()
}
override def participantStateFactory(
participantId: ParticipantId,
ledgerId: LedgerString,
): ResourceOwner[ParticipantState] = {
val jdbcUrl = s"jdbc:h2:file:$directory/test"
newLoggingContext { implicit logCtx =>
SqlLedgerReaderWriter
.owner(ledgerId, participantId, jdbcUrl)
.map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter))
}
}
override def currentRecordTime(): Timestamp =
Timestamp.assertFromInstant(Clock.systemUTC().instant())
override def jdbcUrl: String =
s"jdbc:h2:file:${Files.createTempDirectory(getClass.getSimpleName)}/test"
}

View File

@ -3,39 +3,11 @@
package com.daml.ledger.on.sql
import java.time.Clock
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref.LedgerString
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.resources.ResourceOwner
import scala.concurrent.ExecutionContext
import scala.util.Random
class H2MemorySqlLedgerReaderWriterIntegrationSpec
extends ParticipantStateIntegrationSpecBase("SQL implementation using H2 in memory") {
private implicit val ec: ExecutionContext = ExecutionContext.global
extends SqlLedgerReaderWriterIntegrationSpecBase("SQL implementation using H2 in memory") {
override val startIndex: Long = SqlLedgerReaderWriter.StartIndex
override def participantStateFactory(
participantId: ParticipantId,
ledgerId: LedgerString,
): ResourceOwner[ParticipantState] = {
val databaseName = s"${getClass.getSimpleName.toLowerCase()}_${Random.nextInt()}"
val jdbcUrl = s"jdbc:h2:mem:$databaseName"
newLoggingContext { implicit logCtx =>
SqlLedgerReaderWriter
.owner(ledgerId, participantId, jdbcUrl)
.map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter))
}
}
override def currentRecordTime(): Timestamp =
Timestamp.assertFromInstant(Clock.systemUTC().instant())
override protected def jdbcUrl: String =
s"jdbc:h2:mem:${getClass.getSimpleName.toLowerCase()}_${Random.nextInt()}"
}

View File

@ -0,0 +1,87 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.on.sql
import java.io.{BufferedReader, FileNotFoundException}
import java.math.BigInteger
import java.nio.charset.Charset
import java.nio.file.Paths
import java.security.MessageDigest
import java.util
import com.daml.ledger.on.sql.ImmutableMigrationsSpec._
import org.flywaydb.core.Flyway
import org.flywaydb.core.api.configuration.FluentConfiguration
import org.flywaydb.core.internal.resource.LoadableResource
import org.flywaydb.core.internal.scanner.{ResourceNameCache, Scanner}
import org.scalatest.Matchers._
import org.scalatest.WordSpec
import scala.collection.JavaConverters._
class ImmutableMigrationsSpec extends WordSpec {
"migration files" should {
"never change, according to their accompanying digest file" in {
val configuration = Flyway
.configure()
.locations(s"classpath:/$migrationsResourcePath")
val resourceScanner = flywayScanner(configuration)
val resources = resourceScanner.getResources("", ".sql").asScala.toSeq
resources.size should be >= 3
resources.foreach { resource =>
val migrationFile = resource.getRelativePath
val digestFile = migrationFile + ".sha256"
val expectedDigest = readExpectedDigest(migrationFile, digestFile, resourceScanner)
val currentDigest = computeCurrentDigest(resource, configuration.getEncoding)
assert(
currentDigest == expectedDigest,
s"""The contents of the migration file "$migrationFile" have changed! Migrations are immutable; you must not change their contents or their digest.""",
)
}
}
}
}
object ImmutableMigrationsSpec {
private val migrationsResourcePath = "com/daml/ledger/on/sql/migrations"
private val migrationsDirectoryPath =
Paths.get(s"ledger/ledger-on-sql/src/main/resources/$migrationsResourcePath")
private def flywayScanner(configuration: FluentConfiguration) =
new Scanner(
classOf[Object],
util.Arrays.asList(configuration.getLocations: _*),
getClass.getClassLoader,
configuration.getEncoding,
new ResourceNameCache,
)
private def readExpectedDigest(
sourceFile: String,
digestFile: String,
resourceScanner: Scanner[_],
): String = {
val resource = Option(resourceScanner.getResource(digestFile))
.getOrElse(
throw new FileNotFoundException(
s""""$digestFile" is missing. If you are introducing a new Flyway migration step, you need to create an SHA-256 digest file by running:
|shasum -a 256 '${migrationsDirectoryPath.resolve(sourceFile)}' \\
| | awk '{print $$1}' \\
| > '${migrationsDirectoryPath.resolve(digestFile)}'
|""".stripMargin,
),
)
new BufferedReader(resource.read()).readLine()
}
private def computeCurrentDigest(resource: LoadableResource, encoding: Charset): String = {
val sha256 = MessageDigest.getInstance("SHA-256")
new BufferedReader(resource.read())
.lines()
.forEach(line => sha256.update((line + "\n").getBytes(encoding)))
val digest = sha256.digest()
String.format(s"%0${digest.length * 2}x", new BigInteger(1, digest))
}
}

View File

@ -3,51 +3,11 @@
package com.daml.ledger.on.sql
import java.sql.DriverManager
import java.time.Clock
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref.LedgerString
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.resources.ResourceOwner
import com.digitalasset.testing.postgresql.PostgresAroundAll
import scala.concurrent.ExecutionContext
class PostgresqlSqlLedgerReaderWriterIntegrationSpec
extends ParticipantStateIntegrationSpecBase("SQL implementation using PostgreSQL")
extends SqlLedgerReaderWriterIntegrationSpecBase("SQL implementation using PostgreSQL")
with PostgresAroundAll {
private implicit val ec: ExecutionContext = ExecutionContext.global
override val startIndex: Long = SqlLedgerReaderWriter.StartIndex
override def participantStateFactory(
participantId: ParticipantId,
ledgerId: LedgerString,
): ResourceOwner[ParticipantState] = {
newLoggingContext { implicit logCtx =>
SqlLedgerReaderWriter
.owner(ledgerId, participantId, postgresFixture.jdbcUrl)
.map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter))
}
}
override protected def beforeEach(): Unit = {
super.beforeEach()
val connection = DriverManager.getConnection(postgresFixture.jdbcUrl)
try {
connection.prepareStatement("TRUNCATE log RESTART IDENTITY").execute()
connection.prepareStatement("TRUNCATE state RESTART IDENTITY").execute()
()
} finally {
connection.close()
}
}
override def currentRecordTime(): Timestamp =
Timestamp.assertFromInstant(Clock.systemUTC().instant())
override protected def jdbcUrl: String = createNewDatabase().jdbcUrl
}

View File

@ -3,45 +3,11 @@
package com.daml.ledger.on.sql
import java.nio.file.{Files, Path}
import java.time.Clock
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref.LedgerString
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.resources.ResourceOwner
import scala.concurrent.ExecutionContext
import java.nio.file.Files
class SqliteFileSqlLedgerReaderWriterIntegrationSpec
extends ParticipantStateIntegrationSpecBase("SQL implementation using SQLite with a file") {
private implicit val ec: ExecutionContext = ExecutionContext.global
extends SqlLedgerReaderWriterIntegrationSpecBase("SQL implementation using SQLite with a file") {
override val startIndex: Long = SqlLedgerReaderWriter.StartIndex
private var directory: Path = _
override def beforeEach(): Unit = {
directory = Files.createTempDirectory(getClass.getSimpleName)
super.beforeEach()
}
override def participantStateFactory(
participantId: ParticipantId,
ledgerId: LedgerString,
): ResourceOwner[ParticipantState] = {
val jdbcUrl = s"jdbc:sqlite:$directory/test.sqlite"
newLoggingContext { implicit logCtx =>
SqlLedgerReaderWriter
.owner(ledgerId, participantId, jdbcUrl)
.map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter))
}
}
override def currentRecordTime(): Timestamp =
Timestamp.assertFromInstant(Clock.systemUTC().instant())
override def jdbcUrl: String =
s"jdbc:sqlite:${Files.createTempDirectory(getClass.getSimpleName)}/test.sqlite"
}

View File

@ -3,37 +3,8 @@
package com.daml.ledger.on.sql
import java.time.Clock
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref.LedgerString
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.resources.ResourceOwner
import scala.concurrent.ExecutionContext
class SqliteMemorySqlLedgerReaderWriterIntegrationSpec
extends ParticipantStateIntegrationSpecBase("SQL implementation using SQLite in memory") {
private implicit val ec: ExecutionContext = ExecutionContext.global
extends SqlLedgerReaderWriterIntegrationSpecBase("SQL implementation using SQLite in memory") {
override val startIndex: Long = SqlLedgerReaderWriter.StartIndex
override def participantStateFactory(
participantId: ParticipantId,
ledgerId: LedgerString,
): ResourceOwner[ParticipantState] = {
val jdbcUrl = s"jdbc:sqlite::memory:"
newLoggingContext { implicit logCtx =>
SqlLedgerReaderWriter
.owner(ledgerId, participantId, jdbcUrl)
.map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter))
}
}
override def currentRecordTime(): Timestamp =
Timestamp.assertFromInstant(Clock.systemUTC().instant())
override protected val jdbcUrl = s"jdbc:sqlite::memory:"
}

View File

@ -27,7 +27,7 @@ import org.scalatest.{Assertion, AsyncWordSpec, BeforeAndAfterEach}
import scala.collection.immutable.HashMap
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try
abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
@ -35,10 +35,12 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
with BeforeAndAfterEach
with AkkaBeforeAndAfterAll {
var ledgerId: LedgerString = _
var participantStateResource: Resource[ParticipantState] = _
var ps: ParticipantState = _
var rt: Timestamp = _
private implicit val ec: ExecutionContext = ExecutionContext.global
private var ledgerId: LedgerString = _
private var participantStateResource: Resource[ParticipantState] = _
private var ps: ParticipantState = _
private var rt: Timestamp = _
val startIndex: Long = 0
@ -52,8 +54,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
override protected def beforeEach(): Unit = {
super.beforeEach()
ledgerId = Ref.LedgerString.assertFromString(s"ledger-${UUID.randomUUID()}")
participantStateResource =
participantStateFactory(participantId, ledgerId).acquire()(ExecutionContext.global)
participantStateResource = participantStateFactory(participantId, ledgerId).acquire()
ps = Await.result(participantStateResource.asFuture, 10.seconds)
rt = currentRecordTime()
}
@ -168,9 +169,8 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
.take(2)
.runWith(Sink.seq)
} yield {
List(result1, result2, result3).map(
result =>
assert(result == SubmissionResult.Acknowledged, "unexpected response to package upload")
List(result1, result2, result3).map(result =>
assert(result == SubmissionResult.Acknowledged, "unexpected response to package upload"),
)
update2 match {
case (updateOffset: Offset, update: PublicPackageUpload) =>
@ -194,7 +194,8 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
} yield {
assert(
allocResult == SubmissionResult.Acknowledged,
s"unexpected response to party allocation: $allocResult")
s"unexpected response to party allocation: $allocResult",
)
updateTuple match {
case (updateOffset: Offset, update: PartyAddedToParticipant) =>
assert(updateOffset == offset(0, 0))
@ -225,7 +226,8 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
assert(update.recordTime >= rt)
case _ =>
fail(
s"unexpected update message after a party allocation. Error: ${result.description}")
s"unexpected update message after a party allocation. Error: ${result.description}",
)
}
}
@ -245,11 +247,11 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
// second submission is a duplicate, it fails silently
Seq(_, update2) <- ps.stateUpdates(beginAfter = None).take(2).runWith(Sink.seq)
} yield {
List(result1, result2, result3).map(
result =>
assert(
result == SubmissionResult.Acknowledged,
"unexpected response to party allocation")
List(result1, result2, result3).map(result =>
assert(
result == SubmissionResult.Acknowledged,
"unexpected response to party allocation",
),
)
update2 match {
case (updateOffset: Offset, update: PartyAddedToParticipant) =>
@ -257,7 +259,8 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
assert(update.submissionId.contains(submissionIds._2))
case _ =>
fail(
s"unexpected update message after a party allocation. Error: ${result2.description}")
s"unexpected update message after a party allocation. Error: ${result2.description}",
)
}
}
}
@ -279,7 +282,8 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
assert(update.rejectionReason equalsIgnoreCase "Party already exists")
case _ =>
fail(
s"unexpected update message after a party allocation. Error: ${result2.description}")
s"unexpected update message after a party allocation. Error: ${result2.description}",
)
}
}
}
@ -307,19 +311,22 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
.submitTransaction(
submitterInfo(rt, alice, commandIds._1),
transactionMeta(rt),
emptyTransaction)
emptyTransaction,
)
.toScala
_ <- ps
.submitTransaction(
submitterInfo(rt, alice, commandIds._1),
transactionMeta(rt),
emptyTransaction)
emptyTransaction,
)
.toScala
_ <- ps
.submitTransaction(
submitterInfo(rt, alice, commandIds._2),
transactionMeta(rt),
emptyTransaction)
emptyTransaction,
)
.toScala
updates <- ps.stateUpdates(beginAfter = None).take(3).runWith(Sink.seq)
} yield {
@ -371,7 +378,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
submissionId = randomLedgerString(),
config = lic.config.copy(
generation = lic.config.generation + 1,
)
),
)
.toScala
@ -380,7 +387,8 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
.submitTransaction(
submitterInfo(rt, unallocatedParty),
transactionMeta(rt),
emptyTransaction)
emptyTransaction,
)
.toScala
// Allocate a party and try the submission again with an allocated party.
@ -388,7 +396,8 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
.allocateParty(
None /* no name hint, implementation decides party name */,
Some("Somebody"),
randomLedgerString())
randomLedgerString(),
)
.toScala
_ <- assert(allocResult.isInstanceOf[SubmissionResult])
//get the new party off state updates
@ -400,7 +409,8 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
.submitTransaction(
submitterInfo(rt, party = newParty),
transactionMeta(rt),
emptyTransaction)
emptyTransaction,
)
.toScala
Seq((offset1, update1), (offset2, update2), (offset3, update3), (offset4, update4)) <- ps
@ -414,7 +424,8 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
assert(
update2
.asInstanceOf[Update.CommandRejected]
.reason == RejectionReason.PartyNotKnownOnLedger)
.reason == RejectionReason.PartyNotKnownOnLedger,
)
assert(offset2 == offset(1, 0))
assert(update3.isInstanceOf[Update.PartyAddedToParticipant])
assert(offset3 == offset(2, 0))
@ -436,7 +447,8 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
submissionId = randomLedgerString(),
config = lic.config.copy(
generation = lic.config.generation + 1,
))
),
)
.toScala
// Submit another configuration change that uses stale "current config".
@ -449,8 +461,9 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
timeModel = TimeModel(
Duration.ofSeconds(123),
Duration.ofSeconds(123),
Duration.ofSeconds(123)).get
)
Duration.ofSeconds(123),
).get,
),
)
.toScala
@ -479,7 +492,8 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
submissionId = submissionIds._1,
config = lic.config.copy(
generation = lic.config.generation + 1,
))
),
)
.toScala
result2 <- ps
.submitConfiguration(
@ -487,7 +501,8 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
submissionId = submissionIds._1,
config = lic.config.copy(
generation = lic.config.generation + 2,
))
),
)
.toScala
result3 <- ps
.submitConfiguration(
@ -495,26 +510,60 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
submissionId = submissionIds._2,
config = lic.config.copy(
generation = lic.config.generation + 2,
))
),
)
.toScala
// second submission is a duplicate, it fails silently
Seq(_, update2) <- ps.stateUpdates(beginAfter = None).take(2).runWith(Sink.seq)
} yield {
List(result1, result2, result3).map(
result =>
assert(
result == SubmissionResult.Acknowledged,
"unexpected response to configuration change"))
List(result1, result2, result3).map(result =>
assert(
result == SubmissionResult.Acknowledged,
"unexpected response to configuration change",
),
)
update2 match {
case (updateOffset: Offset, update: ConfigurationChanged) =>
assert(updateOffset == offset(2, 0))
assert(update.submissionId == submissionIds._2)
case _ =>
fail(
s"unexpected update message after a configuration change. Error: ${result2.description}")
s"unexpected update message after a configuration change. Error: ${result2.description}",
)
}
}
}
"process commits serially" in {
val partyCount = 1000L
val partyIds = 1L to partyCount
val partyIdDigits = partyCount.toString.length
val partyNames =
partyIds
.map(i => Ref.Party.assertFromString(s"party-%0${partyIdDigits}d".format(i)))
.toVector
val updatesF = ps.stateUpdates(beginAfter = None).take(partyCount).runWith(Sink.seq)
for {
actualAllocations <- Future.sequence(
partyNames.map(name =>
ps.allocateParty(Some(name), Some(name), randomLedgerString()).toScala,
),
)
updates <- updatesF
} yield {
val expectedAllocations = partyIds.map(_ => SubmissionResult.Acknowledged).toVector
assert(actualAllocations == expectedAllocations)
val expectedOffsets = partyIds.map(i => offset(i - 1, 0)).toVector
val actualOffsets = updates.map(_._1).sorted.toVector
assert(actualOffsets == expectedOffsets)
val actualNames =
updates.map(_._2.asInstanceOf[PartyAddedToParticipant].displayName).sorted.toVector
assert(actualNames == partyNames)
}
}
}
}
@ -538,12 +587,12 @@ object ParticipantStateIntegrationSpecBase {
submitter = party,
applicationId = Ref.LedgerString.assertFromString("tests"),
commandId = Ref.LedgerString.assertFromString(commandId),
maxRecordTime = rt.addMicros(Duration.ofSeconds(10).toNanos / 1000)
maxRecordTime = rt.addMicros(Duration.ofSeconds(10).toNanos / 1000),
)
private def transactionMeta(let: Timestamp) = TransactionMeta(
ledgerEffectiveTime = let,
workflowId = Some(Ref.LedgerString.assertFromString("tests"))
workflowId = Some(Ref.LedgerString.assertFromString("tests")),
)
private def matchPackageUpload(
@ -551,7 +600,7 @@ object ParticipantStateIntegrationSpecBase {
submissionId: SubmissionId,
givenOffset: Offset,
expectedArchives: List[DamlLf.Archive],
rt: Timestamp
rt: Timestamp,
): Assertion = updateTuple match {
case (updateOffset: Offset, update: PublicPackageUpload) =>
assert(update.submissionId.contains(submissionId))
@ -566,7 +615,8 @@ object ParticipantStateIntegrationSpecBase {
updateTuple: (Offset, Update),
commandId: String,
givenOffset: Offset,
rt: Timestamp): Assertion = updateTuple match {
rt: Timestamp,
): Assertion = updateTuple match {
case (updateOffset: Offset, update: TransactionAccepted) =>
update.optSubmitterInfo match {
case Some(info) =>

View File

@ -14,6 +14,7 @@ import org.apache.commons.io.{FileUtils, IOUtils}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.util.Random
import scala.util.control.NonFatal
trait PostgresAround {
@ -36,8 +37,8 @@ trait PostgresAround {
initializeDatabase()
createConfigFile()
startPostgres()
createTestDatabase()
logger.info("PostgreSQL has started.")
createTestDatabase(databaseName)
logger.info(s"PostgreSQL has started on port $port.")
} catch {
case NonFatal(e) =>
stopPostgres()
@ -57,7 +58,8 @@ trait PostgresAround {
protected def startPostgres(): Unit = {
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException(
"Attempted to start PostgreSQL, but it has already been started.")
"Attempted to start PostgreSQL, but it has already been started.",
)
}
try {
run(
@ -97,6 +99,14 @@ trait PostgresAround {
}
}
protected def createNewDatabase(prefix: String = "test_"): PostgresFixture = {
val newDatabaseName = s"$prefix${Random.nextInt(Int.MaxValue)}"
createTestDatabase(newDatabaseName)
val jdbcUrl =
s"jdbc:postgresql://$hostName:${postgresFixture.port}/$newDatabaseName?user=$userName"
postgresFixture.copy(jdbcUrl = jdbcUrl)
}
private def initializeDatabase(): Unit = run(
"initialize the PostgreSQL database",
Tool.initdb,
@ -106,7 +116,7 @@ trait PostgresAround {
"UNICODE",
"-A",
"trust",
postgresFixture.dataDir.toString.replaceAllLiterally("\\", "/")
postgresFixture.dataDir.toString.replaceAllLiterally("\\", "/"),
)
private def createConfigFile(): Unit = {
@ -131,7 +141,7 @@ trait PostgresAround {
()
}
private def createTestDatabase(): Unit = run(
private def createTestDatabase(name: String): Unit = run(
"create the database",
Tool.createdb,
"-h",
@ -140,7 +150,7 @@ trait PostgresAround {
userName,
"-p",
postgresFixture.port.toString,
databaseName,
name,
)
private def run(description: String, tool: Tool, args: String*): Unit = {
@ -209,7 +219,7 @@ object PostgresAround {
stderr.map(output => s"\nSTDERR:\n$output"),
Some(logs).filter(_.nonEmpty).map(lines => s"\nLogs:\n${lines.mkString("\n")}"),
).flatten.mkString("\n"),
cause
cause,
) {
def this(
description: String,