ledger-on-sql: Make sure the ledger picks up where it left off. (#4266)

* resources: Make it easy to use a resource and then release it.

* ledger-on-sql: Move database transaction methods to `Database`.

* ledger-on-sql: Make sure the ledger picks up where it left off.

This means seeding the dispatcher with the correct head, rather than
starting at 1 every time.

CHANGELOG_BEGIN
CHANGELOG_END

* ledger-on-sql: Move `Index` and `StartIndex` to the package object.

They're used everywhere.

Co-authored-by: Samir Talwar <samir.talwar@digitalasset.com>
This commit is contained in:
Samir Talwar 2020-01-30 11:16:32 +01:00 committed by GitHub
parent eacacbf44e
commit fb58dc8756
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 250 additions and 87 deletions

View File

@@ -170,6 +170,7 @@ da_scala_test_suite(
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:org_flywaydb_flyway_core",
"@maven//:org_scala_lang_modules_scala_java8_compat_2_12",
"@maven//:org_scalactic_scalactic_2_12",
"@maven//:org_scalatest_scalatest_2_12",
],

View File

@@ -3,6 +3,8 @@
package com.daml.ledger.on.sql
import java.sql.Connection
import com.daml.ledger.on.sql.queries.{H2Queries, PostgresqlQueries, Queries, SqliteQueries}
import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.resources.ResourceOwner
@@ -10,11 +12,57 @@ import com.zaxxer.hikari.HikariDataSource
import javax.sql.DataSource
import org.flywaydb.core.Flyway
case class Database(
queries: Queries,
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
// Handle to the ledger database: bundles the dialect-specific `queries`
// implementation with separate reader and writer connection pools, and
// provides commit/rollback transaction helpers with trace-level timing.
class Database(
val queries: Queries,
readerConnectionPool: DataSource,
writerConnectionPool: DataSource,
)
// NOTE(review): the lone `)` above is residue of the removed `case class`
// signature in this diff rendering; the live constructor ends at `) {` below.
) {
private val logger = ContextualizedLogger.get(this.getClass)
// Runs `body` in a transaction on a connection taken from the reader pool.
// `message` labels the trace/timing log entries.
def inReadTransaction[T](message: String)(
body: Connection => T,
)(implicit logCtx: LoggingContext): T = {
inTransaction(message, readerConnectionPool)(body)
}
// Runs `body` in a transaction on a connection taken from the writer pool.
def inWriteTransaction[T](message: String)(
body: Connection => T,
)(implicit logCtx: LoggingContext): T = {
inTransaction(message, writerConnectionPool)(body)
}
// Acquires a connection from `connectionPool`, runs `body`, commits on
// success and rolls back on non-fatal exceptions (fatal errors propagate
// without an explicit rollback). The connection is always closed, and both
// the acquisition and the body execution are timed separately.
private def inTransaction[T](message: String, connectionPool: DataSource)(
body: Connection => T,
)(implicit logCtx: LoggingContext): T = {
val connection =
time(s"$message: acquiring connection")(connectionPool.getConnection())
time(message) {
try {
val result = body(connection)
connection.commit()
result
} catch {
case NonFatal(exception) =>
connection.rollback()
throw exception
} finally {
connection.close()
}
}
}
// Evaluates `body`, emitting trace logs before and after with the elapsed
// wall-clock time in milliseconds; returns `body`'s result unchanged.
private def time[T](message: String)(body: => T)(implicit logCtx: LoggingContext): T = {
val startTime = System.nanoTime()
logger.trace(s"$message: starting")
val result = body
val endTime = System.nanoTime()
logger.trace(s"$message: finished in ${Duration.fromNanos(endTime - startTime).toMillis}ms")
result
}
}
object Database {
private val logger = ContextualizedLogger.get(classOf[Database])
@ -148,7 +196,7 @@ object Database {
def migrate(): Database = {
flyway.migrate()
afterMigration()
Database(system.queries, readerConnectionPool, writerConnectionPool)
new Database(system.queries, readerConnectionPool, writerConnectionPool)
}
def clear(): this.type = {

View File

@ -11,11 +11,10 @@ import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.ledger.on.sql.SqlLedgerReaderWriter._
import com.daml.ledger.on.sql.queries.Queries.Index
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlLogEntryId,
DamlStateKey,
DamlStateValue,
DamlStateValue
}
import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord, LedgerWriter}
import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting}
@ -24,19 +23,16 @@ import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.daml.lf.engine.Engine
import com.digitalasset.ledger.api.health.{HealthStatus, Healthy}
import com.digitalasset.logging.LoggingContext
import com.digitalasset.logging.LoggingContext.withEnrichedLoggingContext
import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.platform.akkastreams.dispatcher.Dispatcher
import com.digitalasset.platform.akkastreams.dispatcher.SubSource.RangeSource
import com.digitalasset.resources.ResourceOwner
import com.google.protobuf.ByteString
import javax.sql.DataSource
import scala.collection.JavaConverters._
import scala.collection.immutable.TreeSet
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
class SqlLedgerReaderWriter(
ledgerId: LedgerId = Ref.LedgerString.assertFromString(UUID.randomUUID.toString),
@ -50,8 +46,6 @@ class SqlLedgerReaderWriter(
) extends LedgerWriter
with LedgerReader {
private val logger = ContextualizedLogger.get(this.getClass)
private val engine = Engine()
private val queries = database.queries
@ -66,7 +60,7 @@ class SqlLedgerReaderWriter(
.startingAt(
offset.getOrElse(StartOffset).components.head,
RangeSource((start, end) => {
val result = inDatabaseReadTransaction(s"Querying events [$start, $end[ from log") {
val result = database.inReadTransaction(s"Querying events [$start, $end[ from log") {
implicit connection =>
queries.selectFromLog(start, end)
}
@ -88,22 +82,23 @@ class SqlLedgerReaderWriter(
.getOrElse(throw new IllegalArgumentException("Not a valid submission in envelope"))
val stateInputKeys = submission.getInputDamlStateList.asScala.toSet
val entryId = allocateEntryId()
val newHead = inDatabaseWriteTransaction("Committing a submission") { implicit connection =>
val stateInputs = readState(stateInputKeys)
val (logEntry, stateUpdates) =
KeyValueCommitting.processSubmission(
engine,
entryId,
currentRecordTime(),
LedgerReader.DefaultConfiguration,
submission,
participantId,
stateInputs,
)
queries.updateState(stateUpdates)
val latestSequenceNo =
queries.insertIntoLog(entryId, Envelope.enclose(logEntry))
latestSequenceNo + 1
val newHead = database.inWriteTransaction("Committing a submission") {
implicit connection =>
val stateInputs = readState(stateInputKeys)
val (logEntry, stateUpdates) =
KeyValueCommitting.processSubmission(
engine,
entryId,
currentRecordTime(),
LedgerReader.DefaultConfiguration,
submission,
participantId,
stateInputs,
)
queries.updateState(stateUpdates)
val latestSequenceNo =
queries.insertIntoLog(entryId, Envelope.enclose(logEntry))
latestSequenceNo + 1
}
dispatcher.signalNewHead(newHead)
SubmissionResult.Acknowledged
@ -128,55 +123,9 @@ class SqlLedgerReaderWriter(
.foldLeft(builder)(_ += _)
.result()
}
private def inDatabaseReadTransaction[T](message: String)(
body: Connection => T,
)(implicit logCtx: LoggingContext): T = {
inDatabaseTransaction(message, database.readerConnectionPool)(body)
}
private def inDatabaseWriteTransaction[T](message: String)(
body: Connection => T,
)(implicit logCtx: LoggingContext): T = {
inDatabaseTransaction(message, database.writerConnectionPool)(body)
}
private def inDatabaseTransaction[T](
message: String,
connectionPool: DataSource,
)(
body: Connection => T,
)(implicit logCtx: LoggingContext): T = {
val connection =
time(s"$message: acquiring connection")(connectionPool.getConnection())
time(message) {
try {
val result = body(connection)
connection.commit()
result
} catch {
case NonFatal(exception) =>
connection.rollback()
throw exception
} finally {
connection.close()
}
}
}
private def time[T](message: String)(body: => T)(implicit logCtx: LoggingContext): T = {
val startTime = System.nanoTime()
logger.trace(s"$message: starting")
val result = body
val endTime = System.nanoTime()
logger.trace(s"$message: finished in ${Duration.fromNanos(endTime - startTime).toMillis}ms")
result
}
}
object SqlLedgerReaderWriter {
val StartIndex: Index = 1
private val StartOffset: Offset = Offset(Array(StartIndex))
def owner(
@ -189,16 +138,17 @@ object SqlLedgerReaderWriter {
logCtx: LoggingContext,
): ResourceOwner[SqlLedgerReaderWriter] =
for {
uninitializedDatabase <- Database.owner(jdbcUrl)
database = uninitializedDatabase.migrate()
head = database.inReadTransaction("Reading head at startup") { implicit connection =>
database.queries.selectLatestLogEntryId().map(_ + 1).getOrElse(StartIndex)
}
dispatcher <- ResourceOwner.forCloseable(
() =>
Dispatcher(
"sql-participant-state",
zeroIndex = StartIndex,
headAtInitialization = StartIndex,
headAtInitialization = head,
))
uninitializedDatabase <- Database.owner(jdbcUrl)
} yield {
val database = uninitializedDatabase.migrate()
new SqlLedgerReaderWriter(ledgerId, participantId, database, dispatcher)
}
} yield new SqlLedgerReaderWriter(ledgerId, participantId, database, dispatcher)
}

View File

@ -0,0 +1,10 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.on
// Shared aliases for the whole `sql` package: log-entry positions are plain
// `Long`s, and the log's first sequence number is `StartIndex`.
package object sql {
// Position (sequence number) of an entry in the ledger log.
type Index = Long
// The log starts at 1, so an empty ledger's head is also 1.
val StartIndex: Index = 1
}

View File

@ -7,11 +7,12 @@ import java.sql.Connection
import anorm.SqlParser._
import anorm._
import com.daml.ledger.on.sql.Index
import com.daml.ledger.on.sql.queries.Queries._
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlLogEntryId,
DamlStateKey,
DamlStateValue,
DamlStateValue
}
import com.daml.ledger.participant.state.kvutils.api.LedgerRecord
import com.daml.ledger.participant.state.v1.Offset
@ -20,6 +21,11 @@ import com.google.protobuf.ByteString
import scala.collection.immutable
trait CommonQueries extends Queries {
override def selectLatestLogEntryId()(implicit connection: Connection): Option[Index] =
SQL"SELECT MAX(sequence_no) max_sequence_no FROM #$LogTable"
.as(get[Option[Long]]("max_sequence_no").singleOpt)
.flatten
override def selectFromLog(
start: Index,
end: Index,

View File

@ -7,6 +7,7 @@ import java.sql.Connection
import anorm.SqlParser._
import anorm._
import com.daml.ledger.on.sql.Index
import com.daml.ledger.on.sql.queries.Queries._
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
import com.google.protobuf.ByteString

View File

@ -7,6 +7,7 @@ import java.sql.Connection
import anorm.SqlParser._
import anorm._
import com.daml.ledger.on.sql.Index
import com.daml.ledger.on.sql.queries.Queries._
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
import com.google.protobuf.ByteString

View File

@ -6,7 +6,7 @@ package com.daml.ledger.on.sql.queries
import java.sql.Connection
import anorm.{BatchSql, NamedParameter}
import com.daml.ledger.on.sql.queries.Queries._
import com.daml.ledger.on.sql.Index
import com.daml.ledger.participant.state.kvutils.DamlKvutils
import com.daml.ledger.participant.state.kvutils.api.LedgerRecord
import com.google.protobuf.ByteString
@ -14,6 +14,8 @@ import com.google.protobuf.ByteString
import scala.collection.immutable
trait Queries {
def selectLatestLogEntryId()(implicit connection: Connection): Option[Index]
def selectFromLog(
start: Index,
end: Index,
@ -36,8 +38,6 @@ trait Queries {
}
object Queries {
type Index = Long
val TablePrefix = "ledger"
val LogTable = s"${TablePrefix}_log"
val StateTable = s"${TablePrefix}_state"

View File

@ -7,6 +7,7 @@ import java.sql.Connection
import anorm.SqlParser._
import anorm._
import com.daml.ledger.on.sql.Index
import com.daml.ledger.on.sql.queries.Queries._
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
import com.google.protobuf.ByteString

View File

@ -22,7 +22,7 @@ abstract class SqlLedgerReaderWriterIntegrationSpecBase(implementationName: Stri
protected def newJdbcUrl(): String
override final val startIndex: Long = SqlLedgerReaderWriter.StartIndex
override final val startIndex: Long = StartIndex
override final def participantStateFactory(
participantId: ParticipantId,

View File

@ -0,0 +1,78 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.on.sql
import java.nio.file.Files
import java.util.UUID
import akka.stream.scaladsl.Sink
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.participant.state.v1.Update.PartyAddedToParticipant
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.data.Ref.{LedgerString, Party}
import com.digitalasset.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.resources.Resource
import org.scalatest.{AsyncWordSpec, BeforeAndAfterEach, Matchers}
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.DurationInt
import scala.util.Random
// Integration test: restarting the participant against the same SQLite file
// must resume from the persisted head instead of starting over at index 1.
class RestartSpec
extends AsyncWordSpec
with Matchers
with BeforeAndAfterEach
with AkkaBeforeAndAfterAll {
// Temporary directory holding one SQLite database file per test `id`.
private val root = Files.createTempDirectory(getClass.getSimpleName)
// Acquires a participant backed by the SQLite file named after `id`.
// A fresh random ledger id is generated on every start; only the database
// file ties two starts together.
private def start(id: String): Resource[KeyValueParticipantState] = {
newLoggingContext { implicit logCtx =>
val ledgerId: LedgerString = LedgerString.assertFromString(s"ledger-${UUID.randomUUID()}")
val participantId: ParticipantId = LedgerString.assertFromString("participant")
val jdbcUrl =
s"jdbc:sqlite:file:$root/$id.sqlite"
SqlLedgerReaderWriter
.owner(ledgerId, participantId, jdbcUrl)
.map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter))
.acquire()
}
}
"an SQL ledger reader-writer" should {
"resume where it left off on restart" in {
val id = Random.nextInt().toString
for {
// First run: allocate one party, then release the participant.
_ <- start(id).use { participant =>
for {
_ <- participant
.allocateParty(None, Some(Party.assertFromString("party-1")), randomLedgerString())
.toScala
} yield ()
}
// Second run on the same database file: allocate another party and
// read both updates back from the beginning of the log.
updates <- start(id).use { participant =>
for {
_ <- participant
.allocateParty(None, Some(Party.assertFromString("party-2")), randomLedgerString())
.toScala
updates <- participant
.stateUpdates(beginAfter = None)
.take(2)
.completionTimeout(10.seconds)
.runWith(Sink.seq)
} yield updates.map(_._2)
}
} yield {
// Seeing party-1 before party-2 proves the first run's log entry
// survived the restart and the head was re-read from the database.
all(updates) should be(a[PartyAddedToParticipant])
val displayNames = updates.map(_.asInstanceOf[PartyAddedToParticipant].displayName)
displayNames should be(Seq("party-1", "party-2"))
}
}
}
// Random ledger string used as a submission id for party allocation.
private def randomLedgerString(): LedgerString =
Ref.LedgerString.assertFromString(UUID.randomUUID().toString)
}

View File

@ -50,7 +50,7 @@ trait Resource[A] {
flatMap(nested => nested)
def transformWith[B](f: Try[A] => Resource[B])(
implicit executionContext: ExecutionContext
implicit executionContext: ExecutionContext,
): Resource[B] =
Resource(
asFuture.transformWith(f.andThen(Future.successful)),
@ -59,6 +59,14 @@ trait Resource[A] {
).flatten
def vary[B >: A]: Resource[B] = asInstanceOf[Resource[B]]
// Applies `behavior` to the acquired value and then releases the resource,
// regardless of whether acquisition or `behavior` succeeded. On success the
// original value is returned once release completes; on failure the original
// exception is re-raised after release.
def use[T](behavior: A => Future[T])(implicit executionContext: ExecutionContext): Future[T] =
asFuture
.flatMap(behavior)
.transformWith {
case Success(value) => release().map(_ => value)
case Failure(exception) => release().flatMap(_ => Future.failed(exception))
}
}
object Resource {

View File

@ -0,0 +1,59 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.resources
import com.digitalasset.resources.FailingResourceOwner.FailingResourceFailedToOpen
import org.scalatest.{AsyncWordSpec, Matchers}
// Unit tests for `Resource.use`: the resource must be usable, released after
// use, and errors from acquisition or usage must propagate after release.
class ResourceSpec extends AsyncWordSpec with Matchers {
"a Resource" should {
// The acquired value is passed through to the `use` body.
"be used" in {
val owner = ResourceOwner.successful(42)
owner.acquire().use { value =>
value should be(42)
}
}
// The resource is released once the `use` body's future completes.
"clean up after use" in {
val owner = TestResourceOwner(42)
owner
.acquire()
.use { value =>
owner.hasBeenAcquired should be(true)
value should be(42)
}
.map { _ =>
owner.hasBeenAcquired should be(false)
}
}
// If acquisition failed, `use` surfaces that failure and never runs the body.
"report errors in acquisition even after usage" in {
val owner = FailingResourceOwner[Int]()
owner
.acquire()
.use { _ =>
fail("Can't use a failed resource.")
}
.failed
.map { exception =>
exception should be(a[FailingResourceFailedToOpen])
}
}
// A failure inside the body still releases the resource, then propagates.
"report errors in usage" in {
val owner = TestResourceOwner(54)
owner
.acquire()
.use { _ =>
owner.hasBeenAcquired should be(true)
sys.error("Uh oh.")
}
.failed
.map { exception =>
owner.hasBeenAcquired should be(false)
exception.getMessage should be("Uh oh.")
}
}
}
}