ledger-on-sql: Use tagged execution contexts and data sources in Database. (#7525)

* metrics: Support tagged Futures when timing.

* ledger-on-sql: Use tagged execution contexts in `Database`.

We have to deal with multiple execution contexts in `Database`, and it is
easy to grab the wrong one. Tagging each context with a phantom type makes
it possible to pass them implicitly while still keeping them apart, which is
much cleaner. (A self-contained sketch of the pattern follows the commit
metadata below.)

CHANGELOG_BEGIN
CHANGELOG_END

* ledger-on-sql: Simplify `Database` a little.

* ledger-on-sql: Make the connection pool implicit.

* ledger-on-sql: Move the execution context into the connection pool.

* ledger-on-sql: Make connection pools more implicit.

* ledger-on-sql: Use the `sc` prefix for `scala.concurrent`.

* ledger-on-sql: Remove an unnecessary import.
Samir Talwar, 2020-10-02 17:16:05 +02:00 (committed by GitHub)
parent 059ae41095
commit aec25d2a49

6 changed files with 138 additions and 107 deletions
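The pattern at the heart of this change, sketched in miniature. The `Reader`/`Writer` tags and the `TaggedEc` wrapper below are illustrative stand-ins, not the real API; the actual implementation is `com.daml.concurrent.ExecutionContext`/`Future` from `//libs-scala/concurrent`, shown in the diffs that follow. The idea: a phantom type parameter distinguishes otherwise-identical execution contexts, so the compiler selects the right one implicitly and mixing them up becomes a type error.

    import java.util.concurrent.Executors
    import scala.concurrent.ExecutionContextExecutorService
    import scala.{concurrent => sc}

    object TaggedExecutionContextSketch {
      // Phantom tags: never instantiated, used only at the type level.
      sealed trait Reader
      sealed trait Writer

      // An execution context carrying a phantom tag EC.
      final case class TaggedEc[EC](delegate: ExecutionContextExecutorService)

      implicit val readerEc: TaggedEc[Reader] =
        TaggedEc(sc.ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()))
      implicit val writerEc: TaggedEc[Writer] =
        TaggedEc(sc.ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8)))

      // The tag in the signature selects the matching implicit: scheduling
      // writer work on the reader pool no longer compiles.
      def onReader[T](work: => T)(implicit ec: TaggedEc[Reader]): sc.Future[T] =
        sc.Future(work)(ec.delegate)
      def onWriter[T](work: => T)(implicit ec: TaggedEc[Writer]): sc.Future[T] =
        sc.Future(work)(ec.delegate)
    }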

BUILD.bazel (ledger-on-sql)

@@ -93,6 +93,7 @@ da_scala_library(
         "//ledger/metrics",
         "//ledger/participant-state",
         "//ledger/participant-state/kvutils",
+        "//libs-scala/concurrent",
         "//libs-scala/contextualized-logging",
         "//libs-scala/resources",
         "@maven//:com_google_protobuf_protobuf_java",

Database.scala

@@ -6,6 +6,8 @@ package com.daml.ledger.on.sql
 import java.sql.{Connection, SQLException}
 import java.util.concurrent.Executors

+import com.daml.concurrent.{ExecutionContext, Future}
+import com.daml.dec.DirectExecutionContext
 import com.daml.ledger.on.sql.Database._
 import com.daml.ledger.on.sql.queries._
 import com.daml.logging.{ContextualizedLogger, LoggingContext}
@@ -15,49 +17,50 @@ import com.daml.resources.ResourceOwner
 import com.zaxxer.hikari.HikariDataSource
 import javax.sql.DataSource
 import org.flywaydb.core.Flyway
+import scalaz.syntax.bind._

 import scala.collection.JavaConverters._
-import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}

 final class Database(
     queries: Connection => Queries,
-    readerConnectionPool: DataSource,
-    readerExecutionContext: ExecutionContext,
-    writerConnectionPool: DataSource,
-    writerExecutionContext: ExecutionContext,
     metrics: Metrics,
+)(
+    implicit readerConnectionPool: ConnectionPool[Reader],
+    writerConnectionPool: ConnectionPool[Writer],
 ) {
-  def inReadTransaction[T](name: String)(body: ReadQueries => Future[T]): Future[T] =
-    inTransaction(name, readerConnectionPool)(connection =>
-      Future(body(new TimedQueries(queries(connection), metrics)))(readerExecutionContext).flatten)
+  def inReadTransaction[T](name: String)(
+      body: ReadQueries => Future[Reader, T]
+  ): Future[Reader, T] =
+    inTransaction(name)(body)

-  def inWriteTransaction[T](name: String)(body: Queries => Future[T]): Future[T] =
-    inTransaction(name, writerConnectionPool)(connection =>
-      Future(body(new TimedQueries(queries(connection), metrics)))(writerExecutionContext).flatten)
+  def inWriteTransaction[T](name: String)(body: Queries => Future[Writer, T]): Future[Writer, T] =
+    inTransaction(name)(body)

-  private def inTransaction[T](name: String, connectionPool: DataSource)(
-      body: Connection => Future[T],
-  ): Future[T] = {
+  def inTransaction[X, T](name: String)(body: Queries => Future[X, T])(
+      implicit connectionPool: ConnectionPool[X],
+  ): Future[X, T] = {
+    import connectionPool.executionContext
     val connection = try {
       Timed.value(
         metrics.daml.ledger.database.transactions.acquireConnection(name),
-        connectionPool.getConnection())
+        connectionPool.acquireConnection())
     } catch {
       case exception: SQLException =>
        throw new ConnectionAcquisitionException(name, exception)
     }
     Timed.future(
-      metrics.daml.ledger.database.transactions.run(name), {
-        body(connection)
+      metrics.daml.ledger.database.transactions.run(name),
+      Future[X] {
+        body(new TimedQueries(queries(connection), metrics))
          .andThen {
            case Success(_) => connection.commit()
            case Failure(_) => connection.rollback()
-          }(writerExecutionContext)
+          }
          .andThen {
            case _ => connection.close()
-          }(writerExecutionContext)
-      }
+          }
+      }.join
     )
   }
 }
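A note on the `Future[X] { ... }.join` shape above: `Future[X](...)` schedules the transaction body on the tagged pool's own executor (brought into scope by `import connectionPool.executionContext`), producing a nested `Future[X, Future[X, T]]`; scalaz's `.join` — hence the new `scalaz.syntax.bind._` import — flattens it. A plain-`scala.concurrent` analogue, as a sketch (`inTransactionSketch` is a hypothetical stand-in for `Database.inTransaction`):

    import scala.concurrent.{ExecutionContext, Future}

    // Invoking `body` is itself scheduled on the pool's executor, then the
    // nested future is flattened; scalaz's `.join` plays the role of
    // `.flatten` for the tagged futures above.
    def inTransactionSketch[T](body: () => Future[T])(implicit ec: ExecutionContext): Future[T] =
      Future(body()).flatten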
@@ -110,27 +113,28 @@ object Database {
       metrics: Metrics,
   ): ResourceOwner[UninitializedDatabase] =
     for {
-      readerConnectionPool <- ResourceOwner.forCloseable(() =>
+      readerDataSource <- ResourceOwner.forCloseable(() =>
         newHikariDataSource(jdbcUrl, readOnly = true))
-      writerConnectionPool <- ResourceOwner.forCloseable(() =>
+      writerDataSource <- ResourceOwner.forCloseable(() =>
         newHikariDataSource(jdbcUrl, maxPoolSize = Some(MaximumWriterConnectionPoolSize)))
-      adminConnectionPool <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl))
+      adminDataSource <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl))
       readerExecutorService <- ResourceOwner.forExecutorService(() =>
        Executors.newCachedThreadPool())
-      readerExecutionContext = ExecutionContext.fromExecutorService(readerExecutorService)
       writerExecutorService <- ResourceOwner.forExecutorService(() =>
        Executors.newFixedThreadPool(MaximumWriterConnectionPoolSize))
-      writerExecutionContext = ExecutionContext.fromExecutorService(writerExecutorService)
-    } yield
-      new UninitializedDatabase(
-        system = system,
-        readerConnectionPool = readerConnectionPool,
-        readerExecutionContext = readerExecutionContext,
-        writerConnectionPool = writerConnectionPool,
-        writerExecutionContext = writerExecutionContext,
-        adminConnectionPool = adminConnectionPool,
-        metrics = metrics,
-      )
+    } yield {
+      implicit val readerExecutionContext: ExecutionContext[Reader] =
+        ExecutionContext.fromExecutorService(readerExecutorService)
+      implicit val writerExecutionContext: ExecutionContext[Writer] =
+        ExecutionContext.fromExecutorService(writerExecutorService)
+      implicit val readerConnectionPool: ConnectionPool[Reader] =
+        new ConnectionPool(readerDataSource)
+      implicit val writerConnectionPool: ConnectionPool[Writer] =
+        new ConnectionPool(writerDataSource)
+      implicit val adminConnectionPool: ConnectionPool[Migrator] =
+        new ConnectionPool(adminDataSource)(DirectExecutionContext)
+      new UninitializedDatabase(system = system, metrics = metrics)
+    }
 }

 object SingleConnectionDatabase {
@@ -140,23 +144,20 @@ object Database {
       metrics: Metrics,
   ): ResourceOwner[UninitializedDatabase] =
     for {
-      readerWriterConnectionPool <- ResourceOwner.forCloseable(() =>
+      readerWriterDataSource <- ResourceOwner.forCloseable(() =>
        newHikariDataSource(jdbcUrl, maxPoolSize = Some(MaximumWriterConnectionPoolSize)))
-      adminConnectionPool <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl))
+      adminDataSource <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl))
       readerWriterExecutorService <- ResourceOwner.forExecutorService(() =>
        Executors.newFixedThreadPool(MaximumWriterConnectionPoolSize))
-      readerWriterExecutionContext = ExecutionContext.fromExecutorService(
-        readerWriterExecutorService)
-    } yield
-      new UninitializedDatabase(
-        system = system,
-        readerConnectionPool = readerWriterConnectionPool,
-        readerExecutionContext = readerWriterExecutionContext,
-        writerConnectionPool = readerWriterConnectionPool,
-        writerExecutionContext = readerWriterExecutionContext,
-        adminConnectionPool = adminConnectionPool,
-        metrics = metrics,
-      )
+    } yield {
+      implicit val readerWriterExecutionContext: ExecutionContext[Reader with Writer] =
+        ExecutionContext.fromExecutorService(readerWriterExecutorService)
+      implicit val readerWriterConnectionPool: ConnectionPool[Reader with Writer] =
+        new ConnectionPool(readerWriterDataSource)
+      implicit val adminConnectionPool: ConnectionPool[Migrator] =
+        new ConnectionPool(adminDataSource)(DirectExecutionContext)
+      new UninitializedDatabase(system = system, metrics = metrics)
+    }
 }

 private def newHikariDataSource(
@@ -196,43 +197,35 @@ object Database {
     override val queries: Connection => Queries = SqliteQueries.apply
   }
 }

-class UninitializedDatabase(
-    system: RDBMS,
-    readerConnectionPool: DataSource,
-    readerExecutionContext: ExecutionContext,
-    writerConnectionPool: DataSource,
-    writerExecutionContext: ExecutionContext,
-    adminConnectionPool: DataSource,
-    metrics: Metrics,
+class UninitializedDatabase(system: RDBMS, metrics: Metrics)(
+    implicit readerConnectionPool: ConnectionPool[Reader],
+    writerConnectionPool: ConnectionPool[Writer],
+    adminConnectionPool: ConnectionPool[Migrator],
 ) {
   private val flyway: Flyway =
     Flyway
       .configure()
       .placeholders(Map("table.prefix" -> TablePrefix).asJava)
       .table(TablePrefix + Flyway.configure().getTable)
-      .dataSource(adminConnectionPool)
+      .dataSource(adminConnectionPool.dataSource)
       .locations(s"classpath:/com/daml/ledger/on/sql/migrations/${system.name}")
       .load()

   def migrate(): Database = {
     flyway.migrate()
-    new Database(
-      queries = system.queries,
-      readerConnectionPool = readerConnectionPool,
-      readerExecutionContext = readerExecutionContext,
-      writerConnectionPool = writerConnectionPool,
-      writerExecutionContext = writerExecutionContext,
-      metrics = metrics,
-    )
+    new Database(queries = system.queries, metrics = metrics)
   }

-  def migrateAndReset()(implicit executionContext: ExecutionContext): Future[Database] = {
+  def migrateAndReset()(
+      implicit executionContext: ExecutionContext[Migrator]
+  ): Future[Migrator, Database] = {
     val db = migrate()
-    db.inWriteTransaction("ledger_reset") { queries =>
+    db.inTransaction("ledger_reset") { queries =>
       Future.fromTry(queries.truncate())
-    }
+    }(adminConnectionPool)
      .map(_ => db)
   }
@@ -242,11 +235,25 @@
   }
 }

-class InvalidDatabaseException(message: String)
+sealed trait Context
+
+sealed trait Reader extends Context
+
+sealed trait Writer extends Context
+
+sealed trait Migrator extends Context
+
+private final class ConnectionPool[+P](
+    val dataSource: DataSource,
+)(implicit val executionContext: ExecutionContext[P]) {
+  def acquireConnection(): Connection = dataSource.getConnection()
+}
+
+final class InvalidDatabaseException(message: String)
     extends RuntimeException(message)
     with StartupException

-class ConnectionAcquisitionException(name: String, cause: Throwable)
+final class ConnectionAcquisitionException(name: String, cause: Throwable)
     extends RuntimeException(s"""Failed to acquire the connection during "$name".""", cause)
 }
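Why `ConnectionPool[+P]` is covariant: `SingleConnectionDatabase` builds a single `ConnectionPool[Reader with Writer]`, and since `Reader with Writer` is a subtype of both tags, covariance lets that one value satisfy both the `ConnectionPool[Reader]` and the `ConnectionPool[Writer]` implicit parameters. A minimal sketch with hypothetical stand-ins (`Pool`, `read`, `write` are not part of the real code):

    object CovarianceSketch {
      sealed trait Reader
      sealed trait Writer

      final class Pool[+P](val label: String)

      def read()(implicit pool: Pool[Reader]): String = pool.label
      def write()(implicit pool: Pool[Writer]): String = pool.label

      // One shared pool resolves for both implicit searches, because
      // Pool[Reader with Writer] <: Pool[Reader] and <: Pool[Writer].
      implicit val shared: Pool[Reader with Writer] = new Pool("shared")

      def demo(): (String, String) = (read(), write())
    }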

SqlLedgerReaderWriter.scala

@@ -10,6 +10,7 @@ import akka.NotUsed
 import akka.stream.scaladsl.Source
 import com.daml.api.util.TimeProvider
 import com.daml.caching.Cache
+import com.daml.concurrent.{ExecutionContext, Future}
 import com.daml.ledger.api.domain
 import com.daml.ledger.api.health.{HealthStatus, Healthy}
 import com.daml.ledger.on.sql.SqlLedgerReaderWriter._
@@ -35,8 +36,8 @@ import com.daml.platform.common.MismatchException
 import com.daml.resources.{Resource, ResourceOwner}
 import com.google.protobuf.ByteString

-import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
+import scala.{concurrent => sc}

 final class SqlLedgerReaderWriter(
     override val ledgerId: LedgerId = Ref.LedgerString.assertFromString(UUID.randomUUID.toString),
@@ -45,7 +46,7 @@ final class SqlLedgerReaderWriter(
     database: Database,
     dispatcher: Dispatcher[Index],
     committer: ValidatingCommitter[Index],
-    committerExecutionContext: ExecutionContext,
+    committerExecutionContext: sc.ExecutionContext,
 ) extends LedgerWriter
     with LedgerReader {
@@ -58,11 +59,12 @@
       RangeSource(
         (startExclusive, endInclusive) =>
           Source
-            .future(
-              Timed.value(metrics.daml.ledger.log.read, database.inReadTransaction("read_log") {
+            .future(Timed
+              .value(metrics.daml.ledger.log.read, database.inReadTransaction("read_log") {
                 queries =>
                   Future.fromTry(queries.selectFromLog(startExclusive, endInclusive))
-              }))
+              })
+              .removeExecutionContext)
             .mapConcat(identity)
             .mapMaterializedValue(_ => NotUsed)),
     )
@@ -72,7 +74,7 @@
       correlationId: String,
       envelope: Bytes,
       metadata: CommitMetadata,
-  ): Future[SubmissionResult] =
+  ): sc.Future[SubmissionResult] =
     committer.commit(correlationId, envelope, participantId)(committerExecutionContext)
 }
@@ -94,14 +96,17 @@
   )(implicit loggingContext: LoggingContext)
       extends ResourceOwner[SqlLedgerReaderWriter] {
     override def acquire()(
-        implicit executionContext: ExecutionContext
-    ): Resource[SqlLedgerReaderWriter] =
+        implicit executionContext: sc.ExecutionContext
+    ): Resource[SqlLedgerReaderWriter] = {
+      implicit val migratorExecutionContext: ExecutionContext[Database.Migrator] =
+        ExecutionContext(executionContext)
       for {
         uninitializedDatabase <- Database.owner(jdbcUrl, metrics).acquire()
         database <- Resource.fromFuture(
-          if (resetOnStartup) uninitializedDatabase.migrateAndReset()
-          else Future.successful(uninitializedDatabase.migrate()))
-        ledgerId <- Resource.fromFuture(updateOrRetrieveLedgerId(ledgerId, database))
+          if (resetOnStartup) uninitializedDatabase.migrateAndReset().removeExecutionContext
+          else sc.Future.successful(uninitializedDatabase.migrate()))
+        ledgerId <- Resource.fromFuture(
+          updateOrRetrieveLedgerId(ledgerId, database).removeExecutionContext)
         dispatcher <- new DispatcherOwner(database).acquire()
         validator = SubmissionValidator.createForTimeMode(
           new SqlLedgerStateAccess(database, metrics),
@@ -118,7 +123,7 @@
         )
         committerExecutionContext <- ResourceOwner
           .forExecutorService(() =>
-            ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor()))
+            sc.ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor()))
          .acquire()
       } yield
         new SqlLedgerReaderWriter(
@@ -130,12 +135,13 @@
           committer,
           committerExecutionContext,
         )
+    }
   }
   private def updateOrRetrieveLedgerId(
       providedLedgerId: LedgerId,
       database: Database,
-  ): Future[LedgerId] =
+  ): Future[Database.Writer, LedgerId] =
     database.inWriteTransaction("retrieve_ledger_id") { queries =>
       Future.fromTry(
         queries
@@ -155,15 +161,14 @@
   private final class DispatcherOwner(database: Database) extends ResourceOwner[Dispatcher[Index]] {
     override def acquire()(
-        implicit executionContext: ExecutionContext
+        implicit executionContext: sc.ExecutionContext
     ): Resource[Dispatcher[Index]] =
       for {
-        head <- Resource.fromFuture(
-          database
-            .inReadTransaction("read_head") { queries =>
-              Future.fromTry(
-                queries.selectLatestLogEntryId().map(_.map(_ + 1).getOrElse(StartIndex)))
-            })
+        head <- Resource.fromFuture(database
+          .inReadTransaction("read_head") { queries =>
+            Future.fromTry(queries.selectLatestLogEntryId().map(_.map(_ + 1).getOrElse(StartIndex)))
+          }
+          .removeExecutionContext)
         dispatcher <- Dispatcher
           .owner(
             name = "sql-participant-state",
@@ -185,30 +190,33 @@
   private final class SqlLedgerStateAccess(database: Database, metrics: Metrics)
       extends LedgerStateAccess[Index] {
-    override def inTransaction[T](body: LedgerStateOperations[Index] => Future[T])(
-        implicit executionContext: ExecutionContext
-    ): Future[T] =
-      database.inWriteTransaction("commit") { queries =>
-        body(new TimedLedgerStateOperations(new SqlLedgerStateOperations(queries), metrics))
-      }
+    override def inTransaction[T](body: LedgerStateOperations[Index] => sc.Future[T])(
+        implicit executionContext: sc.ExecutionContext
+    ): sc.Future[T] =
+      database
+        .inWriteTransaction("commit") { queries =>
+          body(new TimedLedgerStateOperations(new SqlLedgerStateOperations(queries), metrics))
+        }
+        .removeExecutionContext
   }

   private final class SqlLedgerStateOperations(queries: Queries)
       extends BatchingLedgerStateOperations[Index] {
     override def readState(
         keys: Iterable[Key],
-    )(implicit executionContext: ExecutionContext): Future[Seq[Option[Value]]] =
-      Future.fromTry(queries.selectStateValuesByKeys(keys))
+    )(implicit executionContext: sc.ExecutionContext): sc.Future[Seq[Option[Value]]] =
+      Future.fromTry(queries.selectStateValuesByKeys(keys)).removeExecutionContext

     override def writeState(
         keyValuePairs: Iterable[(Key, Value)],
-    )(implicit executionContext: ExecutionContext): Future[Unit] =
-      Future.fromTry(queries.updateState(keyValuePairs))
+    )(implicit executionContext: sc.ExecutionContext): sc.Future[Unit] =
+      Future.fromTry(queries.updateState(keyValuePairs)).removeExecutionContext

-    override def appendToLog(key: Key, value: Value)(
-        implicit executionContext: ExecutionContext
-    ): Future[Index] =
-      Future.fromTry(queries.insertRecordIntoLog(key, value))
+    override def appendToLog(
+        key: Key,
+        value: Value,
+    )(implicit executionContext: sc.ExecutionContext): sc.Future[Index] =
+      Future.fromTry(queries.insertRecordIntoLog(key, value)).removeExecutionContext
   }
 }
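The recurring `.removeExecutionContext` calls mark the module boundary: inside, futures carry a tag tying them to a particular database pool; at edges that expect a plain `scala.concurrent.Future` (akka's `Source.future`, `Resource.fromFuture`, the kvutils state-access interfaces), the tag is erased. A sketch of that discipline, with a hypothetical `TaggedFuture` standing in for `com.daml.concurrent.Future[EC, T]`:

    import scala.{concurrent => sc}

    object BoundarySketch {
      sealed trait Writer

      // Inside the module the tag records which pool a computation belongs to;
      // the underlying future is deliberately not exposed any other way.
      final class TaggedFuture[EC, T](private val underlying: sc.Future[T]) {
        def removeExecutionContext: sc.Future[T] = underlying
      }

      // At the public edge, erase the tag so untagged callers can consume it.
      def commitIndex(tagged: TaggedFuture[Writer, Long]): sc.Future[Long] =
        tagged.removeExecutionContext
    }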

BUILD.bazel (metrics)

@@ -22,6 +22,7 @@ da_scala_library(
         "@maven//:com_typesafe_akka_akka_stream_2_12",
         "@maven//:io_dropwizard_metrics_metrics_core",
         "@maven//:io_dropwizard_metrics_metrics_jvm",
+        "@maven//:org_scalaz_scalaz_core_2_12",
     ],
 )
@@ -31,6 +32,7 @@ da_scala_test_suite(
     srcs = glob(["src/test/scala/**/*.scala"]),
     deps = [
         ":metrics",
+        "//libs-scala/concurrent",
         "@maven//:io_dropwizard_metrics_metrics_core",
         "@maven//:org_scalatest_scalatest_2_12",
     ],

Timed.scala

@@ -8,6 +8,7 @@ import java.util.concurrent.CompletionStage
 import akka.Done
 import akka.stream.scaladsl.{Keep, Source}
 import com.codahale.metrics.{Counter, Timer}
+import com.daml.concurrent
 import com.daml.dec.DirectExecutionContext

 import scala.concurrent.Future
@@ -32,6 +33,13 @@ object Timed {
     result
   }

+  def future[EC, T](timer: Timer, future: => concurrent.Future[EC, T]): concurrent.Future[EC, T] = {
+    val ctx = timer.time()
+    val result = future
+    result.onComplete(_ => ctx.stop())(DirectExecutionContext)
+    result
+  }
+
   def trackedFuture[T](counter: Counter, future: => Future[T]): Future[T] = {
     counter.inc()
     future.andThen { case _ => counter.dec() }(DirectExecutionContext)
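The new `Timed.future` overload mirrors the untagged one earlier in this file: the timer context opens before the by-name future is forced and stops on completion, and the completion callback runs on `DirectExecutionContext` so that timing never hops onto one of the tagged database pools. A self-contained usage sketch against the plain dropwizard API (the timer name and `TimedSketch` object are illustrative):

    import com.codahale.metrics.MetricRegistry
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future

    object TimedSketch {
      val registry = new MetricRegistry
      val timer = registry.timer("transactions.run")

      // Same shape as the overload above, for untagged futures.
      def timed[T](work: => Future[T]): Future[T] = {
        val ctx = timer.time() // Timer.Context
        val result = work
        result.onComplete(_ => ctx.stop())
        result
      }

      def demo(): Future[Int] = timed(Future(21 * 2))
    }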

package.scala (com.daml.concurrent)

@@ -3,11 +3,11 @@
 package com.daml

-import scalaz.Id.Id
-
 import scala.util.Try
 import scala.{concurrent => sc}

+import scalaz.Id.Id
+
 /** A compatible layer for `scala.concurrent` with extra type parameters to
   * control `ExecutionContext`s. Deliberately uses the same names as the
   * equivalent concepts in `scala.concurrent`.
@@ -78,6 +80,8 @@ package object concurrent {
 // keeping the companions with the same-named type aliases in same file
 package concurrent {

+  import java.util.concurrent.ExecutorService
+
   object Future {

     /** {{{
@@ -106,5 +108,8 @@
       */
     def apply[EC](ec: sc.ExecutionContext): ExecutionContext[EC] =
       ExecutionContextOf.Instance.subst[Id, EC](ec)
+
+    def fromExecutorService[EC](e: ExecutorService): ExecutionContext[EC] =
+      apply(sc.ExecutionContext.fromExecutorService(e))
   }
 }
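Usage of the new constructor, as a sketch (the `Indexer` tag and `IndexerPools` object are hypothetical): `ExecutionContext.fromExecutorService` builds a tagged context straight from a `java.util.concurrent.ExecutorService`, shorthand for wrapping with `ExecutionContext(...)` by hand.

    import java.util.concurrent.Executors
    import com.daml.concurrent.ExecutionContext
    import scala.{concurrent => sc}

    object IndexerPools {
      sealed trait Indexer

      // The new shorthand added above:
      implicit val indexerEc: ExecutionContext[Indexer] =
        ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

      // Equivalent, spelled out with the existing apply:
      val sameThing: ExecutionContext[Indexer] =
        ExecutionContext(sc.ExecutionContext.fromExecutorService(
          Executors.newSingleThreadExecutor()))
    }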