mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 09:17:43 +03:00
making SQL Ledger reset-able (#559)
* making SQL Ledger resetable * fixing compile errors * enabling the test for the SQL Ledger
This commit is contained in:
parent
b206d0d8ac
commit
2ffe3fe8b4
@ -17,7 +17,7 @@ import com.digitalasset.ledger.api.v1.command_service.SubmitAndWaitRequest
|
||||
import com.digitalasset.ledger.api.v1.event.CreatedEvent
|
||||
import com.digitalasset.ledger.api.v1.ledger_identity_service.GetLedgerIdentityRequest
|
||||
import com.digitalasset.ledger.api.v1.testing.reset_service.ResetRequest
|
||||
import com.digitalasset.platform.apitesting.{LedgerBackend, LedgerContext, MultiLedgerFixture}
|
||||
import com.digitalasset.platform.apitesting.{LedgerContext, MultiLedgerFixture}
|
||||
import com.digitalasset.platform.sandbox.services.TestCommands
|
||||
import com.digitalasset.platform.sandbox.utils.InfiniteRetries
|
||||
import io.grpc.Status
|
||||
@ -42,8 +42,6 @@ class ResetServiceIT
|
||||
|
||||
override def timeLimit: Span = 30.seconds
|
||||
|
||||
override protected def fixtureIdsEnabled: Set[LedgerBackend] = Set(LedgerBackend.SandboxInMemory)
|
||||
|
||||
override protected val config: Config = Config.defaultWithLedgerId(None)
|
||||
|
||||
override protected def darFile: File = new File("ledger/sandbox/Test.dar")
|
||||
|
@ -17,6 +17,7 @@ import com.digitalasset.platform.sandbox.config.{SandboxConfig, SandboxContext}
|
||||
import com.digitalasset.platform.sandbox.services.SandboxResetService
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContracts
|
||||
import com.digitalasset.platform.sandbox.stores.ledger._
|
||||
import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode
|
||||
import com.digitalasset.platform.server.services.testing.TimeServiceBackend
|
||||
import com.digitalasset.platform.services.time.TimeProviderType
|
||||
import io.grpc.netty.GrpcSslContexts
|
||||
@ -64,12 +65,13 @@ object SandboxApplication {
|
||||
},
|
||||
() => {
|
||||
server.close() // fully tear down the old server.
|
||||
buildAndStartServer()
|
||||
buildAndStartServer(SqlStartMode.AlwaysReset)
|
||||
},
|
||||
)
|
||||
|
||||
@SuppressWarnings(Array("org.wartremover.warts.ExplicitImplicitTypes"))
|
||||
private def buildAndStartServer(): Unit = {
|
||||
private def buildAndStartServer(
|
||||
startMode: SqlStartMode = SqlStartMode.ContinueIfExists): Unit = {
|
||||
implicit val mat = materializer
|
||||
implicit val ec: ExecutionContext = mat.system.dispatcher
|
||||
|
||||
@ -93,7 +95,7 @@ object SandboxApplication {
|
||||
val (ledgerType, ledger) = config.jdbcUrl match {
|
||||
case None => ("in-memory", Ledger.inMemory(ledgerId, timeProvider, acs, records))
|
||||
case Some(jdbcUrl) =>
|
||||
val ledgerF = Ledger.postgres(jdbcUrl, ledgerId, timeProvider, records)
|
||||
val ledgerF = Ledger.postgres(jdbcUrl, ledgerId, timeProvider, records, startMode)
|
||||
|
||||
val ledger = Try(Await.result(ledgerF, asyncTolerance)).fold(t => {
|
||||
val msg = "Could not start PostgreSQL persistence layer"
|
||||
|
@ -19,7 +19,7 @@ class SandboxResetService(
|
||||
getServer: () => Server,
|
||||
getEc: () => ExecutionContext,
|
||||
closeAllServices: () => Unit,
|
||||
buildAndStartServer: () => Unit)
|
||||
resetAndStartServer: () => Unit)
|
||||
extends ResetServiceGrpc.ResetService
|
||||
with BindableService {
|
||||
|
||||
@ -69,7 +69,7 @@ class SandboxResetService(
|
||||
server.shutdownNow()
|
||||
}
|
||||
logger.info(s"Rebuilding server...")
|
||||
buildAndStartServer()
|
||||
resetAndStartServer()
|
||||
logger.info(s"Server reset complete.")
|
||||
})
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ import com.digitalasset.ledger.backend.api.v1.{SubmissionResult, TransactionSubm
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContracts
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
|
||||
import com.digitalasset.platform.sandbox.stores.ledger.inmemory.InMemoryLedger
|
||||
import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlLedger
|
||||
import com.digitalasset.platform.sandbox.stores.ledger.sql.{SqlLedger, SqlStartMode}
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
@ -56,8 +56,9 @@ object Ledger {
|
||||
ledgerId: String,
|
||||
timeProvider: TimeProvider,
|
||||
ledgerEntries: Seq[LedgerEntry],
|
||||
startMode: SqlStartMode
|
||||
)(implicit mat: Materializer): Future[Ledger] =
|
||||
//TODO (robert): casting from Seq to immutable.Seq, make ledgerEntries immutable throughout the Sandbox?
|
||||
SqlLedger(jdbcUrl, Some(ledgerId), timeProvider, immutable.Seq(ledgerEntries: _*))
|
||||
SqlLedger(jdbcUrl, Some(ledgerId), timeProvider, immutable.Seq(ledgerEntries: _*), startMode)
|
||||
|
||||
}
|
||||
|
@ -19,6 +19,10 @@ import com.digitalasset.platform.common.util.DirectExecutionContext
|
||||
import com.digitalasset.platform.sandbox.config.LedgerIdGenerator
|
||||
import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
|
||||
import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode.{
|
||||
AlwaysReset,
|
||||
ContinueIfExists
|
||||
}
|
||||
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.PersistenceResponse.{Duplicate, Ok}
|
||||
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.{LedgerDao, PostgresLedgerDao}
|
||||
import com.digitalasset.platform.sandbox.stores.ledger.sql.serialisation.{
|
||||
@ -34,13 +38,27 @@ import scala.collection.immutable
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.{Failure, Success}
|
||||
|
||||
sealed abstract class SqlStartMode extends Product with Serializable
|
||||
|
||||
object SqlStartMode {
|
||||
|
||||
/** Will continue using an initialised ledger, otherwise initialize a new one */
|
||||
final case object ContinueIfExists extends SqlStartMode
|
||||
|
||||
/** Will always reset and initialize the ledger, even if it has data. */
|
||||
final case object AlwaysReset extends SqlStartMode
|
||||
|
||||
}
|
||||
|
||||
object SqlLedger {
|
||||
//jdbcUrl must have the user/password encoded in form of: "jdbc:postgresql://localhost/test?user=fred&password=secret"
|
||||
def apply(
|
||||
jdbcUrl: String,
|
||||
ledgerId: Option[String],
|
||||
timeProvider: TimeProvider,
|
||||
ledgerEntries: immutable.Seq[LedgerEntry])(implicit mat: Materializer): Future[Ledger] = {
|
||||
ledgerEntries: immutable.Seq[LedgerEntry],
|
||||
startMode: SqlStartMode = SqlStartMode.ContinueIfExists)(
|
||||
implicit mat: Materializer): Future[Ledger] = {
|
||||
implicit val ec: ExecutionContext = DirectExecutionContext
|
||||
|
||||
val noOfShortLivedConnections = 10
|
||||
@ -52,7 +70,7 @@ object SqlLedger {
|
||||
val sqlLedgerFactory = SqlLedgerFactory(ledgerDao)
|
||||
|
||||
for {
|
||||
sqlLedger <- sqlLedgerFactory.createSqlLedger(ledgerId, timeProvider)
|
||||
sqlLedger <- sqlLedgerFactory.createSqlLedger(ledgerId, timeProvider, startMode)
|
||||
_ <- sqlLedger.loadStartingState(ledgerEntries)
|
||||
} yield sqlLedger
|
||||
}
|
||||
@ -237,18 +255,34 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
|
||||
* In case the ledger had already been initialized, the given ledger id must not be set or must
|
||||
* be equal to the one in the database.
|
||||
* @param timeProvider to get the current time when sequencing transactions
|
||||
* @param startMode whether we should start with a clean state or continue where we left off
|
||||
* @return a compliant Ledger implementation
|
||||
*/
|
||||
def createSqlLedger(initialLedgerId: Option[String], timeProvider: TimeProvider)(
|
||||
implicit mat: Materializer): Future[SqlLedger] = {
|
||||
def createSqlLedger(
|
||||
initialLedgerId: Option[String],
|
||||
timeProvider: TimeProvider,
|
||||
startMode: SqlStartMode)(implicit mat: Materializer): Future[SqlLedger] = {
|
||||
@SuppressWarnings(Array("org.wartremover.warts.ExplicitImplicitTypes"))
|
||||
implicit val ec = DirectExecutionContext
|
||||
|
||||
def init() = startMode match {
|
||||
case AlwaysReset =>
|
||||
for {
|
||||
_ <- reset()
|
||||
ledgerId <- initialize(initialLedgerId)
|
||||
} yield ledgerId
|
||||
case ContinueIfExists => initialize(initialLedgerId)
|
||||
}
|
||||
|
||||
for {
|
||||
ledgerId <- initialize(initialLedgerId)
|
||||
ledgerId <- init()
|
||||
ledgerEnd <- ledgerDao.lookupLedgerEnd()
|
||||
} yield new SqlLedger(ledgerId, ledgerEnd, ledgerDao, timeProvider)
|
||||
}
|
||||
|
||||
private def reset(): Future[Unit] =
|
||||
ledgerDao.reset()
|
||||
|
||||
private def initialize(initialLedgerId: Option[String]) = initialLedgerId match {
|
||||
case Some(initialId) =>
|
||||
ledgerDao
|
||||
|
@ -121,4 +121,7 @@ trait LedgerDao extends AutoCloseable {
|
||||
newLedgerEnd: Long,
|
||||
ledgerEntry: LedgerEntry): Future[PersistenceResponse]
|
||||
|
||||
/** Resets the platform into a state as it was never used before. Meant to be used solely for testing. */
|
||||
def reset(): Future[Unit]
|
||||
|
||||
}
|
||||
|
@ -657,9 +657,25 @@ private class PostgresLedgerDao(
|
||||
DirectExecutionContext)
|
||||
}
|
||||
|
||||
override def close(): Unit = {
|
||||
private val SQL_TRUNCATE_ALL_TABLES =
|
||||
SQL("""
|
||||
|truncate ledger_entries cascade;
|
||||
|truncate disclosures cascade;
|
||||
|truncate contracts cascade;
|
||||
|truncate contract_witnesses cascade;
|
||||
|truncate contract_key_maintainers cascade;
|
||||
|truncate parameters cascade;
|
||||
|truncate contract_keys cascade;
|
||||
""".stripMargin)
|
||||
|
||||
override def reset(): Future[Unit] =
|
||||
dbDispatcher.executeSql { implicit conn =>
|
||||
val _ = SQL_TRUNCATE_ALL_TABLES.execute()
|
||||
()
|
||||
}
|
||||
|
||||
override def close(): Unit =
|
||||
dbDispatcher.close()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user