Sandbox: Respect MRT (#990)

Transactions with a record time > maximum record time are now rejected
with a timeout error instead of being committed to the ledger.
This commit is contained in:
Gerolf Seitz 2019-05-09 11:10:54 +02:00 committed by GitHub
parent 82edab41f8
commit c89f3bdebe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 304 additions and 65 deletions

View File

@ -9,6 +9,9 @@ This page contains release notes for the SDK.
HEAD — ongoing
--------------
- Sandbox: Transactions with a record time that is after the maximum record time (as provided in the original command)
are now properly rejected instead of committed to the ledger: `#987 <https://github.com/digital-asset/daml/issues/987>`_
.. _release-0-12-16:
0.12.16 - 2019-05-07

View File

@ -43,6 +43,7 @@ dependencies = [
"//ledger/ledger-api-domain:ledger-api-domain",
"//ledger/ledger-api-scala-logging",
"//ledger/sandbox:sandbox",
"//ledger/test-common:test-common",
"//ledger/sandbox:sandbox-scala-tests-lib",
]

View File

@ -20,7 +20,7 @@ import com.digitalasset.ledger.api.v1.value.{Record, RecordField, Value}
import com.digitalasset.platform.participant.util.ValueConversions._
import com.digitalasset.ledger.api.v1.event.{ExercisedEvent}
import com.digitalasset.ledger.api.v1.transaction.TreeEvent
import com.digitalasset.platform.apitesting.LedgerBackend.SandboxInMemory
import com.digitalasset.platform.testing.LedgerBackend.SandboxInMemory
@SuppressWarnings(Array("org.wartremover.warts.Any"))
class WitnessIT

View File

@ -11,7 +11,7 @@ import com.digitalasset.ledger.api.testing.utils.{
MockMessages,
SuiteResourceManagementAroundAll
}
import com.digitalasset.platform.apitesting.LedgerBackend.SandboxSql
import com.digitalasset.platform.testing.LedgerBackend.SandboxSql
import com.digitalasset.platform.apitesting.{LedgerContext, MultiLedgerFixture}
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.google.protobuf.empty.Empty

View File

@ -3,7 +3,7 @@
package com.digitalasset.platform.tests.integration.ledger.api.identity
import com.digitalasset.platform.apitesting.LedgerBackend
import com.digitalasset.platform.testing.LedgerBackend
import scala.concurrent.Promise

View File

@ -37,7 +37,7 @@ import com.digitalasset.ledger.api.v1.value.{
Value,
Variant
}
import com.digitalasset.platform.apitesting.LedgerBackend.SandboxInMemory
import com.digitalasset.platform.testing.LedgerBackend.SandboxInMemory
import com.digitalasset.platform.apitesting.LedgerContextExtensions._
import com.digitalasset.platform.participant.util.ValueConversions._
import com.google.rpc.code.Code

View File

@ -41,6 +41,7 @@ import com.digitalasset.ledger.client.services.commands.CommandClient
import com.digitalasset.ledger.client.services.pkg.PackageClient
import com.digitalasset.ledger.client.services.testing.time.StaticTime
import com.digitalasset.ledger.client.services.transactions.TransactionClient
import com.digitalasset.platform.testing.ResourceExtensions
import io.grpc.{Channel, StatusRuntimeException}
import io.grpc.reflection.v1alpha.ServerReflectionGrpc
import org.slf4j.LoggerFactory

View File

@ -7,6 +7,7 @@ import com.digitalasset.ledger.api.testing.utils.Resource
import com.digitalasset.platform.PlatformApplications
import com.digitalasset.platform.apitesting.LedgerFactories.SandboxStore
import com.digitalasset.platform.esf.TestExecutionSequencerFactory
import com.digitalasset.platform.testing.{LedgerBackend, MultiResourceBase}
import org.scalatest.AsyncTestSuite
trait MultiLedgerFixture

View File

@ -31,9 +31,8 @@ class SandboxSemanticTestsLfRunner
.withTimeProvider(TimeProviderType.StaticAllowBackwards)
// TODO SC delete when implicit disclosure supplied in PostgresLedgerDao
override protected def fixtureIdsEnabled
: Set[com.digitalasset.platform.apitesting.LedgerBackend] =
Set(com.digitalasset.platform.apitesting.LedgerBackend.SandboxInMemory)
override protected def fixtureIdsEnabled: Set[com.digitalasset.platform.testing.LedgerBackend] =
Set(com.digitalasset.platform.testing.LedgerBackend.SandboxInMemory)
lazy val (mainPkgId, packages) = {
val dar = UniversalArchiveReader().readFile(darFile).get

View File

@ -133,6 +133,7 @@ daml_compile(
testDependencies = [
":sandbox",
"//ledger/test-common:test-common",
"//ledger-api/testing-utils",
"//3rdparty/jvm/org/scalatest:scalatest",
"//3rdparty/jvm/org/scalacheck:scalacheck",

View File

@ -94,47 +94,57 @@ class InMemoryLedger(
)
private def handleSuccessfulTx(transactionId: String, tx: TransactionSubmission): Unit = {
val toAbsCoid: ContractId => AbsoluteContractId =
SandboxEventIdFormatter.makeAbsCoid(transactionId)
val mappedTx = tx.transaction.mapContractIdAndValue(toAbsCoid, _.mapContractId(toAbsCoid))
// 5b. modify the ActiveContracts, while checking that we do not have double
// spends or timing issues
val acsRes = acs.addTransaction(
let = tx.ledgerEffectiveTime,
workflowId = tx.workflowId,
transactionId = transactionId,
transaction = mappedTx,
explicitDisclosure = tx.blindingInfo.explicitDisclosure,
localImplicitDisclosure = tx.blindingInfo.localImplicitDisclosure,
globalImplicitDisclosure = tx.blindingInfo.globalImplicitDisclosure,
)
acsRes match {
case Left(err) =>
handleError(tx, RejectionReason.Inconsistent(s"Reason: ${err.mkString("[", ", ", "]")}"))
case Right(newAcs) =>
acs = newAcs
val recordTx = mappedTx
.mapNodeId(SandboxEventIdFormatter.fromTransactionId(transactionId, _))
val recordBlinding =
tx.blindingInfo.explicitDisclosure.map {
case (nid, parties) =>
(SandboxEventIdFormatter.fromTransactionId(transactionId, nid), parties)
}
val entry = LedgerEntry
.Transaction(
tx.commandId,
transactionId,
tx.applicationId,
tx.submitter,
tx.workflowId,
tx.ledgerEffectiveTime,
timeProvider.getCurrentTime,
recordTx,
recordBlinding.transform((_, v) => v.toSet[String])
)
entries.publish(entry)
()
val recordTime = timeProvider.getCurrentTime
if (recordTime.isAfter(tx.maximumRecordTime)) {
// This can happen if the DAML-LF computation (i.e. exercise of a choice) takes longer
// than the time window between LET and MRT allows for.
// See https://github.com/digital-asset/daml/issues/987
handleError(
tx,
RejectionReason.TimedOut(
s"RecordTime $recordTime is after MaxiumRecordTime ${tx.maximumRecordTime}"))
} else {
val toAbsCoid: ContractId => AbsoluteContractId =
SandboxEventIdFormatter.makeAbsCoid(transactionId)
val mappedTx = tx.transaction.mapContractIdAndValue(toAbsCoid, _.mapContractId(toAbsCoid))
// 5b. modify the ActiveContracts, while checking that we do not have double
// spends or timing issues
val acsRes = acs.addTransaction(
let = tx.ledgerEffectiveTime,
workflowId = tx.workflowId,
transactionId = transactionId,
transaction = mappedTx,
explicitDisclosure = tx.blindingInfo.explicitDisclosure,
localImplicitDisclosure = tx.blindingInfo.localImplicitDisclosure,
globalImplicitDisclosure = tx.blindingInfo.globalImplicitDisclosure,
)
acsRes match {
case Left(err) =>
handleError(tx, RejectionReason.Inconsistent(s"Reason: ${err.mkString("[", ", ", "]")}"))
case Right(newAcs) =>
acs = newAcs
val recordTx = mappedTx
.mapNodeId(SandboxEventIdFormatter.fromTransactionId(transactionId, _))
val recordBlinding =
tx.blindingInfo.explicitDisclosure.map {
case (nid, parties) =>
(SandboxEventIdFormatter.fromTransactionId(transactionId, nid), parties)
}
val entry = LedgerEntry
.Transaction(
tx.commandId,
transactionId,
tx.applicationId,
tx.submitter,
tx.workflowId,
tx.ledgerEffectiveTime,
recordTime,
recordTx,
recordBlinding.transform((_, v) => v.toSet[String])
)
entries.publish(entry)
()
}
}
}

View File

@ -13,7 +13,11 @@ import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.transaction.Node
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractId}
import com.digitalasset.ledger.backend.api.v1.{SubmissionResult, TransactionSubmission}
import com.digitalasset.ledger.backend.api.v1.{
RejectionReason,
SubmissionResult,
TransactionSubmission
}
import com.digitalasset.platform.akkastreams.dispatcher.Dispatcher
import com.digitalasset.platform.akkastreams.dispatcher.SubSource.RangeSource
import com.digitalasset.platform.common.util.{DirectExecutionContext => DE}
@ -233,17 +237,32 @@ private class SqlLedger(
parties.toSet[String]
}
LedgerEntry.Transaction(
tx.commandId,
transactionId,
tx.applicationId,
tx.submitter,
tx.workflowId,
tx.ledgerEffectiveTime,
timeProvider.getCurrentTime,
mappedTx,
mappedDisclosure
)
val recordTime = timeProvider.getCurrentTime
if (recordTime.isAfter(tx.maximumRecordTime)) {
// This can happen if the DAML-LF computation (i.e. exercise of a choice) takes longer
// than the time window between LET and MRT allows for.
// See https://github.com/digital-asset/daml/issues/987
LedgerEntry.Rejection(
recordTime,
tx.commandId,
tx.applicationId,
tx.submitter,
RejectionReason.TimedOut(
s"RecordTime $recordTime is after MaximumRecordTime ${tx.maximumRecordTime}")
)
} else {
LedgerEntry.Transaction(
tx.commandId,
transactionId,
tx.applicationId,
tx.submitter,
tx.workflowId,
tx.ledgerEffectiveTime,
recordTime,
mappedTx,
mappedDisclosure
)
}
}
private def enqueue(f: Long => LedgerEntry): Future[SubmissionResult] = {

View File

@ -0,0 +1,78 @@
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandbox
import akka.stream.Materializer
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.ledger.api.testing.utils.Resource
import com.digitalasset.platform.sandbox.metrics.MetricsManager
import com.digitalasset.platform.sandbox.persistence.{PostgresFixture, PostgresResource}
import com.digitalasset.platform.sandbox.stores.ActiveContractsInMemory
import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode
import com.digitalasset.platform.sandbox.stores.ledger.{Ledger, LedgerEntry}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
object LedgerResource {
def resource(ledgerFactory: () => Future[Ledger]): Resource[Ledger] = new Resource[Ledger] {
@volatile
var ledger: Ledger = _
override def value: Ledger = ledger
override def setup(): Unit = ledger = Await.result(ledgerFactory(), 30.seconds)
override def close(): Unit = ledger.close()
}
def inMemory(
ledgerId: String,
timeProvider: TimeProvider,
acs: ActiveContractsInMemory = ActiveContractsInMemory.empty,
entries: Seq[LedgerEntry] = Nil): Resource[Ledger] =
LedgerResource.resource(
() =>
Future.successful(
Ledger.inMemory(ledgerId, timeProvider, acs, entries)
)
)
def postgres(ledgerId: String, timeProvider: TimeProvider)(
implicit mat: Materializer,
mm: MetricsManager) = {
new Resource[Ledger] {
@volatile
private var postgres: Resource[PostgresFixture] = null
@volatile
private var ledger: Resource[Ledger] = null
override def value(): Ledger = ledger.value
override def setup(): Unit = {
postgres = PostgresResource()
postgres.setup()
ledger = LedgerResource.resource(
() =>
Ledger.postgres(
postgres.value.jdbcUrl,
ledgerId,
timeProvider,
Nil,
SqlStartMode.AlwaysReset))
ledger.setup()
}
override def close(): Unit = {
ledger.close()
postgres.close()
postgres = null
ledger = null
}
}
}
}

View File

@ -0,0 +1,104 @@
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandbox.stores.ledger
import java.time.Instant
import akka.stream.scaladsl.Sink
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.ImmArray
import com.digitalasset.daml.lf.transaction.Transaction.{ContractId, NodeId, Value}
import com.digitalasset.daml.lf.transaction.{BlindingInfo, GenTransaction}
import com.digitalasset.ledger.api.testing.utils.{
AkkaBeforeAndAfterAll,
Resource,
SuiteResourceManagementAroundEach
}
import com.digitalasset.ledger.backend.api.v1.{
RejectionReason,
SubmissionResult,
TransactionSubmission
}
import com.digitalasset.platform.sandbox.{LedgerResource, MetricsAround}
import com.digitalasset.platform.testing.MultiResourceBase
import org.scalatest.concurrent.{AsyncTimeLimitedTests, ScalaFutures}
import org.scalatest.time.Span
import org.scalatest.{AsyncWordSpec, Matchers}
import scala.concurrent.duration._
sealed abstract class BackendType
object BackendType {
case object InMemory extends BackendType
case object Postgres extends BackendType
}
@SuppressWarnings(Array("org.wartremover.warts.Any"))
class TransactionMRTComplianceIT
extends AsyncWordSpec
with AkkaBeforeAndAfterAll
with MultiResourceBase[BackendType, Ledger]
with SuiteResourceManagementAroundEach
with AsyncTimeLimitedTests
with ScalaFutures
with Matchers
with MetricsAround {
override def timeLimit: Span = 60.seconds
val ledgerId = "ledgerId"
val timeProvider = TimeProvider.Constant(Instant.EPOCH.plusSeconds(10))
/** Overriding this provides an easy way to narrow down testing to a single implementation. */
override protected def fixtureIdsEnabled: Set[BackendType] =
Set(BackendType.InMemory, BackendType.Postgres)
override protected def constructResource(index: Int, fixtureId: BackendType): Resource[Ledger] =
fixtureId match {
case BackendType.InMemory =>
LedgerResource.inMemory(ledgerId, timeProvider)
case BackendType.Postgres =>
LedgerResource.postgres(ledgerId, timeProvider)
}
val LET = Instant.EPOCH.plusSeconds(2)
val MRT = Instant.EPOCH.plusSeconds(5)
"A Ledger" should {
"reject transactions with a record time after the MRT" in allFixtures { ledger =>
val emptyBlinding = BlindingInfo(Map.empty, Map.empty, Map.empty)
val dummyTransaction =
GenTransaction[NodeId, ContractId, Value[ContractId]](Map.empty, ImmArray.empty, Set.empty)
val submission = TransactionSubmission(
"cmdId",
"wfid",
"submitter",
LET,
MRT,
"appId",
emptyBlinding,
dummyTransaction)
ledger.publishTransaction(submission).map(_ shouldBe SubmissionResult.Acknowledged)
ledger
.ledgerEntries(None)
.runWith(Sink.head)
.map(_._2)
.map {
_ should matchPattern {
case LedgerEntry.Rejection(
recordTime,
"cmdId",
"appId",
"submitter",
RejectionReason.TimedOut(_)) =>
}
}
}
}
}

View File

@ -0,0 +1,22 @@
# Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
load(
"//bazel_tools:scala.bzl",
"da_scala_library",
)
dependencies = [
"//3rdparty/jvm/org/scalatest:scalatest",
"//ledger-api/rs-grpc-akka",
"//ledger-api/testing-utils",
]
da_scala_library(
name = "test-common",
srcs = glob(["src/main/scala/**/*.scala"]),
visibility = [
"//visibility:public",
],
deps = dependencies,
)

View File

@ -1,7 +1,7 @@
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.apitesting
package com.digitalasset.platform.testing
sealed abstract class LedgerBackend extends Product with Serializable

View File

@ -1,7 +1,7 @@
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.apitesting
package com.digitalasset.platform.testing
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
@ -9,11 +9,11 @@ import com.digitalasset.grpc.adapter.utils.DirectExecutionContext
import org.scalatest._
import org.scalatest.concurrent.AsyncTimeLimitedTests
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
import scala.collection.immutable.Iterable
import scala.concurrent.ExecutionContext.global
import scala.concurrent.{Future, Promise, TimeoutException}
import scala.concurrent.duration._
import scala.util.control.{NoStackTrace, NonFatal}
trait MultiFixtureBase[FixtureId, TestContext]

View File

@ -1,7 +1,7 @@
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.apitesting
package com.digitalasset.platform.testing
import com.digitalasset.ledger.api.testing.utils.{Resource, SuiteResource}
import org.scalatest.AsyncTestSuite

View File

@ -1,7 +1,7 @@
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.apitesting
package com.digitalasset.platform.testing
import java.util.concurrent.atomic.AtomicReference