From aec25d2a49d99e8f74d296f33cc0fb01bbabc1c3 Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Fri, 2 Oct 2020 17:16:05 +0200 Subject: [PATCH] ledger-on-sql: Use tagged execution contexts and data sources in `Database`. (#7525) * metrics: Support tagged Futures when timing. * ledger-on-sql: Use tagged execution contexts in `Database`. We have to deal with multiple execution contexts in `Database`. This makes it possible to use them implicitly, which is much cleaner. CHANGELOG_BEGIN CHANGELOG_END * ledger-on-sql: Simplify `Database` a little. * ledger-on-sql: Make the connection pool implicit. * ledger-on-sql: Move the execution context into the connection pool. * ledger-on-sql: Make connection pools more implicit. * ledger-on-sql: Use the `sc` prefix for `scala.concurrent`. * ledger-on-sql: Remove an unnecessary import. --- ledger/ledger-on-sql/BUILD.bazel | 1 + .../com/daml/ledger/on/sql/Database.scala | 149 +++++++++--------- .../ledger/on/sql/SqlLedgerReaderWriter.scala | 76 +++++---- ledger/metrics/BUILD.bazel | 2 + .../main/scala/com/daml/metrics/Timed.scala | 8 + .../src/main/scala/concurrent/package.scala | 9 +- 6 files changed, 138 insertions(+), 107 deletions(-) diff --git a/ledger/ledger-on-sql/BUILD.bazel b/ledger/ledger-on-sql/BUILD.bazel index 7a344d0bea8..bb0463ca9c2 100644 --- a/ledger/ledger-on-sql/BUILD.bazel +++ b/ledger/ledger-on-sql/BUILD.bazel @@ -93,6 +93,7 @@ da_scala_library( "//ledger/metrics", "//ledger/participant-state", "//ledger/participant-state/kvutils", + "//libs-scala/concurrent", "//libs-scala/contextualized-logging", "//libs-scala/resources", "@maven//:com_google_protobuf_protobuf_java", diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala index 696ef5b784e..aa838ddc175 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala @@ -6,6 +6,8 @@ package com.daml.ledger.on.sql import java.sql.{Connection, SQLException} import java.util.concurrent.Executors +import com.daml.concurrent.{ExecutionContext, Future} +import com.daml.dec.DirectExecutionContext import com.daml.ledger.on.sql.Database._ import com.daml.ledger.on.sql.queries._ import com.daml.logging.{ContextualizedLogger, LoggingContext} @@ -15,49 +17,50 @@ import com.daml.resources.ResourceOwner import com.zaxxer.hikari.HikariDataSource import javax.sql.DataSource import org.flywaydb.core.Flyway +import scalaz.syntax.bind._ import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} final class Database( queries: Connection => Queries, - readerConnectionPool: DataSource, - readerExecutionContext: ExecutionContext, - writerConnectionPool: DataSource, - writerExecutionContext: ExecutionContext, metrics: Metrics, +)( + implicit readerConnectionPool: ConnectionPool[Reader], + writerConnectionPool: ConnectionPool[Writer], ) { - def inReadTransaction[T](name: String)(body: ReadQueries => Future[T]): Future[T] = - inTransaction(name, readerConnectionPool)(connection => - Future(body(new TimedQueries(queries(connection), metrics)))(readerExecutionContext).flatten) + def inReadTransaction[T](name: String)( + body: ReadQueries => Future[Reader, T] + ): Future[Reader, T] = + inTransaction(name)(body) - def inWriteTransaction[T](name: String)(body: Queries => Future[T]): Future[T] = - inTransaction(name, writerConnectionPool)(connection => - Future(body(new TimedQueries(queries(connection), metrics)))(writerExecutionContext).flatten) + def inWriteTransaction[T](name: String)(body: Queries => Future[Writer, T]): Future[Writer, T] = + inTransaction(name)(body) - private def inTransaction[T](name: String, connectionPool: DataSource)( - body: Connection => Future[T], - ): Future[T] = { + def inTransaction[X, T](name: String)(body: Queries => Future[X, T])( + implicit connectionPool: ConnectionPool[X], + ): Future[X, T] = { + import connectionPool.executionContext val connection = try { Timed.value( metrics.daml.ledger.database.transactions.acquireConnection(name), - connectionPool.getConnection()) + connectionPool.acquireConnection()) } catch { case exception: SQLException => throw new ConnectionAcquisitionException(name, exception) } Timed.future( - metrics.daml.ledger.database.transactions.run(name), { - body(connection) + metrics.daml.ledger.database.transactions.run(name), + Future[X] { + body(new TimedQueries(queries(connection), metrics)) .andThen { case Success(_) => connection.commit() case Failure(_) => connection.rollback() - }(writerExecutionContext) + } .andThen { case _ => connection.close() - }(writerExecutionContext) - } + } + }.join ) } } @@ -110,27 +113,28 @@ object Database { metrics: Metrics, ): ResourceOwner[UninitializedDatabase] = for { - readerConnectionPool <- ResourceOwner.forCloseable(() => + readerDataSource <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl, readOnly = true)) - writerConnectionPool <- ResourceOwner.forCloseable(() => + writerDataSource <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl, maxPoolSize = Some(MaximumWriterConnectionPoolSize))) - adminConnectionPool <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl)) + adminDataSource <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl)) readerExecutorService <- ResourceOwner.forExecutorService(() => Executors.newCachedThreadPool()) - readerExecutionContext = ExecutionContext.fromExecutorService(readerExecutorService) writerExecutorService <- ResourceOwner.forExecutorService(() => Executors.newFixedThreadPool(MaximumWriterConnectionPoolSize)) - writerExecutionContext = ExecutionContext.fromExecutorService(writerExecutorService) - } yield - new UninitializedDatabase( - system = system, - readerConnectionPool = readerConnectionPool, - readerExecutionContext = readerExecutionContext, - writerConnectionPool = writerConnectionPool, - writerExecutionContext = writerExecutionContext, - adminConnectionPool = adminConnectionPool, - metrics = metrics, - ) + } yield { + implicit val readerExecutionContext: ExecutionContext[Reader] = + ExecutionContext.fromExecutorService(readerExecutorService) + implicit val writerExecutionContext: ExecutionContext[Writer] = + ExecutionContext.fromExecutorService(writerExecutorService) + implicit val readerConnectionPool: ConnectionPool[Reader] = + new ConnectionPool(readerDataSource) + implicit val writerConnectionPool: ConnectionPool[Writer] = + new ConnectionPool(writerDataSource) + implicit val adminConnectionPool: ConnectionPool[Migrator] = + new ConnectionPool(adminDataSource)(DirectExecutionContext) + new UninitializedDatabase(system = system, metrics = metrics) + } } object SingleConnectionDatabase { @@ -140,23 +144,20 @@ object Database { metrics: Metrics, ): ResourceOwner[UninitializedDatabase] = for { - readerWriterConnectionPool <- ResourceOwner.forCloseable(() => + readerWriterDataSource <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl, maxPoolSize = Some(MaximumWriterConnectionPoolSize))) - adminConnectionPool <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl)) + adminDataSource <- ResourceOwner.forCloseable(() => newHikariDataSource(jdbcUrl)) readerWriterExecutorService <- ResourceOwner.forExecutorService(() => Executors.newFixedThreadPool(MaximumWriterConnectionPoolSize)) - readerWriterExecutionContext = ExecutionContext.fromExecutorService( - readerWriterExecutorService) - } yield - new UninitializedDatabase( - system = system, - readerConnectionPool = readerWriterConnectionPool, - readerExecutionContext = readerWriterExecutionContext, - writerConnectionPool = readerWriterConnectionPool, - writerExecutionContext = readerWriterExecutionContext, - adminConnectionPool = adminConnectionPool, - metrics = metrics, - ) + } yield { + implicit val readerWriterExecutionContext: ExecutionContext[Reader with Writer] = + ExecutionContext.fromExecutorService(readerWriterExecutorService) + implicit val readerWriterConnectionPool: ConnectionPool[Reader with Writer] = + new ConnectionPool(readerWriterDataSource) + implicit val adminConnectionPool: ConnectionPool[Migrator] = + new ConnectionPool(adminDataSource)(DirectExecutionContext) + new UninitializedDatabase(system = system, metrics = metrics) + } } private def newHikariDataSource( @@ -196,43 +197,35 @@ object Database { override val queries: Connection => Queries = SqliteQueries.apply } + } - class UninitializedDatabase( - system: RDBMS, - readerConnectionPool: DataSource, - readerExecutionContext: ExecutionContext, - writerConnectionPool: DataSource, - writerExecutionContext: ExecutionContext, - adminConnectionPool: DataSource, - metrics: Metrics, + class UninitializedDatabase(system: RDBMS, metrics: Metrics)( + implicit readerConnectionPool: ConnectionPool[Reader], + writerConnectionPool: ConnectionPool[Writer], + adminConnectionPool: ConnectionPool[Migrator], ) { private val flyway: Flyway = Flyway .configure() .placeholders(Map("table.prefix" -> TablePrefix).asJava) .table(TablePrefix + Flyway.configure().getTable) - .dataSource(adminConnectionPool) + .dataSource(adminConnectionPool.dataSource) .locations(s"classpath:/com/daml/ledger/on/sql/migrations/${system.name}") .load() def migrate(): Database = { flyway.migrate() - new Database( - queries = system.queries, - readerConnectionPool = readerConnectionPool, - readerExecutionContext = readerExecutionContext, - writerConnectionPool = writerConnectionPool, - writerExecutionContext = writerExecutionContext, - metrics = metrics, - ) + new Database(queries = system.queries, metrics = metrics) } - def migrateAndReset()(implicit executionContext: ExecutionContext): Future[Database] = { + def migrateAndReset()( + implicit executionContext: ExecutionContext[Migrator] + ): Future[Migrator, Database] = { val db = migrate() - db.inWriteTransaction("ledger_reset") { queries => + db.inTransaction("ledger_reset") { queries => Future.fromTry(queries.truncate()) - } + }(adminConnectionPool) .map(_ => db) } @@ -242,11 +235,25 @@ object Database { } } - class InvalidDatabaseException(message: String) + sealed trait Context + + sealed trait Reader extends Context + + sealed trait Writer extends Context + + sealed trait Migrator extends Context + + private final class ConnectionPool[+P]( + val dataSource: DataSource, + )(implicit val executionContext: ExecutionContext[P]) { + def acquireConnection(): Connection = dataSource.getConnection() + } + + final class InvalidDatabaseException(message: String) extends RuntimeException(message) with StartupException - class ConnectionAcquisitionException(name: String, cause: Throwable) + final class ConnectionAcquisitionException(name: String, cause: Throwable) extends RuntimeException(s"""Failed to acquire the connection during "$name".""", cause) } diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala index 863bbd95b2e..b4bd4f553f2 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala @@ -10,6 +10,7 @@ import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.api.util.TimeProvider import com.daml.caching.Cache +import com.daml.concurrent.{ExecutionContext, Future} import com.daml.ledger.api.domain import com.daml.ledger.api.health.{HealthStatus, Healthy} import com.daml.ledger.on.sql.SqlLedgerReaderWriter._ @@ -35,8 +36,8 @@ import com.daml.platform.common.MismatchException import com.daml.resources.{Resource, ResourceOwner} import com.google.protobuf.ByteString -import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} +import scala.{concurrent => sc} final class SqlLedgerReaderWriter( override val ledgerId: LedgerId = Ref.LedgerString.assertFromString(UUID.randomUUID.toString), @@ -45,7 +46,7 @@ final class SqlLedgerReaderWriter( database: Database, dispatcher: Dispatcher[Index], committer: ValidatingCommitter[Index], - committerExecutionContext: ExecutionContext, + committerExecutionContext: sc.ExecutionContext, ) extends LedgerWriter with LedgerReader { @@ -58,11 +59,12 @@ final class SqlLedgerReaderWriter( RangeSource( (startExclusive, endInclusive) => Source - .future( - Timed.value(metrics.daml.ledger.log.read, database.inReadTransaction("read_log") { + .future(Timed + .value(metrics.daml.ledger.log.read, database.inReadTransaction("read_log") { queries => Future.fromTry(queries.selectFromLog(startExclusive, endInclusive)) - })) + }) + .removeExecutionContext) .mapConcat(identity) .mapMaterializedValue(_ => NotUsed)), ) @@ -72,7 +74,7 @@ final class SqlLedgerReaderWriter( correlationId: String, envelope: Bytes, metadata: CommitMetadata, - ): Future[SubmissionResult] = + ): sc.Future[SubmissionResult] = committer.commit(correlationId, envelope, participantId)(committerExecutionContext) } @@ -94,14 +96,17 @@ object SqlLedgerReaderWriter { )(implicit loggingContext: LoggingContext) extends ResourceOwner[SqlLedgerReaderWriter] { override def acquire()( - implicit executionContext: ExecutionContext - ): Resource[SqlLedgerReaderWriter] = + implicit executionContext: sc.ExecutionContext + ): Resource[SqlLedgerReaderWriter] = { + implicit val migratorExecutionContext: ExecutionContext[Database.Migrator] = + ExecutionContext(executionContext) for { uninitializedDatabase <- Database.owner(jdbcUrl, metrics).acquire() database <- Resource.fromFuture( - if (resetOnStartup) uninitializedDatabase.migrateAndReset() - else Future.successful(uninitializedDatabase.migrate())) - ledgerId <- Resource.fromFuture(updateOrRetrieveLedgerId(ledgerId, database)) + if (resetOnStartup) uninitializedDatabase.migrateAndReset().removeExecutionContext + else sc.Future.successful(uninitializedDatabase.migrate())) + ledgerId <- Resource.fromFuture( + updateOrRetrieveLedgerId(ledgerId, database).removeExecutionContext) dispatcher <- new DispatcherOwner(database).acquire() validator = SubmissionValidator.createForTimeMode( new SqlLedgerStateAccess(database, metrics), @@ -118,7 +123,7 @@ object SqlLedgerReaderWriter { ) committerExecutionContext <- ResourceOwner .forExecutorService(() => - ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())) + sc.ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())) .acquire() } yield new SqlLedgerReaderWriter( @@ -130,12 +135,13 @@ object SqlLedgerReaderWriter { committer, committerExecutionContext, ) + } } private def updateOrRetrieveLedgerId( providedLedgerId: LedgerId, database: Database, - ): Future[LedgerId] = + ): Future[Database.Writer, LedgerId] = database.inWriteTransaction("retrieve_ledger_id") { queries => Future.fromTry( queries @@ -155,15 +161,14 @@ object SqlLedgerReaderWriter { private final class DispatcherOwner(database: Database) extends ResourceOwner[Dispatcher[Index]] { override def acquire()( - implicit executionContext: ExecutionContext + implicit executionContext: sc.ExecutionContext ): Resource[Dispatcher[Index]] = for { - head <- Resource.fromFuture( - database - .inReadTransaction("read_head") { queries => - Future.fromTry( - queries.selectLatestLogEntryId().map(_.map(_ + 1).getOrElse(StartIndex))) - }) + head <- Resource.fromFuture(database + .inReadTransaction("read_head") { queries => + Future.fromTry(queries.selectLatestLogEntryId().map(_.map(_ + 1).getOrElse(StartIndex))) + } + .removeExecutionContext) dispatcher <- Dispatcher .owner( name = "sql-participant-state", @@ -185,30 +190,33 @@ object SqlLedgerReaderWriter { private final class SqlLedgerStateAccess(database: Database, metrics: Metrics) extends LedgerStateAccess[Index] { - override def inTransaction[T](body: LedgerStateOperations[Index] => Future[T])( - implicit executionContext: ExecutionContext - ): Future[T] = - database.inWriteTransaction("commit") { queries => - body(new TimedLedgerStateOperations(new SqlLedgerStateOperations(queries), metrics)) - } + override def inTransaction[T](body: LedgerStateOperations[Index] => sc.Future[T])( + implicit executionContext: sc.ExecutionContext + ): sc.Future[T] = + database + .inWriteTransaction("commit") { queries => + body(new TimedLedgerStateOperations(new SqlLedgerStateOperations(queries), metrics)) + } + .removeExecutionContext } private final class SqlLedgerStateOperations(queries: Queries) extends BatchingLedgerStateOperations[Index] { override def readState( keys: Iterable[Key], - )(implicit executionContext: ExecutionContext): Future[Seq[Option[Value]]] = - Future.fromTry(queries.selectStateValuesByKeys(keys)) + )(implicit executionContext: sc.ExecutionContext): sc.Future[Seq[Option[Value]]] = + Future.fromTry(queries.selectStateValuesByKeys(keys)).removeExecutionContext override def writeState( keyValuePairs: Iterable[(Key, Value)], - )(implicit executionContext: ExecutionContext): Future[Unit] = - Future.fromTry(queries.updateState(keyValuePairs)) + )(implicit executionContext: sc.ExecutionContext): sc.Future[Unit] = + Future.fromTry(queries.updateState(keyValuePairs)).removeExecutionContext - override def appendToLog(key: Key, value: Value)( - implicit executionContext: ExecutionContext - ): Future[Index] = - Future.fromTry(queries.insertRecordIntoLog(key, value)) + override def appendToLog( + key: Key, + value: Value, + )(implicit executionContext: sc.ExecutionContext): sc.Future[Index] = + Future.fromTry(queries.insertRecordIntoLog(key, value)).removeExecutionContext } } diff --git a/ledger/metrics/BUILD.bazel b/ledger/metrics/BUILD.bazel index 7af90c2c3ea..07b49aba3a2 100644 --- a/ledger/metrics/BUILD.bazel +++ b/ledger/metrics/BUILD.bazel @@ -22,6 +22,7 @@ da_scala_library( "@maven//:com_typesafe_akka_akka_stream_2_12", "@maven//:io_dropwizard_metrics_metrics_core", "@maven//:io_dropwizard_metrics_metrics_jvm", + "@maven//:org_scalaz_scalaz_core_2_12", ], ) @@ -31,6 +32,7 @@ da_scala_test_suite( srcs = glob(["src/test/scala/**/*.scala"]), deps = [ ":metrics", + "//libs-scala/concurrent", "@maven//:io_dropwizard_metrics_metrics_core", "@maven//:org_scalatest_scalatest_2_12", ], diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Timed.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Timed.scala index bebcdbdee4f..54c2dc85c2e 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Timed.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Timed.scala @@ -8,6 +8,7 @@ import java.util.concurrent.CompletionStage import akka.Done import akka.stream.scaladsl.{Keep, Source} import com.codahale.metrics.{Counter, Timer} +import com.daml.concurrent import com.daml.dec.DirectExecutionContext import scala.concurrent.Future @@ -32,6 +33,13 @@ object Timed { result } + def future[EC, T](timer: Timer, future: => concurrent.Future[EC, T]): concurrent.Future[EC, T] = { + val ctx = timer.time() + val result = future + result.onComplete(_ => ctx.stop())(DirectExecutionContext) + result + } + def trackedFuture[T](counter: Counter, future: => Future[T]): Future[T] = { counter.inc() future.andThen { case _ => counter.dec() }(DirectExecutionContext) diff --git a/libs-scala/concurrent/src/main/scala/concurrent/package.scala b/libs-scala/concurrent/src/main/scala/concurrent/package.scala index b00c08e07ee..a6c5fb2044e 100644 --- a/libs-scala/concurrent/src/main/scala/concurrent/package.scala +++ b/libs-scala/concurrent/src/main/scala/concurrent/package.scala @@ -3,11 +3,11 @@ package com.daml +import scalaz.Id.Id + import scala.util.Try import scala.{concurrent => sc} -import scalaz.Id.Id - /** A compatible layer for `scala.concurrent` with extra type parameters to * control `ExecutionContext`s. Deliberately uses the same names as the * equivalent concepts in `scala.concurrent`. @@ -78,6 +78,8 @@ package object concurrent { // keeping the companions with the same-named type aliases in same file package concurrent { + import java.util.concurrent.ExecutorService + object Future { /** {{{ @@ -106,5 +108,8 @@ package concurrent { */ def apply[EC](ec: sc.ExecutionContext): ExecutionContext[EC] = ExecutionContextOf.Instance.subst[Id, EC](ec) + + def fromExecutorService[EC](e: ExecutorService): ExecutionContext[EC] = + apply(sc.ExecutionContext.fromExecutorService(e)) } }