From fb58dc87564436c730243e0a5a2e05ef83589f1e Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Thu, 30 Jan 2020 11:16:32 +0100 Subject: [PATCH] ledger-on-sql: Make sure the ledger picks up where it left off. (#4266) * resources: Make it easy to use a resource and then release it. * ledger-on-sql: Move database transaction methods to `Database`. * ledger-on-sql: Make sure the ledger picks up where it left off. This means seeding the dispatcher with the correct head, rather than starting at 1 every time. CHANGELOG_BEGIN CHANGELOG_END * ledger-on-sql: Move `Index` and `StartIndex` to the package object. They're used everywhere. Co-authored-by: Samir Talwar --- ledger/ledger-on-sql/BUILD.bazel | 1 + .../com/daml/ledger/on/sql/Database.scala | 56 +++++++++- .../ledger/on/sql/SqlLedgerReaderWriter.scala | 104 +++++------------- .../com/daml/ledger/on/sql/package.scala | 10 ++ .../ledger/on/sql/queries/CommonQueries.scala | 8 +- .../ledger/on/sql/queries/H2Queries.scala | 1 + .../on/sql/queries/PostgresqlQueries.scala | 1 + .../daml/ledger/on/sql/queries/Queries.scala | 6 +- .../ledger/on/sql/queries/SqliteQueries.scala | 1 + ...edgerReaderWriterIntegrationSpecBase.scala | 2 +- .../com/daml/ledger/on/sql/RestartSpec.scala | 78 +++++++++++++ .../com/digitalasset/resources/Resource.scala | 10 +- .../digitalasset/resources/ResourceSpec.scala | 59 ++++++++++ 13 files changed, 250 insertions(+), 87 deletions(-) create mode 100644 ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/package.scala create mode 100644 ledger/ledger-on-sql/src/test/suite/scala/com/daml/ledger/on/sql/RestartSpec.scala create mode 100644 libs-scala/resources/src/test/suite/scala/com/digitalasset/resources/ResourceSpec.scala diff --git a/ledger/ledger-on-sql/BUILD.bazel b/ledger/ledger-on-sql/BUILD.bazel index ec9e456042..3e99e49ce1 100644 --- a/ledger/ledger-on-sql/BUILD.bazel +++ b/ledger/ledger-on-sql/BUILD.bazel @@ -170,6 +170,7 @@ da_scala_test_suite( "@maven//:com_typesafe_akka_akka_actor_2_12", "@maven//:com_typesafe_akka_akka_stream_2_12", "@maven//:org_flywaydb_flyway_core", + "@maven//:org_scala_lang_modules_scala_java8_compat_2_12", "@maven//:org_scalactic_scalactic_2_12", "@maven//:org_scalatest_scalatest_2_12", ], diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala index e4bd384888..c9fe18c447 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala @@ -3,6 +3,8 @@ package com.daml.ledger.on.sql +import java.sql.Connection + import com.daml.ledger.on.sql.queries.{H2Queries, PostgresqlQueries, Queries, SqliteQueries} import com.digitalasset.logging.{ContextualizedLogger, LoggingContext} import com.digitalasset.resources.ResourceOwner @@ -10,11 +12,57 @@ import com.zaxxer.hikari.HikariDataSource import javax.sql.DataSource import org.flywaydb.core.Flyway -case class Database( - queries: Queries, +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +class Database( + val queries: Queries, readerConnectionPool: DataSource, writerConnectionPool: DataSource, -) +) { + private val logger = ContextualizedLogger.get(this.getClass) + + def inReadTransaction[T](message: String)( + body: Connection => T, + )(implicit logCtx: LoggingContext): T = { + inTransaction(message, readerConnectionPool)(body) + } + + def inWriteTransaction[T](message: String)( + body: Connection => T, + )(implicit logCtx: LoggingContext): T = { + inTransaction(message, writerConnectionPool)(body) + } + + private def inTransaction[T](message: String, connectionPool: DataSource)( + body: Connection => T, + )(implicit logCtx: LoggingContext): T = { + val connection = + time(s"$message: acquiring connection")(connectionPool.getConnection()) + time(message) { + try { + val result = body(connection) + connection.commit() + result + } catch { + case NonFatal(exception) => + connection.rollback() + throw exception + } finally { + connection.close() + } + } + } + + private def time[T](message: String)(body: => T)(implicit logCtx: LoggingContext): T = { + val startTime = System.nanoTime() + logger.trace(s"$message: starting") + val result = body + val endTime = System.nanoTime() + logger.trace(s"$message: finished in ${Duration.fromNanos(endTime - startTime).toMillis}ms") + result + } +} object Database { private val logger = ContextualizedLogger.get(classOf[Database]) @@ -148,7 +196,7 @@ object Database { def migrate(): Database = { flyway.migrate() afterMigration() - Database(system.queries, readerConnectionPool, writerConnectionPool) + new Database(system.queries, readerConnectionPool, writerConnectionPool) } def clear(): this.type = { diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala index ddffb1498f..b2dce15f26 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala @@ -11,11 +11,10 @@ import akka.NotUsed import akka.stream.Materializer import akka.stream.scaladsl.Source import com.daml.ledger.on.sql.SqlLedgerReaderWriter._ -import com.daml.ledger.on.sql.queries.Queries.Index import com.daml.ledger.participant.state.kvutils.DamlKvutils.{ DamlLogEntryId, DamlStateKey, - DamlStateValue, + DamlStateValue } import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord, LedgerWriter} import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting} @@ -24,19 +23,16 @@ import com.digitalasset.daml.lf.data.Ref import com.digitalasset.daml.lf.data.Time.Timestamp import com.digitalasset.daml.lf.engine.Engine import com.digitalasset.ledger.api.health.{HealthStatus, Healthy} +import com.digitalasset.logging.LoggingContext import com.digitalasset.logging.LoggingContext.withEnrichedLoggingContext -import com.digitalasset.logging.{ContextualizedLogger, LoggingContext} import com.digitalasset.platform.akkastreams.dispatcher.Dispatcher import com.digitalasset.platform.akkastreams.dispatcher.SubSource.RangeSource import com.digitalasset.resources.ResourceOwner import com.google.protobuf.ByteString -import javax.sql.DataSource import scala.collection.JavaConverters._ import scala.collection.immutable.TreeSet -import scala.concurrent.duration.Duration import scala.concurrent.{ExecutionContext, Future} -import scala.util.control.NonFatal class SqlLedgerReaderWriter( ledgerId: LedgerId = Ref.LedgerString.assertFromString(UUID.randomUUID.toString), @@ -50,8 +46,6 @@ class SqlLedgerReaderWriter( ) extends LedgerWriter with LedgerReader { - private val logger = ContextualizedLogger.get(this.getClass) - private val engine = Engine() private val queries = database.queries @@ -66,7 +60,7 @@ class SqlLedgerReaderWriter( .startingAt( offset.getOrElse(StartOffset).components.head, RangeSource((start, end) => { - val result = inDatabaseReadTransaction(s"Querying events [$start, $end[ from log") { + val result = database.inReadTransaction(s"Querying events [$start, $end[ from log") { implicit connection => queries.selectFromLog(start, end) } @@ -88,22 +82,23 @@ class SqlLedgerReaderWriter( .getOrElse(throw new IllegalArgumentException("Not a valid submission in envelope")) val stateInputKeys = submission.getInputDamlStateList.asScala.toSet val entryId = allocateEntryId() - val newHead = inDatabaseWriteTransaction("Committing a submission") { implicit connection => - val stateInputs = readState(stateInputKeys) - val (logEntry, stateUpdates) = - KeyValueCommitting.processSubmission( - engine, - entryId, - currentRecordTime(), - LedgerReader.DefaultConfiguration, - submission, - participantId, - stateInputs, - ) - queries.updateState(stateUpdates) - val latestSequenceNo = - queries.insertIntoLog(entryId, Envelope.enclose(logEntry)) - latestSequenceNo + 1 + val newHead = database.inWriteTransaction("Committing a submission") { + implicit connection => + val stateInputs = readState(stateInputKeys) + val (logEntry, stateUpdates) = + KeyValueCommitting.processSubmission( + engine, + entryId, + currentRecordTime(), + LedgerReader.DefaultConfiguration, + submission, + participantId, + stateInputs, + ) + queries.updateState(stateUpdates) + val latestSequenceNo = + queries.insertIntoLog(entryId, Envelope.enclose(logEntry)) + latestSequenceNo + 1 } dispatcher.signalNewHead(newHead) SubmissionResult.Acknowledged @@ -128,55 +123,9 @@ class SqlLedgerReaderWriter( .foldLeft(builder)(_ += _) .result() } - - private def inDatabaseReadTransaction[T](message: String)( - body: Connection => T, - )(implicit logCtx: LoggingContext): T = { - inDatabaseTransaction(message, database.readerConnectionPool)(body) - } - - private def inDatabaseWriteTransaction[T](message: String)( - body: Connection => T, - )(implicit logCtx: LoggingContext): T = { - inDatabaseTransaction(message, database.writerConnectionPool)(body) - } - - private def inDatabaseTransaction[T]( - message: String, - connectionPool: DataSource, - )( - body: Connection => T, - )(implicit logCtx: LoggingContext): T = { - val connection = - time(s"$message: acquiring connection")(connectionPool.getConnection()) - time(message) { - try { - val result = body(connection) - connection.commit() - result - } catch { - case NonFatal(exception) => - connection.rollback() - throw exception - } finally { - connection.close() - } - } - } - - private def time[T](message: String)(body: => T)(implicit logCtx: LoggingContext): T = { - val startTime = System.nanoTime() - logger.trace(s"$message: starting") - val result = body - val endTime = System.nanoTime() - logger.trace(s"$message: finished in ${Duration.fromNanos(endTime - startTime).toMillis}ms") - result - } } object SqlLedgerReaderWriter { - val StartIndex: Index = 1 - private val StartOffset: Offset = Offset(Array(StartIndex)) def owner( @@ -189,16 +138,17 @@ object SqlLedgerReaderWriter { logCtx: LoggingContext, ): ResourceOwner[SqlLedgerReaderWriter] = for { + uninitializedDatabase <- Database.owner(jdbcUrl) + database = uninitializedDatabase.migrate() + head = database.inReadTransaction("Reading head at startup") { implicit connection => + database.queries.selectLatestLogEntryId().map(_ + 1).getOrElse(StartIndex) + } dispatcher <- ResourceOwner.forCloseable( () => Dispatcher( "sql-participant-state", zeroIndex = StartIndex, - headAtInitialization = StartIndex, + headAtInitialization = head, )) - uninitializedDatabase <- Database.owner(jdbcUrl) - } yield { - val database = uninitializedDatabase.migrate() - new SqlLedgerReaderWriter(ledgerId, participantId, database, dispatcher) - } + } yield new SqlLedgerReaderWriter(ledgerId, participantId, database, dispatcher) } diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/package.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/package.scala new file mode 100644 index 0000000000..9b1f81dfc5 --- /dev/null +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/package.scala @@ -0,0 +1,10 @@ +// Copyright (c) 2020 The DAML Authors. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.on + +package object sql { + type Index = Long + + val StartIndex: Index = 1 +} diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/CommonQueries.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/CommonQueries.scala index 5930871f2f..02f2833505 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/CommonQueries.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/CommonQueries.scala @@ -7,11 +7,12 @@ import java.sql.Connection import anorm.SqlParser._ import anorm._ +import com.daml.ledger.on.sql.Index import com.daml.ledger.on.sql.queries.Queries._ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{ DamlLogEntryId, DamlStateKey, - DamlStateValue, + DamlStateValue } import com.daml.ledger.participant.state.kvutils.api.LedgerRecord import com.daml.ledger.participant.state.v1.Offset @@ -20,6 +21,11 @@ import com.google.protobuf.ByteString import scala.collection.immutable trait CommonQueries extends Queries { + override def selectLatestLogEntryId()(implicit connection: Connection): Option[Index] = + SQL"SELECT MAX(sequence_no) max_sequence_no FROM #$LogTable" + .as(get[Option[Long]]("max_sequence_no").singleOpt) + .flatten + override def selectFromLog( start: Index, end: Index, diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/H2Queries.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/H2Queries.scala index 4b7980f5b7..ab8e65efa1 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/H2Queries.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/H2Queries.scala @@ -7,6 +7,7 @@ import java.sql.Connection import anorm.SqlParser._ import anorm._ +import com.daml.ledger.on.sql.Index import com.daml.ledger.on.sql.queries.Queries._ import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId import com.google.protobuf.ByteString diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/PostgresqlQueries.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/PostgresqlQueries.scala index 2929faef65..e25ad8831b 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/PostgresqlQueries.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/PostgresqlQueries.scala @@ -7,6 +7,7 @@ import java.sql.Connection import anorm.SqlParser._ import anorm._ +import com.daml.ledger.on.sql.Index import com.daml.ledger.on.sql.queries.Queries._ import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId import com.google.protobuf.ByteString diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/Queries.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/Queries.scala index 5f247e012f..fc18c0b9bb 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/Queries.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/Queries.scala @@ -6,7 +6,7 @@ package com.daml.ledger.on.sql.queries import java.sql.Connection import anorm.{BatchSql, NamedParameter} -import com.daml.ledger.on.sql.queries.Queries._ +import com.daml.ledger.on.sql.Index import com.daml.ledger.participant.state.kvutils.DamlKvutils import com.daml.ledger.participant.state.kvutils.api.LedgerRecord import com.google.protobuf.ByteString @@ -14,6 +14,8 @@ import com.google.protobuf.ByteString import scala.collection.immutable trait Queries { + def selectLatestLogEntryId()(implicit connection: Connection): Option[Index] + def selectFromLog( start: Index, end: Index, @@ -36,8 +38,6 @@ trait Queries { } object Queries { - type Index = Long - val TablePrefix = "ledger" val LogTable = s"${TablePrefix}_log" val StateTable = s"${TablePrefix}_state" diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/SqliteQueries.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/SqliteQueries.scala index a9e934c5b7..d5e46d521b 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/SqliteQueries.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/SqliteQueries.scala @@ -7,6 +7,7 @@ import java.sql.Connection import anorm.SqlParser._ import anorm._ +import com.daml.ledger.on.sql.Index import com.daml.ledger.on.sql.queries.Queries._ import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId import com.google.protobuf.ByteString diff --git a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala index 30f60cd5c6..fdf64a7803 100644 --- a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala +++ b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala @@ -22,7 +22,7 @@ abstract class SqlLedgerReaderWriterIntegrationSpecBase(implementationName: Stri protected def newJdbcUrl(): String - override final val startIndex: Long = SqlLedgerReaderWriter.StartIndex + override final val startIndex: Long = StartIndex override final def participantStateFactory( participantId: ParticipantId, diff --git a/ledger/ledger-on-sql/src/test/suite/scala/com/daml/ledger/on/sql/RestartSpec.scala b/ledger/ledger-on-sql/src/test/suite/scala/com/daml/ledger/on/sql/RestartSpec.scala new file mode 100644 index 0000000000..055314c26b --- /dev/null +++ b/ledger/ledger-on-sql/src/test/suite/scala/com/daml/ledger/on/sql/RestartSpec.scala @@ -0,0 +1,78 @@ +// Copyright (c) 2020 The DAML Authors. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.on.sql + +import java.nio.file.Files +import java.util.UUID + +import akka.stream.scaladsl.Sink +import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState +import com.daml.ledger.participant.state.v1.ParticipantId +import com.daml.ledger.participant.state.v1.Update.PartyAddedToParticipant +import com.digitalasset.daml.lf.data.Ref +import com.digitalasset.daml.lf.data.Ref.{LedgerString, Party} +import com.digitalasset.ledger.api.testing.utils.AkkaBeforeAndAfterAll +import com.digitalasset.logging.LoggingContext.newLoggingContext +import com.digitalasset.resources.Resource +import org.scalatest.{AsyncWordSpec, BeforeAndAfterEach, Matchers} + +import scala.compat.java8.FutureConverters._ +import scala.concurrent.duration.DurationInt +import scala.util.Random + +class RestartSpec + extends AsyncWordSpec + with Matchers + with BeforeAndAfterEach + with AkkaBeforeAndAfterAll { + + private val root = Files.createTempDirectory(getClass.getSimpleName) + + private def start(id: String): Resource[KeyValueParticipantState] = { + newLoggingContext { implicit logCtx => + val ledgerId: LedgerString = LedgerString.assertFromString(s"ledger-${UUID.randomUUID()}") + val participantId: ParticipantId = LedgerString.assertFromString("participant") + val jdbcUrl = + s"jdbc:sqlite:file:$root/$id.sqlite" + SqlLedgerReaderWriter + .owner(ledgerId, participantId, jdbcUrl) + .map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter)) + .acquire() + } + } + + "an SQL ledger reader-writer" should { + "resume where it left off on restart" in { + val id = Random.nextInt().toString + for { + _ <- start(id).use { participant => + for { + _ <- participant + .allocateParty(None, Some(Party.assertFromString("party-1")), randomLedgerString()) + .toScala + } yield () + } + updates <- start(id).use { participant => + for { + _ <- participant + .allocateParty(None, Some(Party.assertFromString("party-2")), randomLedgerString()) + .toScala + updates <- participant + .stateUpdates(beginAfter = None) + .take(2) + .completionTimeout(10.seconds) + .runWith(Sink.seq) + } yield updates.map(_._2) + } + } yield { + all(updates) should be(a[PartyAddedToParticipant]) + val displayNames = updates.map(_.asInstanceOf[PartyAddedToParticipant].displayName) + displayNames should be(Seq("party-1", "party-2")) + } + } + } + + private def randomLedgerString(): LedgerString = + Ref.LedgerString.assertFromString(UUID.randomUUID().toString) +} diff --git a/libs-scala/resources/src/main/scala/com/digitalasset/resources/Resource.scala b/libs-scala/resources/src/main/scala/com/digitalasset/resources/Resource.scala index c7f8d62e2e..b62d2db595 100644 --- a/libs-scala/resources/src/main/scala/com/digitalasset/resources/Resource.scala +++ b/libs-scala/resources/src/main/scala/com/digitalasset/resources/Resource.scala @@ -50,7 +50,7 @@ trait Resource[A] { flatMap(nested => nested) def transformWith[B](f: Try[A] => Resource[B])( - implicit executionContext: ExecutionContext + implicit executionContext: ExecutionContext, ): Resource[B] = Resource( asFuture.transformWith(f.andThen(Future.successful)), @@ -59,6 +59,14 @@ trait Resource[A] { ).flatten def vary[B >: A]: Resource[B] = asInstanceOf[Resource[B]] + + def use[T](behavior: A => Future[T])(implicit executionContext: ExecutionContext): Future[T] = + asFuture + .flatMap(behavior) + .transformWith { + case Success(value) => release().map(_ => value) + case Failure(exception) => release().flatMap(_ => Future.failed(exception)) + } } object Resource { diff --git a/libs-scala/resources/src/test/suite/scala/com/digitalasset/resources/ResourceSpec.scala b/libs-scala/resources/src/test/suite/scala/com/digitalasset/resources/ResourceSpec.scala new file mode 100644 index 0000000000..698c625ddc --- /dev/null +++ b/libs-scala/resources/src/test/suite/scala/com/digitalasset/resources/ResourceSpec.scala @@ -0,0 +1,59 @@ +// Copyright (c) 2020 The DAML Authors. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.digitalasset.resources + +import com.digitalasset.resources.FailingResourceOwner.FailingResourceFailedToOpen +import org.scalatest.{AsyncWordSpec, Matchers} + +class ResourceSpec extends AsyncWordSpec with Matchers { + "a Resource" should { + "be used" in { + val owner = ResourceOwner.successful(42) + owner.acquire().use { value => + value should be(42) + } + } + + "clean up after use" in { + val owner = TestResourceOwner(42) + owner + .acquire() + .use { value => + owner.hasBeenAcquired should be(true) + value should be(42) + } + .map { _ => + owner.hasBeenAcquired should be(false) + } + } + + "report errors in acquisition even after usage" in { + val owner = FailingResourceOwner[Int]() + owner + .acquire() + .use { _ => + fail("Can't use a failed resource.") + } + .failed + .map { exception => + exception should be(a[FailingResourceFailedToOpen]) + } + } + + "report errors in usage" in { + val owner = TestResourceOwner(54) + owner + .acquire() + .use { _ => + owner.hasBeenAcquired should be(true) + sys.error("Uh oh.") + } + .failed + .map { exception => + owner.hasBeenAcquired should be(false) + exception.getMessage should be("Uh oh.") + } + } + } +}