Ledger API Server H2 Database support (#2425)

* Ledger API Server H2 Database support (#2528)

* Disable parallel batch ledger append due to lack of H2DB isolation
Oliver Seeliger authored on 2019-08-28 14:14:21 +02:00, committed by GitHub
parent 12dee7fd64
commit 834a06a397
34 changed files with 556 additions and 133 deletions
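
The second commit-message bullet ("Disable parallel batch ledger append due to lack of H2DB isolation") maps to the batching change in the SqlLedger diff below: when the backing database cannot safely run concurrent conditional updates (H2), ledger appends fall back to batches of size 1. A minimal, self-contained sketch of that rule, with names taken from the diff and an illustrative connection-pool size:

// Sketch only: mirrors the maxBatchSize logic added to SqlLedger in this commit.
// noOfShortLivedConnections is the size of the short-lived JDBC connection pool.
def maxBatchSize(parallelLedgerAppend: Boolean, noOfShortLivedConnections: Int): Long =
  if (parallelLedgerAppend) noOfShortLivedConnections * 2L else 1L

// H2 reports supportsParallelLedgerAppend = false, so entries are appended one at a time.
assert(maxBatchSize(parallelLedgerAppend = false, noOfShortLivedConnections = 16) == 1L)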

View File

@ -94,6 +94,7 @@ da_scala_binary(
visibility = ["//visibility:public"],
deps = [
":sandbox",
"//3rdparty/jvm/com/h2database:h2",
],
)
@ -142,6 +143,7 @@ testDependencies = [
"//3rdparty/jvm/org/scalacheck:scalacheck",
"//3rdparty/jvm/org/awaitility:awaitility",
"//3rdparty/jvm/commons_io:commons_io",
"//3rdparty/jvm/com/h2database:h2",
"//bazel_tools/runfiles:scala_runfiles",
] + compileDependencies
@ -250,6 +252,32 @@ client_server_test(
],
) if not is_windows else None
client_server_test(
name = "conformance-test-static-time-h2database",
timeout = "long",
client = "//ledger/ledger-api-test-tool",
client_args = [
"--all-tests",
"--timeout-scale-factor=10",
],
data = [
"//ledger/test-common:dar-files",
],
server = "//ledger/sandbox:sandbox-binary",
server_args = [
"--port 0",
"--static-time",
"--eager-package-loading",
"--sql-backend-jdbcurl jdbc:h2:mem:static_time",
],
server_files = [
"$(rootpaths //ledger/test-common:dar-files)",
],
tags = [
"dont-run-on-darwin",
],
) if not is_windows else None
client_server_test(
name = "conformance-test-wall-clock-postgres",
timeout = "long",
@ -279,3 +307,30 @@ client_server_test(
"dont-run-on-darwin",
],
) if not is_windows else None
client_server_test(
name = "conformance-test-wall-clock-h2database",
timeout = "long",
client = "//ledger/ledger-api-test-tool:ledger-api-test-tool",
client_args = [
"--all-tests",
"--exclude TimeIT",
"--timeout-scale-factor=10",
],
data = [
"//ledger/test-common:dar-files",
],
server = "//ledger/sandbox:sandbox-binary",
server_args = [
"--port 0",
"--wall-clock-time",
"--eager-package-loading",
"--sql-backend-jdbcurl jdbc:h2:mem:wall_clock_time",
],
server_files = [
"$(rootpaths //ledger/test-common:dar-files)",
],
tags = [
"dont-run-on-darwin",
],
) if not is_windows else None

View File

@ -0,0 +1 @@
368c73056ea4b2bbdc5704e2334ac4cb4c9e5fcae886f662719ceca2f1b02401
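
This one-line file is the SHA-256 digest accompanying the new H2 V1__Init.sql migration shown in the next diff; FlywayMigrationsSpec, further down, verifies that every migration file has a matching digest. A sketch of how such a digest can be computed on the JVM (equivalent to `shasum -a 256`; the file path is illustrative):

import java.nio.file.{Files, Paths}
import java.security.MessageDigest

// Read the migration file and print its SHA-256 as lowercase hex.
val bytes = Files.readAllBytes(Paths.get("V1__Init.sql")) // path illustrative
val hex   = MessageDigest.getInstance("SHA-256").digest(bytes).map(b => f"${b & 0xff}%02x").mkString
println(hex) // must equal the contents of the companion .sha256 file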

View File

@ -0,0 +1,241 @@
-- Copyright (c) 2019 The DAML Authors. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
-- PART 1: copied and adapted from postgres/V1__Init.sql
-- Stores the history of the ledger -- mostly transactions. This table
-- is immutable in the sense that rows can never be modified, only
-- added.
CREATE TABLE ledger_entries
(
-- Every entry is indexed by a monotonically growing integer. That is,
-- new rows can only have a ledger_offset which is greater than the
-- largest ledger_offset in ledger_entries. However, note that there
-- might be gaps in the series formed by all the ledger_offsets in the
-- table.
ledger_offset bigint primary key not null,
-- one of 'transaction', 'rejection', or 'checkpoint' -- also see
-- check_entry below. note that we _could_ store the different entries
-- in different tables, but as of now we deem it more convenient to have a
-- single table even if it imposes the constraint below, since this
-- table represents a single unified stream of events and partitioning
-- it across tables would be more inconvenient. We might revise this in
-- the future.
typ varchar not null,
-- see ledger API definition for more info on some of these fields
transaction_id varchar unique,
command_id varchar,
application_id varchar,
submitter varchar,
workflow_id varchar,
effective_at timestamp, -- with time zone
recorded_at timestamp not null, -- with time zone
-- The transaction is stored and encoded using the .proto definition in
-- `daml-lf/transaction/src/main/protobuf/com/digitalasset/daml/lf/transaction.proto`.
transaction bytea,
rejection_type varchar,
rejection_description varchar,
-- note that this is not supposed to be a complete check, for example we do not check
-- that fields that are not supposed to be present are indeed null.
constraint check_entry
check (
(typ = 'transaction' and transaction_id != null and command_id != null and application_id != null and
submitter != null and effective_at != null and transaction != null) or
(typ = 'rejection' and command_id != null and application_id != null and submitter != null and
rejection_type != null and rejection_description != null) or
(typ = 'checkpoint'))
);
-- This embodies the deduplication in the Ledger API.
CREATE UNIQUE INDEX idx_transactions_deduplication
ON ledger_entries (command_id, application_id);
CREATE TABLE disclosures (
transaction_id varchar not null,
event_id varchar not null,
party varchar not null,
foreign key (transaction_id) references ledger_entries (transaction_id)
);
-- Note that technically this information is all present in `ledger_entries`,
-- but we store it in this form since it would not be viable to traverse
-- the entries every time we need to gain information about a contract. It's essentially
-- a materialized view of the contracts state.
CREATE TABLE contracts (
id varchar primary key not null,
-- this is the transaction id that _originated_ the contract.
transaction_id varchar not null,
-- this is the workflow id of the transaction above. note that this is
-- a denormalization -- we could simply look it up in the transaction table.
-- we cache it here since we do not want to risk impacting performance
-- by looking it up in `ledger_entries`, however we should verify this
-- claim.
workflow_id varchar,
-- This tuple is currently included in `contract`, since we encode
-- the full value including all the identifiers. However we plan to
-- move to a more compact representation that would need a pointer to
-- the "top level" value type, and therefore we store the identifier
-- here separately.
package_id varchar not null,
-- using the QualifiedName#toString format
name varchar not null,
-- this is denormalized much like `transaction_id` -- see comment above.
create_offset bigint not null,
-- this on the other hand _cannot_ be easily found out by looking into
-- `ledger_entries` -- you'd have to traverse from `create_offset` which
-- would be prohibitively expensive.
archive_offset bigint,
-- the serialized contract value, using the definition in
-- `daml-lf/transaction/src/main/protobuf/com/digitalasset/daml/lf/value.proto`
-- and the encoder in `ContractSerializer.scala`.
contract bytea not null,
-- only present in contracts for templates that have a contract key definition.
-- encoded using the definition in
-- `daml-lf/transaction/src/main/protobuf/com/digitalasset/daml/lf/value.proto`.
key bytea,
foreign key (transaction_id) references ledger_entries (transaction_id),
foreign key (create_offset) references ledger_entries (ledger_offset),
foreign key (archive_offset) references ledger_entries (ledger_offset)
);
-- These two indices below could be a source of performance bottlenecks. Every additional index slows
-- down insertion. The contracts table will grow endlessly and the sole purpose of these indices is
-- to make ACS queries performant, while sacrificing insertion speed.
CREATE INDEX idx_contract_create_offset
ON contracts (create_offset);
CREATE INDEX idx_contract_archive_offset
ON contracts (archive_offset);
-- TODO what's the difference between this and `disclosures`? If we can rely on `event_id`
-- being the `contract_id`, isn't `disclosures` enough?
CREATE TABLE contract_witnesses (
contract_id varchar not null,
witness varchar not null,
foreign key (contract_id) references contracts (id)
);
CREATE UNIQUE INDEX contract_witnesses_idx
ON contract_witnesses (contract_id, witness);
CREATE TABLE contract_key_maintainers (
contract_id varchar not null,
maintainer varchar not null,
foreign key (contract_id) references contracts (id)
);
CREATE UNIQUE INDEX contract_key_maintainers_idx
ON contract_key_maintainers (contract_id, maintainer);
-- this table is meant to have a single row storing all the parameters we have
CREATE TABLE parameters (
-- the generated or configured id identifying the ledger
ledger_id varchar not null,
-- stores the head offset, meant to change with every new ledger entry
ledger_end bigint not null,
-- the external ledger offset that corresponds to the index ledger end (added in Postgres V6)
external_ledger_end varchar
);
-- table to store a mapping from (template_id, contract value) to contract_id
-- contract values are binary blobs of unbounded size, the table therefore only stores a hash of the value
-- and relies on the hash being collision-free
CREATE TABLE contract_keys (
package_id varchar not null,
-- using the QualifiedName#toString format
name varchar not null,
-- stable SHA256 of the protobuf serialized key value, produced using
-- `KeyHasher.scala`.
value_hash varchar not null,
contract_id varchar not null,
PRIMARY KEY (package_id, name, value_hash),
foreign key (contract_id) references contracts (id)
);
-- PART 2: copied and adapted from postgres/V2_0__Contract_divulgence.sql
---------------------------------------------------------------------------------------------------
-- V2: Contract divulgence
--
-- This schema version adds a table for tracking contract divulgence.
-- This is required for making sure contracts can only be fetched by parties that see the contract.
---------------------------------------------------------------------------------------------------
CREATE TABLE contract_divulgences (
contract_id varchar not null,
-- The party to which the given contract was divulged
party varchar not null,
-- The offset at which the contract was divulged to the given party
ledger_offset bigint not null,
-- The transaction ID at which the contract was divulged to the given party
transaction_id varchar not null,
foreign key (contract_id) references contracts (id),
foreign key (ledger_offset) references ledger_entries (ledger_offset),
foreign key (transaction_id) references ledger_entries (transaction_id),
CONSTRAINT contract_divulgences_idx UNIQUE(contract_id, party)
);
-- PART 3: n/a scala-migration only, not applicable to newly introduced database types
-- PART 4: adopted unmodified from postgres/V4_0__Add_parties.sql
---------------------------------------------------------------------------------------------------
-- V4: List of parties
--
-- This schema version adds a table for tracking known parties.
-- In the sandbox, parties are added implicitly when they are first mentioned in a transaction,
-- or explicitly through an API call.
---------------------------------------------------------------------------------------------------
CREATE TABLE parties (
-- The unique identifier of the party
party varchar primary key not null,
-- A human readable name of the party, might not be unique
display_name varchar,
-- True iff the party was added explicitly through an API call
explicit bool not null,
-- For implicitly added parties: the offset of the transaction that introduced the party
-- For explicitly added parties: the ledger end at the time when the party was added
ledger_offset bigint
);
-- PART 5: copied and adapted from postgres/V5__Add_packages.sql
---------------------------------------------------------------------------------------------------
-- V5: List of packages
--
-- This schema version adds a table for tracking DAML-LF packages.
-- Previously, packages were only stored in memory and needed to be specified through the CLI.
---------------------------------------------------------------------------------------------------
CREATE TABLE packages (
-- The unique identifier of the package (the hash of its content)
package_id varchar primary key not null,
-- Packages are uploaded as DAR files (i.e., in groups)
-- This field can be used to find out which packages were uploaded together
upload_id varchar not null,
-- A human readable description of the package source
source_description varchar,
-- The size of the archive payload (i.e., the serialized DAML-LF package), in bytes
size bigint not null,
-- The time when the package was added
known_since timestamp not null, -- with time zone
-- The ledger end at the time when the package was added
ledger_offset bigint not null,
-- The DAML-LF archive, serialized using the protobuf message `daml_lf.Archive`.
-- See also `daml-lf/archive/da/daml_lf.proto`.
package bytea not null
);
-- PART 6: postgres/V6__External_Ledger_Offset.sql not needed as parameters.external_ledger_end already in PART 1

View File

@ -21,7 +21,7 @@ import com.digitalasset.platform.sandbox.stores.ledger.{
import scala.concurrent.Future
object PostgresIndex {
object JdbcIndex {
def apply(
readService: ReadService,
ledgerId: LedgerId,
@ -30,7 +30,7 @@ object PostgresIndex {
implicit mat: Materializer,
mm: MetricsManager): Future[IndexService with AutoCloseable] =
Ledger
.postgresReadOnly(jdbcUrl, ledgerId)
.jdbcBackedReadOnly(jdbcUrl, ledgerId)
.map { ledger =>
val contractStore = new SandboxContractStore(ledger)
new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger), contractStore, participantId) {

View File

@ -30,7 +30,7 @@ import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlLedger.{
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.{
LedgerDao,
PersistenceEntry,
PostgresLedgerDao
JdbcLedgerDao
}
import com.digitalasset.platform.sandbox.stores.ledger.sql.serialisation.{
ContractSerializer,
@ -47,11 +47,11 @@ import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Failure, Success}
object PostgresIndexer {
private val logger = LoggerFactory.getLogger(classOf[PostgresIndexer])
object JdbcIndexer {
private val logger = LoggerFactory.getLogger(classOf[JdbcIndexer])
private[index] val asyncTolerance = 30.seconds
def create(readService: ReadService, jdbcUrl: String): Future[PostgresIndexer] = {
def create(readService: ReadService, jdbcUrl: String): Future[JdbcIndexer] = {
val actorSystem = ActorSystem("postgres-indexer")
val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)
val metricsManager = MetricsManager(false)
@ -70,7 +70,7 @@ object PostgresIndexer {
ledgerEnd <- ledgerDao.lookupLedgerEnd()
externalOffset <- ledgerDao.lookupExternalLedgerEnd()
} yield {
new PostgresIndexer(ledgerEnd, externalOffset, ledgerDao)(materializer) {
new JdbcIndexer(ledgerEnd, externalOffset, ledgerDao)(materializer) {
override def close(): Unit = {
super.close()
materializer.shutdown()
@ -82,14 +82,17 @@ object PostgresIndexer {
}
private def initializeDao(jdbcUrl: String, mm: MetricsManager) = {
val dbDispatcher = DbDispatcher(jdbcUrl, noOfShortLivedConnections, noOfStreamingConnections)
val dbType = JdbcLedgerDao.jdbcType(jdbcUrl)
val dbDispatcher =
DbDispatcher(jdbcUrl, dbType, noOfShortLivedConnections, noOfStreamingConnections)
val ledgerDao = LedgerDao.metered(
PostgresLedgerDao(
JdbcLedgerDao(
dbDispatcher,
ContractSerializer,
TransactionSerializer,
ValueSerializer,
KeyHasher))(mm)
KeyHasher,
dbType))(mm)
ledgerDao
}
@ -123,7 +126,7 @@ object PostgresIndexer {
* @param beginAfterExternalOffset The last offset received from the read service.
* This offset has inclusive semantics,
*/
class PostgresIndexer private (
class JdbcIndexer private (
initialInternalOffset: Long,
beginAfterExternalOffset: Option[LedgerString],
ledgerDao: LedgerDao)(implicit mat: Materializer)
@ -175,7 +178,7 @@ class PostgresIndexer private (
case PublicPackageUploaded(archive, sourceDescription, _, _) =>
val uploadId = UUID.randomUUID().toString
val uploadInstant = Instant.now()
val uploadInstant = Instant.now() // TODO: use PublicPackageUploaded.recordTime for multi-ledgers (#2635)
val packages: List[(DamlLf.Archive, v2.PackageDetails)] = List(
archive -> v2.PackageDetails(
size = archive.getPayload.size.toLong,

View File

@ -10,7 +10,6 @@ import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.daml.ledger.participant.state.v1.{ParticipantId, ReadService, WriteService}
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.engine.Engine
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
@ -140,7 +139,7 @@ class StandaloneIndexServer(
val initF = for {
cond <- readService.getLedgerInitialConditions().runWith(Sink.head)
indexService <- PostgresIndex(
indexService <- JdbcIndex(
readService,
domain.LedgerId(cond.ledgerId),
participantId,
@ -162,7 +161,7 @@ class StandaloneIndexServer(
writeService,
indexService,
StandaloneIndexServer.engine,
TimeProvider.UTC,
config.timeProvider,
timeModel,
SandboxConfig.defaultCommandConfig,
None)(am, esf),

View File

@ -17,14 +17,14 @@ object StandaloneIndexerServer {
private val logger = LoggerFactory.getLogger(this.getClass)
def apply(readService: ReadService, jdbcUrl: String): AutoCloseable = {
val server = PostgresIndexer.create(readService, jdbcUrl)
val server = JdbcIndexer.create(readService, jdbcUrl)
val indexHandleF = server.flatMap(
_.subscribe(
readService,
t => logger.error("error while processing state updates", t),
() => logger.info("successfully finished processing state updates")))(DEC)
val indexFeedHandle = Await.result(indexHandleF, PostgresIndexer.asyncTolerance)
val indexFeedHandle = Await.result(indexHandleF, JdbcIndexer.asyncTolerance)
logger.info("Started Indexer Server")
val closed = new AtomicBoolean(false)
@ -32,7 +32,7 @@ object StandaloneIndexerServer {
new AutoCloseable {
override def close(): Unit = {
if (closed.compareAndSet(false, true)) {
val _ = Await.result(indexFeedHandle.stop(), PostgresIndexer.asyncTolerance)
val _ = Await.result(indexFeedHandle.stop(), JdbcIndexer.asyncTolerance)
}
}
}

View File

@ -5,6 +5,7 @@ package com.digitalasset.platform.index.config
import java.io.File
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.ledger.api.tls.TlsConfiguration
final case class Config(
@ -12,6 +13,7 @@ final case class Config(
portFile: Option[File],
archiveFiles: List[File],
maxInboundMessageSize: Int,
timeProvider: TimeProvider, // enables use of non-wall-clock time in tests
jdbcUrl: String,
tlsConfig: Option[TlsConfiguration]
)
@ -19,5 +21,5 @@ final case class Config(
object Config {
val DefaultMaxInboundMessageSize = 4194304
def default: Config =
new Config(0, None, List.empty, 4194304, "", None)
new Config(0, None, List.empty, 4194304, TimeProvider.UTC, "", None)
}
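
Because Config is a case class, tests can supply the new timeProvider field without extra constructors. A minimal sketch using only the fields shown above (the H2 URL is an illustrative value, not one used by the commit):

import com.digitalasset.api.util.TimeProvider
import com.digitalasset.platform.index.config.Config

// Field names come from the Config definition above; values are illustrative.
val testConfig = Config.default.copy(
  timeProvider = TimeProvider.UTC,         // or a fixed-time provider in tests
  jdbcUrl = "jdbc:h2:mem:config_example"   // hypothetical in-memory H2 URL
)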

View File

@ -74,7 +74,7 @@ object SandboxIndexAndWriteService {
implicit mat: Materializer,
mm: MetricsManager): Future[IndexAndWriteService] =
Ledger
.postgres(
.jdbcBacked(
jdbcUrl,
ledgerId,
timeProvider,
@ -353,7 +353,8 @@ abstract class LedgerBackedIndexService(
Checkpoint(
domain.LedgerOffset.Absolute(Ref.LedgerString.assertFromString(offset.toString)),
c.recordedAt)
case (offset, r: LedgerEntry.Rejection) =>
case (offset, r: LedgerEntry.Rejection)
if r.commandId.nonEmpty && r.applicationId.contains(applicationId.unwrap) =>
CommandRejected(
domain.LedgerOffset.Absolute(Ref.LedgerString.assertFromString(offset.toString)),
r.recordTime,

View File

@ -105,7 +105,7 @@ object Ledger {
new InMemoryLedger(ledgerId, timeProvider, acs, packages, ledgerEntries)
/**
* Creates a Postgres backed ledger
* Creates a JDBC backed ledger
*
* @param jdbcUrl the jdbc url string containing the username and password as well
* @param ledgerId the id to be used for the ledger
@ -116,7 +116,7 @@ object Ledger {
* @param startMode whether the ledger should be reset, or continued where it was
* @return a Postgres backed Ledger
*/
def postgres(
def jdbcBacked(
jdbcUrl: String,
ledgerId: LedgerId,
timeProvider: TimeProvider,
@ -137,14 +137,14 @@ object Ledger {
startMode)
/**
* Creates a Postgres backed read only ledger
* Creates a JDBC backed read only ledger
*
* @param jdbcUrl the jdbc url string containing the username and password as well
* @param ledgerId the id to be used for the ledger
* @param timeProvider the provider of time
* @return a Postgres backed Ledger
* @return a jdbc backed Ledger
*/
def postgresReadOnly(
def jdbcBackedReadOnly(
jdbcUrl: String,
ledgerId: LedgerId,
)(implicit mat: Materializer, mm: MetricsManager): Future[ReadOnlyLedger] =

View File

@ -24,7 +24,7 @@ import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.{
LedgerDao,
LedgerReadDao,
PostgresLedgerDao
JdbcLedgerDao
}
import com.digitalasset.platform.sandbox.stores.ledger.sql.serialisation.{
ContractSerializer,
@ -52,14 +52,17 @@ object ReadOnlySqlLedger {
mm: MetricsManager): Future[ReadOnlyLedger] = {
implicit val ec: ExecutionContext = DEC
val dbDispatcher = DbDispatcher(jdbcUrl, noOfShortLivedConnections, noOfStreamingConnections)
val dbType = JdbcLedgerDao.jdbcType(jdbcUrl)
val dbDispatcher =
DbDispatcher(jdbcUrl, dbType, noOfShortLivedConnections, noOfStreamingConnections)
val ledgerReadDao = LedgerDao.meteredRead(
PostgresLedgerDao(
JdbcLedgerDao(
dbDispatcher,
ContractSerializer,
TransactionSerializer,
ValueSerializer,
KeyHasher))
KeyHasher,
dbType))
ReadOnlySqlLedgerFactory(ledgerReadDao).createReadOnlySqlLedger(ledgerId)
}

View File

@ -84,14 +84,17 @@ object SqlLedger {
mm: MetricsManager): Future[Ledger] = {
implicit val ec: ExecutionContext = DEC
val dbDispatcher = DbDispatcher(jdbcUrl, noOfShortLivedConnections, noOfStreamingConnections)
val dbType = JdbcLedgerDao.jdbcType(jdbcUrl)
val dbDispatcher =
DbDispatcher(jdbcUrl, dbType, noOfShortLivedConnections, noOfStreamingConnections)
val ledgerDao = LedgerDao.metered(
PostgresLedgerDao(
JdbcLedgerDao(
dbDispatcher,
ContractSerializer,
TransactionSerializer,
ValueSerializer,
KeyHasher))
KeyHasher,
dbType))
val sqlLedgerFactory = SqlLedgerFactory(ledgerDao)
@ -102,7 +105,8 @@ object SqlLedger {
acs,
packages,
initialLedgerEntries,
queueDepth)
queueDepth,
dbType.supportsParallelLedgerAppend)
}
}
@ -112,7 +116,8 @@ private class SqlLedger(
ledgerDao: LedgerDao,
timeProvider: TimeProvider,
packages: InMemoryPackageStore,
queueDepth: Int)(implicit mat: Materializer)
queueDepth: Int,
parallelLedgerAppend: Boolean)(implicit mat: Materializer)
extends Ledger {
import SqlLedger._
@ -168,12 +173,13 @@ private class SqlLedger(
SourceShape(merge.out)
})
// We process the requests in batches when under pressure (see semantics of `batch`). Note
// By default we process the requests in batches when under pressure (see semantics of `batch`). Note
// that this is safe on the read end because the readers rely on the dispatchers to know the
// ledger end, and not the database itself. This means that they will not start reading from the new
// ledger end until we tell them so, which we do when _all_ the entries have been committed.
val maxBatchSize = if (parallelLedgerAppend) noOfShortLivedConnections * 2L else 1L
mergedSources
.batch(noOfShortLivedConnections * 2L, e => Queue(e))((batch, e) => batch :+ e)
.batch(maxBatchSize, e => Queue(e))((batch, e) => batch :+ e)
.mapAsync(1) { queue =>
val startOffset = headRef // we can only do this because there is no parallelism here!
//shooting the SQL queries in parallel
@ -380,6 +386,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
* used if starting from a fresh database.
* @param queueDepth the depth of the buffer for persisting entries. When it gets full, the system will signal back-pressure
* upstream
* @param parallelLedgerAppend whether to append to the ledger in parallelized batches
* @return a compliant Ledger implementation
*/
def createSqlLedger(
@ -389,7 +396,9 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
acs: InMemoryActiveContracts,
packages: InMemoryPackageStore,
initialLedgerEntries: ImmArray[LedgerEntryOrBump],
queueDepth: Int)(implicit mat: Materializer): Future[SqlLedger] = {
queueDepth: Int,
parallelLedgerAppend: Boolean
)(implicit mat: Materializer): Future[SqlLedger] = {
@SuppressWarnings(Array("org.wartremover.warts.ExplicitImplicitTypes"))
implicit val ec = DEC
@ -406,7 +415,15 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
for {
ledgerId <- init()
ledgerEnd <- ledgerDao.lookupLedgerEnd()
} yield new SqlLedger(ledgerId, ledgerEnd, ledgerDao, timeProvider, packages, queueDepth)
} yield
new SqlLedger(
ledgerId,
ledgerEnd,
ledgerDao,
timeProvider,
packages,
queueDepth,
parallelLedgerAppend)
}
private def reset(): Future[Unit] =

View File

@ -27,6 +27,7 @@ trait JdbcConnectionProvider extends AutoCloseable {
class HikariJdbcConnectionProvider(
jdbcUrl: String,
dbType: JdbcLedgerDao.DbType,
noOfShortLivedConnections: Int,
noOfStreamingConnections: Int)
extends JdbcConnectionProvider {
@ -60,7 +61,7 @@ class HikariJdbcConnectionProvider(
new HikariDataSource(config)
}
private val flyway = FlywayMigrations(shortLivedDataSource)
private val flyway = FlywayMigrations(shortLivedDataSource, dbType)
flyway.migrate()
override def runSQL[T](block: Connection => T): T = {
@ -95,7 +96,12 @@ class HikariJdbcConnectionProvider(
object HikariJdbcConnectionProvider {
def apply(
jdbcUrl: String,
dbType: JdbcLedgerDao.DbType,
noOfShortLivedConnections: Int,
noOfStreamingConnections: Int): JdbcConnectionProvider =
new HikariJdbcConnectionProvider(jdbcUrl, noOfShortLivedConnections, noOfStreamingConnections)
new HikariJdbcConnectionProvider(
jdbcUrl,
dbType,
noOfShortLivedConnections,
noOfStreamingConnections)
}

View File

@ -45,12 +45,13 @@ import scala.concurrent.Future
import scala.util.Try
import scala.util.control.NonFatal
private class PostgresLedgerDao(
private class JdbcLedgerDao(
dbDispatcher: DbDispatcher,
contractSerializer: ContractSerializer,
transactionSerializer: TransactionSerializer,
valueSerializer: ValueSerializer,
keyHasher: KeyHasher)
keyHasher: KeyHasher,
dbType: JdbcLedgerDao.DbType)
extends LedgerDao {
private val logger = LoggerFactory.getLogger(getClass)
@ -177,7 +178,7 @@ private class PostgresLedgerDao(
private def storeContracts(offset: Long, contracts: immutable.Seq[Contract])(
implicit connection: Connection): Unit = {
// A ACS contract contaixns several collections (e.g., witnesses or divulgences).
// An ACS contract contains several collections (e.g., witnesses or divulgences).
// The contract is therefore stored in several SQL tables.
// Part 1: insert the contract data into the 'contracts' table
@ -253,7 +254,7 @@ private class PostgresLedgerDao(
if (!namedDivulgenceParams.isEmpty) {
executeBatchSql(
SQL_BATCH_INSERT_DIVULGENCES_FROM_TRANSACTION_ID,
dbType.SQL_BATCH_INSERT_DIVULGENCES_FROM_TRANSACTION_ID,
namedDivulgenceParams
)
}
@ -274,7 +275,7 @@ private class PostgresLedgerDao(
if (!namedDivulgenceParams.isEmpty) {
executeBatchSql(
SQL_BATCH_INSERT_DIVULGENCES,
dbType.SQL_BATCH_INSERT_DIVULGENCES,
namedDivulgenceParams
)
}
@ -323,36 +324,10 @@ private class PostgresLedgerDao(
private val SQL_BATCH_INSERT_DISCLOSURES =
"insert into disclosures(transaction_id, event_id, party) values({transaction_id}, {event_id}, {party})"
// Note: the SQL backend may receive divulgence information for the same (contract, party) tuple
// more than once through BlindingInfo.globalImplicitDisclosure.
// The ledger offsets for the same (contract, party) tuple should always be increasing, and the database
// stores the offset at which the contract was first disclosed.
// We therefore don't need to update anything if there is already some data for the given (contract, party) tuple.
private val SQL_BATCH_INSERT_DIVULGENCES =
"""insert into contract_divulgences(contract_id, party, ledger_offset, transaction_id)
|values({contract_id}, {party}, {ledger_offset}, {transaction_id})
|on conflict on constraint contract_divulgences_idx
|do nothing""".stripMargin
private val SQL_BATCH_INSERT_DIVULGENCES_FROM_TRANSACTION_ID =
"""insert into contract_divulgences(contract_id, party, ledger_offset, transaction_id)
|select {contract_id}, {party}, ledger_offset, {transaction_id}
|from ledger_entries
|where transaction_id={transaction_id}
|on conflict on constraint contract_divulgences_idx
|do nothing""".stripMargin
private val SQL_INSERT_CHECKPOINT =
SQL(
"insert into ledger_entries(typ, ledger_offset, recorded_at) values('checkpoint', {ledger_offset}, {recorded_at})")
private val SQL_IMPLICITLY_INSERT_PARTIES =
"""insert into parties(party, explicit, ledger_offset)
|values({name}, {explicit}, {ledger_offset})
|on conflict (party)
|do nothing
|""".stripMargin
/**
* Updates the active contract set from the given DAML transaction.
* Note: This involves checking the validity of the given DAML transaction.
@ -405,7 +380,7 @@ private class PostgresLedgerDao(
"explicit" -> false
))
if (partyParams.nonEmpty) {
executeBatchSql(SQL_IMPLICITLY_INSERT_PARTIES, partyParams)
executeBatchSql(dbType.SQL_IMPLICITLY_INSERT_PARTIES, partyParams)
}
this
}
@ -429,7 +404,7 @@ private class PostgresLedgerDao(
// Do we need here the equivalent to 'contracts.intersectWith(global)', used in the in-memory
// implementation of implicitlyDisclose?
if (divulgenceParams.nonEmpty) {
executeBatchSql(SQL_BATCH_INSERT_DIVULGENCES, divulgenceParams)
executeBatchSql(dbType.SQL_BATCH_INSERT_DIVULGENCES, divulgenceParams)
}
this
}
@ -564,7 +539,7 @@ private class PostgresLedgerDao(
}
} getOrElse Ok
}.recover {
case NonFatal(e) if (e.getMessage.contains("duplicate key")) =>
case NonFatal(e) if e.getMessage.contains(dbType.DUPLICATE_KEY_ERROR) =>
logger.warn(
"Ignoring duplicate submission for applicationId {}, commandId {}",
tx.applicationId: Any,
@ -1002,7 +977,7 @@ private class PostgresLedgerDao(
.execute()
PersistenceResponse.Ok
}.recover {
case NonFatal(e) if e.getMessage.contains("duplicate key") =>
case NonFatal(e) if e.getMessage.contains(dbType.DUPLICATE_KEY_ERROR) =>
logger.warn("Party with ID {} already exists", party)
conn.rollback()
PersistenceResponse.Duplicate
@ -1010,13 +985,6 @@ private class PostgresLedgerDao(
}
}
private val SQL_INSERT_PACKAGE =
"""insert into packages(package_id, upload_id, source_description, size, known_since, ledger_offset, package)
|select {package_id}, {upload_id}, {source_description}, {size}, {known_since}, ledger_end, {package}
|from parameters
|on conflict (package_id) do nothing
|""".stripMargin
private val SQL_SELECT_PACKAGES =
SQL("""select package_id, source_description, known_since, size
|from packages
@ -1088,7 +1056,7 @@ private class PostgresLedgerDao(
"package" -> p._1.toByteArray
)
)
val updated = executeBatchSql(SQL_INSERT_PACKAGE, params).map(math.max(0, _)).sum
val updated = executeBatchSql(dbType.SQL_INSERT_PACKAGE, params).map(math.max(0, _)).sum
val duplicates = packages.length - updated
Map(
@ -1127,17 +1095,112 @@ private class PostgresLedgerDao(
}
object PostgresLedgerDao {
object JdbcLedgerDao {
def apply(
dbDispatcher: DbDispatcher,
contractSerializer: ContractSerializer,
transactionSerializer: TransactionSerializer,
valueSerializer: ValueSerializer,
keyHasher: KeyHasher): LedgerDao =
new PostgresLedgerDao(
keyHasher: KeyHasher,
dbType: JdbcLedgerDao.DbType): LedgerDao =
new JdbcLedgerDao(
dbDispatcher,
contractSerializer,
transactionSerializer,
valueSerializer,
keyHasher)
keyHasher,
dbType)
sealed trait DbType {
def name: String
val supportsParallelLedgerAppend: Boolean = true
// SQL statements using the proprietary Postgres on conflict .. do nothing clause
protected[JdbcLedgerDao] def SQL_INSERT_PACKAGE: String
protected[JdbcLedgerDao] def SQL_IMPLICITLY_INSERT_PARTIES: String
// Note: the SQL backend may receive divulgence information for the same (contract, party) tuple
// more than once through BlindingInfo.globalImplicitDisclosure.
// The ledger offsets for the same (contract, party) tuple should always be increasing, and the database
// stores the offset at which the contract was first disclosed.
// We therefore don't need to update anything if there is already some data for the given (contract, party) tuple.
protected[JdbcLedgerDao] def SQL_BATCH_INSERT_DIVULGENCES: String
protected[JdbcLedgerDao] def SQL_BATCH_INSERT_DIVULGENCES_FROM_TRANSACTION_ID: String
protected[JdbcLedgerDao] def DUPLICATE_KEY_ERROR
: String // TODO: Avoid brittleness of error message checks
}
object Postgres extends DbType {
override val name: String = "postgres"
override protected[JdbcLedgerDao] val SQL_INSERT_PACKAGE: String =
"""insert into packages(package_id, upload_id, source_description, size, known_since, ledger_offset, package)
|select {package_id}, {upload_id}, {source_description}, {size}, {known_since}, ledger_end, {package}
|from parameters
|on conflict (package_id) do nothing""".stripMargin
override protected[JdbcLedgerDao] val SQL_IMPLICITLY_INSERT_PARTIES: String =
"""insert into parties(party, explicit, ledger_offset)
|values({name}, {explicit}, {ledger_offset})
|on conflict (party) do nothing""".stripMargin
override protected[JdbcLedgerDao] val SQL_BATCH_INSERT_DIVULGENCES: String =
"""insert into contract_divulgences(contract_id, party, ledger_offset, transaction_id)
|values({contract_id}, {party}, {ledger_offset}, {transaction_id})
|on conflict on constraint contract_divulgences_idx do nothing""".stripMargin
override protected[JdbcLedgerDao] val SQL_BATCH_INSERT_DIVULGENCES_FROM_TRANSACTION_ID: String =
"""insert into contract_divulgences(contract_id, party, ledger_offset, transaction_id)
|select {contract_id}, {party}, ledger_offset, {transaction_id}
|from ledger_entries
|where transaction_id={transaction_id}
|on conflict on constraint contract_divulgences_idx do nothing""".stripMargin
override protected[JdbcLedgerDao] val DUPLICATE_KEY_ERROR: String = "duplicate key"
}
object H2Database extends DbType {
override val name: String = "h2database"
// H2 does not support concurrent, conditional updates to the ledger_end at read committed isolation
// level: "It is possible that a transaction from one connection overtakes a transaction from a different
// connection. Depending on the operations, this might result in different results, for example when conditionally
// incrementing a value in a row." - from http://www.h2database.com/html/advanced.html
override val supportsParallelLedgerAppend: Boolean = false
override protected[JdbcLedgerDao] val SQL_INSERT_PACKAGE: String =
"""merge into packages using dual on package_id = {package_id}
|when not matched then insert (package_id, upload_id, source_description, size, known_since, ledger_offset, package)
|select {package_id}, {upload_id}, {source_description}, {size}, {known_since}, ledger_end, {package}
|from parameters""".stripMargin
override protected[JdbcLedgerDao] val SQL_IMPLICITLY_INSERT_PARTIES: String =
"""merge into parties using dual on party = {name}
|when not matched then insert (party, explicit, ledger_offset) values ({name}, {explicit}, {ledger_offset})""".stripMargin
override protected[JdbcLedgerDao] val SQL_BATCH_INSERT_DIVULGENCES: String =
"""merge into contract_divulgences using dual on contract_id = {contract_id} and party = {party}
|when not matched then insert (contract_id, party, ledger_offset, transaction_id)
|values ({contract_id}, {party}, {ledger_offset}, {transaction_id})""".stripMargin
override protected[JdbcLedgerDao] val SQL_BATCH_INSERT_DIVULGENCES_FROM_TRANSACTION_ID: String =
"""merge into contract_divulgences using dual on contract_id = {contract_id} and party = {party}
|when not matched then insert (contract_id, party, ledger_offset, transaction_id)
|select {contract_id}, {party}, ledger_offset, {transaction_id}
|from ledger_entries
|where transaction_id={transaction_id}""".stripMargin
override protected[JdbcLedgerDao] val DUPLICATE_KEY_ERROR: String =
"Unique index or primary key violation"
}
def jdbcType(jdbcUrl: String): DbType = jdbcUrl match {
case h2 if h2.startsWith("jdbc:h2:") => H2Database
case _ => Postgres
}
}
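
For orientation, a minimal sketch of how the new DbType dispatch is meant to be used, relying only on names introduced above (the URLs are illustrative):

import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.JdbcLedgerDao

// The database flavour is derived from the JDBC URL prefix.
val h2Type = JdbcLedgerDao.jdbcType("jdbc:h2:mem:static_time")             // JdbcLedgerDao.H2Database
val pgType = JdbcLedgerDao.jdbcType("jdbc:postgresql://localhost/sandbox") // JdbcLedgerDao.Postgres

// The flavour then selects the Flyway migration folder and the append strategy.
val flywayLocation = "classpath:db/migration/" + h2Type.name // classpath:db/migration/h2database
val parallelAppend = h2Type.supportsParallelLedgerAppend     // false: H2 appends are serialized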

View File

@ -3,20 +3,21 @@
package com.digitalasset.platform.sandbox.stores.ledger.sql.migration
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.JdbcLedgerDao
import javax.sql.DataSource
import org.flywaydb.core.Flyway
import org.slf4j.LoggerFactory
import scala.util.control.NonFatal
class FlywayMigrations(ds: DataSource) {
class FlywayMigrations(ds: DataSource, dbType: JdbcLedgerDao.DbType) {
import FlywayMigrations._
private val logger = LoggerFactory.getLogger(getClass)
def migrate(): Unit = {
try {
val flyway = configurationBase.dataSource(ds).load()
val flyway = configurationBase(dbType).dataSource(ds).load()
logger.info(s"running Flyway migration..")
val stepsTaken = flyway.migrate()
logger.info(s"Flyway schema migration finished successfully applying ${stepsTaken} steps.")
@ -33,7 +34,9 @@ class FlywayMigrations(ds: DataSource) {
object FlywayMigrations {
val configurationBase = Flyway.configure()
def configurationBase(dbType: JdbcLedgerDao.DbType) =
Flyway.configure.locations("classpath:db/migration/" + dbType.name)
def apply(ds: DataSource): FlywayMigrations = new FlywayMigrations(ds)
def apply(ds: DataSource, dbType: JdbcLedgerDao.DbType): FlywayMigrations =
new FlywayMigrations(ds, dbType)
}

View File

@ -1,8 +1,9 @@
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
// Note: package name must correspond exactly to the flyway 'locations' setting, which defaults to 'db.migration'
package db.migration
// Note: package name must correspond exactly to the flyway 'locations' setting, which defaults to
// 'db.migration.postgres' for postgres migrations
package db.migration.postgres
import java.io.InputStream
import java.sql.Connection

View File

@ -1,8 +1,9 @@
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
// Note: package name must correspond exactly to the flyway 'locations' setting, which defaults to 'db.migration'
package db.migration
// Note: package name must correspond exactly to the flyway 'locations' setting, which defaults to
// 'db.migration.postgres' for postgres migrations
package db.migration.postgres
import java.sql.{Connection, ResultSet}

View File

@ -1,8 +1,9 @@
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
// Note: package name must correspond exactly to the flyway 'locations' setting, which defaults to 'db.migration'
package db.migration
// Note: package name must correspond exactly to the flyway 'locations' setting, which defaults to
// 'db.migration.postgres' for postgres migrations
package db.migration.postgres
import java.sql.{Connection, ResultSet}

View File

@ -9,7 +9,10 @@ import java.util.concurrent.Executors
import akka.stream.scaladsl.Source
import akka.{Done, NotUsed}
import com.digitalasset.platform.common.util.DirectExecutionContext
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.HikariJdbcConnectionProvider
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.{
HikariJdbcConnectionProvider,
JdbcLedgerDao
}
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.slf4j.LoggerFactory
@ -19,9 +22,9 @@ trait DbDispatcher extends AutoCloseable {
/** Runs an SQL statement in a dedicated Executor. The whole block will be run in a single database transaction.
*
* The isolation level by default is the one defined in the JDBC driver, it can be however overriden per query on
* The isolation level by default is the one defined in the JDBC driver, it can be however overridden per query on
* the Connection. See further details at: https://docs.oracle.com/cd/E19830-01/819-4721/beamv/index.html
* */
*/
def executeSql[T](sql: Connection => T): Future[T]
/**
@ -42,13 +45,18 @@ trait DbDispatcher extends AutoCloseable {
private class DbDispatcherImpl(
jdbcUrl: String,
dbType: JdbcLedgerDao.DbType,
val noOfShortLivedConnections: Int,
noOfStreamingConnections: Int)
extends DbDispatcher {
private val logger = LoggerFactory.getLogger(getClass)
private val connectionProvider =
HikariJdbcConnectionProvider(jdbcUrl, noOfShortLivedConnections, noOfStreamingConnections)
HikariJdbcConnectionProvider(
jdbcUrl,
dbType,
noOfShortLivedConnections,
noOfStreamingConnections)
private val sqlExecutor = SqlExecutor(noOfShortLivedConnections)
private val connectionGettingThreadPool = ExecutionContext.fromExecutorService(
@ -91,12 +99,14 @@ object DbDispatcher {
* * in sync with the number of JDBC connections in the pool.
*
* @param jdbcUrl the jdbc url containing the database name, user name and password
* @param dbType the jdbc database type, needed for db migrations
* @param noOfShortLivedConnections the number of connections to be pre-allocated for regular SQL queries
* @param noOfStreamingConnections the max number of connections to be used for long, streaming queries
*/
def apply(
jdbcUrl: String,
dbType: JdbcLedgerDao.DbType,
noOfShortLivedConnections: Int,
noOfStreamingConnections: Int): DbDispatcher =
new DbDispatcherImpl(jdbcUrl, noOfShortLivedConnections, noOfStreamingConnections)
new DbDispatcherImpl(jdbcUrl, dbType, noOfShortLivedConnections, noOfStreamingConnections)
}

View File

@ -64,7 +64,7 @@ object LedgerResource {
ledger = LedgerResource.resource(
() =>
Ledger.postgres(
Ledger.jdbcBacked(
postgres.value.jdbcUrl,
ledgerId,
timeProvider,

View File

@ -3,12 +3,16 @@
package com.digitalasset.platform.sandbox.persistence
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.HikariJdbcConnectionProvider
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.{
HikariJdbcConnectionProvider,
JdbcLedgerDao
}
import org.scalatest._
class PostgresIT extends WordSpec with Matchers with PostgresAroundAll {
private lazy val connectionProvider = HikariJdbcConnectionProvider(postgresFixture.jdbcUrl, 4, 4)
private lazy val connectionProvider =
HikariJdbcConnectionProvider(postgresFixture.jdbcUrl, JdbcLedgerDao.Postgres, 4, 4)
"Postgres" when {

View File

@ -70,13 +70,14 @@ class PostgresDaoSpec
override def beforeAll(): Unit = {
super.beforeAll()
dbDispatcher = DbDispatcher(postgresFixture.jdbcUrl, 4, 4)
ledgerDao = PostgresLedgerDao(
dbDispatcher = DbDispatcher(postgresFixture.jdbcUrl, JdbcLedgerDao.Postgres, 4, 4)
ledgerDao = JdbcLedgerDao(
dbDispatcher,
ContractSerializer,
TransactionSerializer,
ValueSerializer,
KeyHasher)
KeyHasher,
JdbcLedgerDao.Postgres)
Await.result(ledgerDao.initializeLedger(LedgerId("test-ledger"), 0), 10.seconds)
}

View File

@ -6,6 +6,7 @@ package com.digitalasset.platform.sandbox.stores.ledger.sql.migration
import java.math.BigInteger
import java.security.MessageDigest
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.JdbcLedgerDao
import com.digitalasset.platform.sandbox.stores.ledger.sql.migration.FlywayMigrations.configurationBase
import org.flywaydb.core.internal.resource.LoadableResource
import org.flywaydb.core.internal.scanner.Scanner
@ -19,32 +20,41 @@ import scala.collection.JavaConverters._
class FlywayMigrationsSpec extends WordSpec with Matchers {
private val digester = MessageDigest.getInstance("SHA-256")
private val resourceScanner = new Scanner(
configurationBase.getLocations.toList.asJava,
getClass.getClassLoader,
configurationBase.getEncoding
)
"Flyway migration files" should {
"always have a valid SHA-256 digest file accompanied" in {
resourceScanner
.getResources("", ".sql")
.asScala
.map { res =>
val fileName = res.getFilename
val expectedDigest = getExpectedDigest(fileName, fileName.dropRight(4) + ".sha256")
val currentDigest = getCurrentDigest(res)
assert(
currentDigest == expectedDigest,
s"Digest of migration file $fileName has changed! It is NOT allowed to change neither existing sql migrations files nor their digests!"
)
}
}
private def scannerOfDbType(dbType: JdbcLedgerDao.DbType) = {
val config = configurationBase(dbType)
new Scanner(config.getLocations.toList.asJava, getClass.getClassLoader, config.getEncoding)
}
private def getExpectedDigest(sourceFile: String, digestFile: String) =
"Postgres flyway migration files" should {
"always have a valid SHA-256 digest file accompanied" in assertFlywayMigrationFileHashes(
JdbcLedgerDao.Postgres)
}
"H2 database flyway migration files" should {
"always have a valid SHA-256 digest file accompanied" in assertFlywayMigrationFileHashes(
JdbcLedgerDao.H2Database)
}
private def assertFlywayMigrationFileHashes(dbType: JdbcLedgerDao.DbType) = {
val resourceScanner = scannerOfDbType(dbType)
resourceScanner
.getResources("", ".sql")
.asScala
.map { res =>
val fileName = res.getFilename
val expectedDigest =
getExpectedDigest(fileName, fileName.dropRight(4) + ".sha256", resourceScanner)
val currentDigest = getCurrentDigest(res)
assert(
currentDigest == expectedDigest,
s"Digest of migration file $fileName has changed! It is NOT allowed to change neither existing sql migrations files nor their digests!"
)
}
}
private def getExpectedDigest(sourceFile: String, digestFile: String, resourceScanner: Scanner) =
new String(Option(resourceScanner.getResource(digestFile))
.getOrElse(sys.error(
s"Missing sha-256 file $digestFile! Are you introducing a new Flyway migration step? You need to create a sha-256 digest file by running this under the db/migration folder: shasum -a 256 $sourceFile | awk '{print $$1}' > $digestFile"))

View File

@ -14,3 +14,4 @@ HEAD — ongoing
* [Dev Tooling] `daml-sdk-head` now installs current-workdir versions of all the published JARs (as version `100.0.0`) to the local Maven repo (`~/.m2`), to allow local testing of unreleased versions of the SDK against JVM-based applications. (Disable with `--skip-jars`)
+ [DAML Compiler] Enable the language extension ``FlexibleContexts`` by default.
+ [DAML Compiler] **BREAKING CHANGE** Enable the language extension ``MonoLocalBinds`` by default. ``let`` and ``where`` bindings introducing polymorphic functions that are used at different types now need an explicit type annotation. Without the type annotation the type of the first use site will be inferred and use sites at different types will fail with a type mismatch error.
+ [Ledger] H2 Database support in the Ledger API Server.