diff --git a/ledger/ledger-api-integration-tests/src/test/itsuite/scala/com/digitalasset/platform/tests/integration/ledger/api/ResetServiceIT.scala b/ledger/ledger-api-integration-tests/src/test/itsuite/scala/com/digitalasset/platform/tests/integration/ledger/api/ResetServiceIT.scala index 5abc6df8ee1..cbb716b53e3 100644 --- a/ledger/ledger-api-integration-tests/src/test/itsuite/scala/com/digitalasset/platform/tests/integration/ledger/api/ResetServiceIT.scala +++ b/ledger/ledger-api-integration-tests/src/test/itsuite/scala/com/digitalasset/platform/tests/integration/ledger/api/ResetServiceIT.scala @@ -17,7 +17,7 @@ import com.digitalasset.ledger.api.v1.command_service.SubmitAndWaitRequest import com.digitalasset.ledger.api.v1.event.CreatedEvent import com.digitalasset.ledger.api.v1.ledger_identity_service.GetLedgerIdentityRequest import com.digitalasset.ledger.api.v1.testing.reset_service.ResetRequest -import com.digitalasset.platform.apitesting.{LedgerBackend, LedgerContext, MultiLedgerFixture} +import com.digitalasset.platform.apitesting.{LedgerContext, MultiLedgerFixture} import com.digitalasset.platform.sandbox.services.TestCommands import com.digitalasset.platform.sandbox.utils.InfiniteRetries import io.grpc.Status @@ -42,8 +42,6 @@ class ResetServiceIT override def timeLimit: Span = 30.seconds - override protected def fixtureIdsEnabled: Set[LedgerBackend] = Set(LedgerBackend.SandboxInMemory) - override protected val config: Config = Config.defaultWithLedgerId(None) override protected def darFile: File = new File("ledger/sandbox/Test.dar") diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxApplication.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxApplication.scala index f709fe944e0..f1805f3af25 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxApplication.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxApplication.scala @@ -17,6 +17,7 @@ import com.digitalasset.platform.sandbox.config.{SandboxConfig, SandboxContext} import com.digitalasset.platform.sandbox.services.SandboxResetService import com.digitalasset.platform.sandbox.stores.ActiveContracts import com.digitalasset.platform.sandbox.stores.ledger._ +import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode import com.digitalasset.platform.server.services.testing.TimeServiceBackend import com.digitalasset.platform.services.time.TimeProviderType import io.grpc.netty.GrpcSslContexts @@ -64,12 +65,13 @@ object SandboxApplication { }, () => { server.close() // fully tear down the old server. - buildAndStartServer() + buildAndStartServer(SqlStartMode.AlwaysReset) }, ) @SuppressWarnings(Array("org.wartremover.warts.ExplicitImplicitTypes")) - private def buildAndStartServer(): Unit = { + private def buildAndStartServer( + startMode: SqlStartMode = SqlStartMode.ContinueIfExists): Unit = { implicit val mat = materializer implicit val ec: ExecutionContext = mat.system.dispatcher @@ -93,7 +95,7 @@ object SandboxApplication { val (ledgerType, ledger) = config.jdbcUrl match { case None => ("in-memory", Ledger.inMemory(ledgerId, timeProvider, acs, records)) case Some(jdbcUrl) => - val ledgerF = Ledger.postgres(jdbcUrl, ledgerId, timeProvider, records) + val ledgerF = Ledger.postgres(jdbcUrl, ledgerId, timeProvider, records, startMode) val ledger = Try(Await.result(ledgerF, asyncTolerance)).fold(t => { val msg = "Could not start PostgreSQL persistence layer" diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/services/SandboxResetService.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/services/SandboxResetService.scala index aeba93efb49..27da8e31e59 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/services/SandboxResetService.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/services/SandboxResetService.scala @@ -19,7 +19,7 @@ class SandboxResetService( getServer: () => Server, getEc: () => ExecutionContext, closeAllServices: () => Unit, - buildAndStartServer: () => Unit) + resetAndStartServer: () => Unit) extends ResetServiceGrpc.ResetService with BindableService { @@ -69,7 +69,7 @@ class SandboxResetService( server.shutdownNow() } logger.info(s"Rebuilding server...") - buildAndStartServer() + resetAndStartServer() logger.info(s"Server reset complete.") }) } diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/Ledger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/Ledger.scala index ede90e18bcf..35d35f97079 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/Ledger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/Ledger.scala @@ -16,7 +16,7 @@ import com.digitalasset.ledger.backend.api.v1.{SubmissionResult, TransactionSubm import com.digitalasset.platform.sandbox.stores.ActiveContracts import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract import com.digitalasset.platform.sandbox.stores.ledger.inmemory.InMemoryLedger -import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlLedger +import com.digitalasset.platform.sandbox.stores.ledger.sql.{SqlLedger, SqlStartMode} import scala.collection.immutable import scala.concurrent.Future @@ -56,8 +56,9 @@ object Ledger { ledgerId: String, timeProvider: TimeProvider, ledgerEntries: Seq[LedgerEntry], + startMode: SqlStartMode )(implicit mat: Materializer): Future[Ledger] = //TODO (robert): casting from Seq to immutable.Seq, make ledgerEntries immutable throughout the Sandbox? - SqlLedger(jdbcUrl, Some(ledgerId), timeProvider, immutable.Seq(ledgerEntries: _*)) + SqlLedger(jdbcUrl, Some(ledgerId), timeProvider, immutable.Seq(ledgerEntries: _*), startMode) } diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedger.scala index 79fcf71f614..fb1af9287f9 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedger.scala @@ -19,6 +19,10 @@ import com.digitalasset.platform.common.util.DirectExecutionContext import com.digitalasset.platform.sandbox.config.LedgerIdGenerator import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract +import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode.{ + AlwaysReset, + ContinueIfExists +} import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.PersistenceResponse.{Duplicate, Ok} import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.{LedgerDao, PostgresLedgerDao} import com.digitalasset.platform.sandbox.stores.ledger.sql.serialisation.{ @@ -34,13 +38,27 @@ import scala.collection.immutable import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} +sealed abstract class SqlStartMode extends Product with Serializable + +object SqlStartMode { + + /** Will continue using an initialised ledger, otherwise initialize a new one */ + final case object ContinueIfExists extends SqlStartMode + + /** Will always reset and initialize the ledger, even if it has data. */ + final case object AlwaysReset extends SqlStartMode + +} + object SqlLedger { //jdbcUrl must have the user/password encoded in form of: "jdbc:postgresql://localhost/test?user=fred&password=secret" def apply( jdbcUrl: String, ledgerId: Option[String], timeProvider: TimeProvider, - ledgerEntries: immutable.Seq[LedgerEntry])(implicit mat: Materializer): Future[Ledger] = { + ledgerEntries: immutable.Seq[LedgerEntry], + startMode: SqlStartMode = SqlStartMode.ContinueIfExists)( + implicit mat: Materializer): Future[Ledger] = { implicit val ec: ExecutionContext = DirectExecutionContext val noOfShortLivedConnections = 10 @@ -52,7 +70,7 @@ object SqlLedger { val sqlLedgerFactory = SqlLedgerFactory(ledgerDao) for { - sqlLedger <- sqlLedgerFactory.createSqlLedger(ledgerId, timeProvider) + sqlLedger <- sqlLedgerFactory.createSqlLedger(ledgerId, timeProvider, startMode) _ <- sqlLedger.loadStartingState(ledgerEntries) } yield sqlLedger } @@ -237,18 +255,34 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) { * In case the ledger had already been initialized, the given ledger id must not be set or must * be equal to the one in the database. * @param timeProvider to get the current time when sequencing transactions + * @param startMode whether we should start with a clean state or continue where we left off * @return a compliant Ledger implementation */ - def createSqlLedger(initialLedgerId: Option[String], timeProvider: TimeProvider)( - implicit mat: Materializer): Future[SqlLedger] = { + def createSqlLedger( + initialLedgerId: Option[String], + timeProvider: TimeProvider, + startMode: SqlStartMode)(implicit mat: Materializer): Future[SqlLedger] = { @SuppressWarnings(Array("org.wartremover.warts.ExplicitImplicitTypes")) implicit val ec = DirectExecutionContext + + def init() = startMode match { + case AlwaysReset => + for { + _ <- reset() + ledgerId <- initialize(initialLedgerId) + } yield ledgerId + case ContinueIfExists => initialize(initialLedgerId) + } + for { - ledgerId <- initialize(initialLedgerId) + ledgerId <- init() ledgerEnd <- ledgerDao.lookupLedgerEnd() } yield new SqlLedger(ledgerId, ledgerEnd, ledgerDao, timeProvider) } + private def reset(): Future[Unit] = + ledgerDao.reset() + private def initialize(initialLedgerId: Option[String]) = initialLedgerId match { case Some(initialId) => ledgerDao diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/LedgerDao.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/LedgerDao.scala index f673da093c1..f6052c9f25a 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/LedgerDao.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/LedgerDao.scala @@ -121,4 +121,7 @@ trait LedgerDao extends AutoCloseable { newLedgerEnd: Long, ledgerEntry: LedgerEntry): Future[PersistenceResponse] + /** Resets the platform into a state as it was never used before. Meant to be used solely for testing. */ + def reset(): Future[Unit] + } diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/PostgresLedgerDao.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/PostgresLedgerDao.scala index f74ff5b382e..8a20eace34c 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/PostgresLedgerDao.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/PostgresLedgerDao.scala @@ -657,9 +657,25 @@ private class PostgresLedgerDao( DirectExecutionContext) } - override def close(): Unit = { + private val SQL_TRUNCATE_ALL_TABLES = + SQL(""" + |truncate ledger_entries cascade; + |truncate disclosures cascade; + |truncate contracts cascade; + |truncate contract_witnesses cascade; + |truncate contract_key_maintainers cascade; + |truncate parameters cascade; + |truncate contract_keys cascade; + """.stripMargin) + + override def reset(): Future[Unit] = + dbDispatcher.executeSql { implicit conn => + val _ = SQL_TRUNCATE_ALL_TABLES.execute() + () + } + + override def close(): Unit = dbDispatcher.close() - } }