From 9f8e640b1aebdf875dfabc0242e15a6e55da09b0 Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Fri, 10 Dec 2021 16:25:46 +0100 Subject: [PATCH] Use ExecutionContext.parasitic instead of DirectExecutionContext. (#11783) * concurrent: Replace `DirectExecutionContextInternal` with `parasitic`. * concurrent: Rename `DirectExecutionContext` `parasitic`. * Use `ExecutionContext.parasitic` instead of `DirectExecutionContext`. We no longer need the latter. CHANGELOG_BEGIN CHANGELOG_END * Fix formatting. --- ledger-api/rs-grpc-akka/BUILD.bazel | 1 - .../grpc/adapter/AkkaExecutionSequencer.scala | 3 +- ledger-api/testing-utils/BUILD.bazel | 1 - .../api/testing/utils/MultiFixtureBase.scala | 11 ++++--- ledger/indexer-benchmark/BUILD.bazel | 2 -- .../scala/ledger/indexerbenchmark/Main.scala | 7 ++-- .../indexerbenchmark/IndexerBenchmark.scala | 3 +- ledger/ledger-api-client/BUILD.bazel | 1 - .../ledger/client/CommandClientIT.scala | 13 ++++---- .../commands/CommandSubmissionFlow.scala | 5 ++- .../identity/LedgerIdentityClient.scala | 14 ++++---- .../services/testing/time/StaticTime.scala | 7 ++-- .../services/version/VersionClient.scala | 6 ++-- .../withoutledgerid/VersionClient.scala | 7 ++-- .../commands/CommandTrackerFlowTest.scala | 3 +- ledger/ledger-api-common/BUILD.bazel | 2 -- .../dispatcher/SignalDispatcher.scala | 5 +-- .../akkastreams/dispatcher/SubSource.scala | 5 ++- .../platform/akkastreams/FutureTimeouts.scala | 7 ++-- .../com/daml/ledger/on/sql/Database.scala | 5 ++- .../main/scala/com/daml/metrics/Timed.scala | 13 ++++---- .../com/daml/telemetry/TelemetryContext.scala | 5 ++- .../tracking/QueueBackedTracker.scala | 5 ++- .../services/tracking/TrackerMap.scala | 15 +++++---- .../index/LedgerBackedIndexService.scala | 7 ++-- .../scala/platform/store/BaseLedger.scala | 7 +--- .../appendonlydao/PaginatingAsyncStream.scala | 17 ++++------ .../appendonlydao/events/ACSReader.scala | 5 ++- .../store/interning/StringInterningView.scala | 5 ++- .../state/kvutils/api/BatchingQueue.scala | 5 ++- .../ledger/validator/LedgerStateAccess.scala | 3 +- .../kvutils/tools/BUILD.bazel | 1 - .../integritycheck/IntegrityChecker.scala | 12 ++----- .../platform/sandbox/SandboxServer.scala | 6 ++-- .../sandbox/stores/ledger/sql/SqlLedger.scala | 32 +++++++++---------- .../sandbox/ScenarioLoadingITDivulgence.scala | 5 +-- .../command/CommandStaticTimeIT.scala | 11 ++++--- ledger/sandbox-common/BUILD.bazel | 1 - .../services/SandboxResetService.scala | 11 ++++--- ledger/sandbox-perf/BUILD.bazel | 1 - .../platform/sandbox/perf/TestHelper.scala | 20 ++++++------ libs-scala/concurrent/BUILD.bazel | 3 -- .../src/main/scala/concurrent/package.scala | 9 ++++-- .../dec/DirectExecutionContextInternal.scala | 21 ------------ .../src/main/scala/dec/package.scala | 14 -------- triggers/service/BUILD.bazel | 3 +- .../daml/lf/engine/trigger/ServiceMain.scala | 17 +++++----- 47 files changed, 145 insertions(+), 217 deletions(-) delete mode 100644 libs-scala/concurrent/src/main/scala/dec/DirectExecutionContextInternal.scala delete mode 100644 libs-scala/concurrent/src/main/scala/dec/package.scala diff --git a/ledger-api/rs-grpc-akka/BUILD.bazel b/ledger-api/rs-grpc-akka/BUILD.bazel index bdf5f56908..6aee65eb9d 100644 --- a/ledger-api/rs-grpc-akka/BUILD.bazel +++ b/ledger-api/rs-grpc-akka/BUILD.bazel @@ -17,7 +17,6 @@ da_scala_library( deps = [ "//ledger-api/rs-grpc-bridge", "//ledger/error", - "//libs-scala/concurrent", "//libs-scala/contextualized-logging", "@maven//:io_grpc_grpc_api", "@maven//:io_grpc_grpc_stub", diff --git a/ledger-api/rs-grpc-akka/src/main/scala/com/digitalasset/grpc/adapter/AkkaExecutionSequencer.scala b/ledger-api/rs-grpc-akka/src/main/scala/com/digitalasset/grpc/adapter/AkkaExecutionSequencer.scala index 30d0bbc7cd..301f8516aa 100644 --- a/ledger-api/rs-grpc-akka/src/main/scala/com/digitalasset/grpc/adapter/AkkaExecutionSequencer.scala +++ b/ledger-api/rs-grpc-akka/src/main/scala/com/digitalasset/grpc/adapter/AkkaExecutionSequencer.scala @@ -12,7 +12,6 @@ import com.daml.grpc.adapter.RunnableSequencingActor.ShutdownRequest import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future} import scala.util.control.NonFatal -import com.daml.dec.DirectExecutionContext /** Implements serial execution semantics by forwarding the Runnables it receives to an underlying actor. */ @@ -23,7 +22,7 @@ class AkkaExecutionSequencer private (private val actorRef: ActorRef)(implicit override def sequence(runnable: Runnable): Unit = actorRef ! runnable override def close(): Unit = { - closeAsync(DirectExecutionContext) + closeAsync(ExecutionContext.parasitic) () } diff --git a/ledger-api/testing-utils/BUILD.bazel b/ledger-api/testing-utils/BUILD.bazel index b122e181c4..279b49ee31 100644 --- a/ledger-api/testing-utils/BUILD.bazel +++ b/ledger-api/testing-utils/BUILD.bazel @@ -34,7 +34,6 @@ da_scala_library( "//ledger-api/grpc-definitions:ledger_api_proto_scala", "//ledger-api/rs-grpc-akka", "//ledger-api/rs-grpc-bridge", - "//libs-scala/concurrent", "//libs-scala/contextualized-logging", "//libs-scala/grpc-utils", "//libs-scala/resources", diff --git a/ledger-api/testing-utils/src/main/scala/com/digitalasset/ledger/api/testing/utils/MultiFixtureBase.scala b/ledger-api/testing-utils/src/main/scala/com/digitalasset/ledger/api/testing/utils/MultiFixtureBase.scala index 918813b9f5..8ce0e831a3 100644 --- a/ledger-api/testing-utils/src/main/scala/com/digitalasset/ledger/api/testing/utils/MultiFixtureBase.scala +++ b/ledger-api/testing-utils/src/main/scala/com/digitalasset/ledger/api/testing/utils/MultiFixtureBase.scala @@ -5,16 +5,15 @@ package com.daml.ledger.api.testing.utils import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} -import com.daml.dec.DirectExecutionContext import com.daml.logging.LoggingContext -import org.scalatest._ import org.scalatest.concurrent.{AsyncTimeLimitedTests, ScaledTimeSpans} import org.scalatest.exceptions.TestCanceledException import org.scalatest.time.Span +import org.scalatest.{Assertion, Assertions, AsyncTestSuite, BeforeAndAfterAll, Succeeded} import scala.collection.immutable.Iterable import scala.concurrent.duration.DurationInt -import scala.concurrent.{Future, Promise, TimeoutException} +import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException} import scala.util.control.{NoStackTrace, NonFatal} trait MultiFixtureBase[FixtureId, TestContext] @@ -95,10 +94,12 @@ trait MultiFixtureBase[FixtureId, TestContext] try { Future - .firstCompletedOf(List(runTest(testFixture), timeoutPromise.future))(DirectExecutionContext) + .firstCompletedOf(List(runTest(testFixture), timeoutPromise.future))( + ExecutionContext.parasitic + ) .recover { case NonFatal(throwable) => failOnFixture(throwable) - }(DirectExecutionContext) + }(ExecutionContext.parasitic) } catch { case NonFatal(throwable) => failOnFixture(throwable) } diff --git a/ledger/indexer-benchmark/BUILD.bazel b/ledger/indexer-benchmark/BUILD.bazel index 2bf889648f..7cfd243f95 100644 --- a/ledger/indexer-benchmark/BUILD.bazel +++ b/ledger/indexer-benchmark/BUILD.bazel @@ -36,7 +36,6 @@ da_scala_library( "//ledger/metrics", "//ledger/participant-integration-api", "//ledger/participant-state", - "//libs-scala/concurrent", "//libs-scala/contextualized-logging", "//libs-scala/postgresql-testing", "//libs-scala/resources", @@ -65,7 +64,6 @@ da_scala_library( "//ledger/metrics", "//ledger/participant-state", "//ledger/participant-state/kvutils", - "//libs-scala/concurrent", "//libs-scala/contextualized-logging", "@maven//:io_dropwizard_metrics_metrics_core", ], diff --git a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala index d8c8c58c66..5dba22fe55 100644 --- a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala +++ b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala @@ -11,7 +11,6 @@ import akka.actor.ActorSystem import akka.stream.Materializer import akka.stream.scaladsl.{Sink, Source} import com.codahale.metrics.MetricRegistry -import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.health.{HealthStatus, Healthy} import com.daml.ledger.configuration.LedgerId import com.daml.ledger.offset.Offset @@ -26,7 +25,7 @@ import com.daml.ledger.participant.state.v2.Update import com.daml.logging.LoggingContext.newLoggingContext import com.daml.metrics.Metrics -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} object Main { def main(args: Array[String]): Unit = @@ -97,8 +96,8 @@ object Main { data } .runWith(Sink.seq[(Offset, Update)]) - .map(seq => seq.iterator)(DirectExecutionContext) - .andThen { case _ => system.terminate() }(DirectExecutionContext) + .map(seq => seq.iterator)(ExecutionContext.parasitic) + .andThen { case _ => system.terminate() }(ExecutionContext.parasitic) } } } diff --git a/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala b/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala index af1098ee0e..cc6d8cc490 100644 --- a/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala +++ b/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala @@ -10,7 +10,6 @@ import akka.actor.ActorSystem import akka.stream.Materializer import akka.stream.scaladsl.Source import com.codahale.metrics.{MetricRegistry, Snapshot} -import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.health.{HealthStatus, Healthy} import com.daml.ledger.configuration.{Configuration, LedgerInitialConditions, LedgerTimeModel} import com.daml.ledger.offset.Offset @@ -44,7 +43,7 @@ class IndexerBenchmark() { .use(db => { println(s"Running the indexer benchmark against the ephemeral Postgres database ${db.url}") run(createUpdates, config.copy(indexerConfig = config.indexerConfig.copy(jdbcUrl = db.url))) - })(DirectExecutionContext) + })(ExecutionContext.parasitic) } def run( diff --git a/ledger/ledger-api-client/BUILD.bazel b/ledger/ledger-api-client/BUILD.bazel index 331f992b85..e42813ac61 100644 --- a/ledger/ledger-api-client/BUILD.bazel +++ b/ledger/ledger-api-client/BUILD.bazel @@ -36,7 +36,6 @@ da_scala_library( "//ledger/ledger-grpc", "//ledger/ledger-resources", "//ledger/metrics", - "//libs-scala/concurrent", "//libs-scala/grpc-utils", "//libs-scala/ports", "//libs-scala/resources", diff --git a/ledger/ledger-api-client/src/it/scala/com/digitalasset/ledger/client/CommandClientIT.scala b/ledger/ledger-api-client/src/it/scala/com/digitalasset/ledger/client/CommandClientIT.scala index f34a483347..c79538ed3f 100644 --- a/ledger/ledger-api-client/src/it/scala/com/digitalasset/ledger/client/CommandClientIT.scala +++ b/ledger/ledger-api-client/src/it/scala/com/digitalasset/ledger/client/CommandClientIT.scala @@ -9,7 +9,6 @@ import java.util.concurrent.TimeUnit import akka.NotUsed import akka.stream.scaladsl.{Sink, Source} import com.daml.api.util.TimeProvider -import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.domain import com.daml.ledger.api.testing.utils.{ IsStatusException, @@ -90,10 +89,12 @@ final class CommandClientIT configuration, ) - private def timeProvider(ledgerId: domain.LedgerId): Future[TimeProvider] = { + private def timeProvider( + ledgerId: domain.LedgerId + ): Future[TimeProvider] = { StaticTime .updatedVia(TimeServiceGrpc.stub(channel), ledgerId.unwrap) - .recover { case NonFatal(_) => TimeProvider.UTC }(DirectExecutionContext) + .recover { case NonFatal(_) => TimeProvider.UTC } } private def commandClient( @@ -102,9 +103,7 @@ final class CommandClientIT configuration: CommandClientConfiguration = defaultCommandClientConfiguration, ): Future[CommandClient] = timeProvider(ledgerId) - .map(_ => commandClientWithoutTime(ledgerId, applicationId, configuration))( - DirectExecutionContext - ) + .map(_ => commandClientWithoutTime(ledgerId, applicationId, configuration)) override protected def config: SandboxConfig = super.config.copy(ledgerIdMode = LedgerIdMode.Static(testLedgerId)) @@ -171,7 +170,7 @@ final class CommandClientIT notOk.grpcStatus.code should be(expectedErrorCode.value) notOk.grpcStatus.message should include(expectedMessageSubString) } - }(DirectExecutionContext) + } /** Reads a set of command IDs expected in the given client after the given checkpoint. * Returns a pair of sets (elements seen, elements not seen). diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandSubmissionFlow.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandSubmissionFlow.scala index c2c6989454..5a0cf2ff00 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandSubmissionFlow.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandSubmissionFlow.scala @@ -5,12 +5,11 @@ package com.daml.ledger.client.services.commands import akka.NotUsed import akka.stream.scaladsl.Flow -import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.v1.command_submission_service.SubmitRequest import com.daml.util.Ctx import com.google.protobuf.empty.Empty -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Success, Try} object CommandSubmissionFlow { @@ -33,7 +32,7 @@ object CommandSubmissionFlow { telemetryContext, ) ) - }(DirectExecutionContext) + }(ExecutionContext.parasitic) } } } diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/identity/LedgerIdentityClient.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/identity/LedgerIdentityClient.scala index 8d7a315bd5..45551e237c 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/identity/LedgerIdentityClient.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/identity/LedgerIdentityClient.scala @@ -3,7 +3,6 @@ package com.daml.ledger.client.services.identity -import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.domain.LedgerId import com.daml.ledger.api.v1.ledger_identity_service.GetLedgerIdentityRequest import com.daml.ledger.api.v1.ledger_identity_service.LedgerIdentityServiceGrpc.LedgerIdentityServiceStub @@ -16,9 +15,10 @@ final class LedgerIdentityClient(service: LedgerIdentityServiceStub) { /** The ledgerId in use, if the check was successful. */ - def satisfies(ledgerIdRequirement: LedgerIdRequirement, token: Option[String] = None)(implicit - ec: ExecutionContext - ): Future[LedgerId] = + def satisfies( + ledgerIdRequirement: LedgerIdRequirement, + token: Option[String] = None, + )(implicit executionContext: ExecutionContext): Future[LedgerId] = for { ledgerId <- getLedgerId(token) } yield { @@ -30,10 +30,12 @@ final class LedgerIdentityClient(service: LedgerIdentityServiceStub) { LedgerId(ledgerId) } - def getLedgerId(token: Option[String] = None): Future[String] = + def getLedgerId( + token: Option[String] = None + )(implicit executionContext: ExecutionContext): Future[String] = LedgerClient .stub(service, token) .getLedgerIdentity(new GetLedgerIdentityRequest()) - .map(_.ledgerId)(DirectExecutionContext) + .map(_.ledgerId) } diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/testing/time/StaticTime.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/testing/time/StaticTime.scala index 970e84821b..772115f107 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/testing/time/StaticTime.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/testing/time/StaticTime.scala @@ -8,13 +8,12 @@ import java.util.concurrent.atomic.AtomicReference import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink} import akka.stream.{ClosedShape, KillSwitches, Materializer, UniqueKillSwitch} -import com.daml.api.util.{TimeProvider, TimestampConversion} import com.daml.api.util.TimestampConversion._ +import com.daml.api.util.{TimeProvider, TimestampConversion} import com.daml.grpc.adapter.ExecutionSequencerFactory import com.daml.grpc.adapter.client.akka.ClientAdapter -import com.daml.dec.DirectExecutionContext -import com.daml.ledger.api.v1.testing.time_service.{GetTimeRequest, SetTimeRequest} import com.daml.ledger.api.v1.testing.time_service.TimeServiceGrpc.{TimeService, TimeServiceStub} +import com.daml.ledger.api.v1.testing.time_service.{GetTimeRequest, SetTimeRequest} import com.daml.ledger.client.LedgerClient import scala.concurrent.{ExecutionContext, Future} @@ -68,7 +67,7 @@ object StaticTime { // We serve this in a future which completes when the first element has passed through. // Thus we make sure that the object we serve already received time data from the ledger. futureOfFirstElem.map(_ => new StaticTime(timeService, clockRef, killSwitch, ledgerId))( - DirectExecutionContext + ExecutionContext.parasitic ) } { implicit b => (killSwitch, sinkHead) => import GraphDSL.Implicits._ diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/version/VersionClient.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/version/VersionClient.scala index 60f208f102..d0858bdd29 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/version/VersionClient.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/version/VersionClient.scala @@ -6,12 +6,14 @@ package com.daml.ledger.client.services.version import com.daml.ledger.api.domain.LedgerId import com.daml.ledger.api.v1.version_service.VersionServiceGrpc.VersionServiceStub -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} final class VersionClient(ledgerId: LedgerId, service: VersionServiceStub) { private val it = new withoutledgerid.VersionClient(service) - def getApiVersion(token: Option[String] = None): Future[String] = + def getApiVersion( + token: Option[String] = None + )(implicit executionContext: ExecutionContext): Future[String] = it.getApiVersion(ledgerId, token) } diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/version/withoutledgerid/VersionClient.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/version/withoutledgerid/VersionClient.scala index 4efb5fa390..7c266a731e 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/version/withoutledgerid/VersionClient.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/version/withoutledgerid/VersionClient.scala @@ -3,24 +3,23 @@ package com.daml.ledger.client.services.version.withoutledgerid -import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.domain.LedgerId import com.daml.ledger.api.v1.version_service.GetLedgerApiVersionRequest import com.daml.ledger.api.v1.version_service.VersionServiceGrpc.VersionServiceStub import com.daml.ledger.client.LedgerClient import scalaz.syntax.tag._ -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} private[daml] final class VersionClient(service: VersionServiceStub) { def getApiVersion( ledgerIdToUse: LedgerId, token: Option[String] = None, - ): Future[String] = + )(implicit executionContext: ExecutionContext): Future[String] = LedgerClient .stub(service, token) .getLedgerApiVersion( new GetLedgerApiVersionRequest(ledgerIdToUse.unwrap) ) - .map(_.version)(DirectExecutionContext) + .map(_.version) } diff --git a/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala b/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala index 95722902ac..321a1b674e 100644 --- a/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala +++ b/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala @@ -14,7 +14,6 @@ import akka.stream.testkit.{TestPublisher, TestSubscriber} import akka.stream.{OverflowStrategy, QueueOfferResult} import com.daml.api.util.TimestampConversion._ import com.daml.concurrent.ExecutionContext -import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll import com.daml.ledger.api.v1.command_completion_service.Checkpoint import com.daml.ledger.api.v1.commands.Commands @@ -105,7 +104,7 @@ class CommandTrackerFlowTest startOffset: LedgerOffset, ) - private implicit val ec: ExecutionContext[Nothing] = DirectExecutionContext + private implicit val ec: ExecutionContext[Nothing] = ExecutionContext.parasitic private val stateRef = new AtomicReference[Promise[State]](Promise[State]()) def createCompletionsSource( diff --git a/ledger/ledger-api-common/BUILD.bazel b/ledger/ledger-api-common/BUILD.bazel index 48a9a3f5c1..c04fa63f13 100644 --- a/ledger/ledger-api-common/BUILD.bazel +++ b/ledger/ledger-api-common/BUILD.bazel @@ -43,7 +43,6 @@ da_scala_library( "//ledger/ledger-offset", "//ledger/ledger-resources", "//ledger/metrics", - "//libs-scala/concurrent", "//libs-scala/contextualized-logging", "//libs-scala/grpc-utils", "//libs-scala/logging-entries", @@ -81,7 +80,6 @@ da_scala_library( "//daml-lf/transaction", "//language-support/scala/bindings", "//ledger/ledger-api-domain", - "//libs-scala/concurrent", "//libs-scala/grpc-utils", "@maven//:com_google_api_grpc_proto_google_common_protos", "@maven//:org_scalatest_scalatest_compatible", diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/akkastreams/dispatcher/SignalDispatcher.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/akkastreams/dispatcher/SignalDispatcher.scala index 6694be5467..25f178b332 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/akkastreams/dispatcher/SignalDispatcher.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/akkastreams/dispatcher/SignalDispatcher.scala @@ -9,9 +9,10 @@ import akka.NotUsed import akka.stream._ import akka.stream.scaladsl.{Source, SourceQueueWithComplete} import com.daml.platform.akkastreams.dispatcher.SignalDispatcher.Signal -import com.daml.dec.DirectExecutionContext import org.slf4j.LoggerFactory +import scala.concurrent.ExecutionContext + /** A fanout signaller that can be subscribed to dynamically. * Signals may be coalesced, but if a signal is sent, we guarantee that all consumers subscribed before * the signal is sent will eventually receive a signal. @@ -53,7 +54,7 @@ class SignalDispatcher private () extends AutoCloseable { q.watchCompletion() .onComplete { _ => runningState.updateAndGet(_.map(s => s - q)) - }(DirectExecutionContext) + }(ExecutionContext.parasitic) NotUsed } diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/akkastreams/dispatcher/SubSource.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/akkastreams/dispatcher/SubSource.scala index a05b459950..f1e9e60edf 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/akkastreams/dispatcher/SubSource.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/akkastreams/dispatcher/SubSource.scala @@ -5,10 +5,9 @@ package com.daml.platform.akkastreams.dispatcher import akka.NotUsed import akka.stream.scaladsl.Source -import com.daml.dec.DirectExecutionContext import scala.annotation.nowarn -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} /** Defines how the progress on the ledger should be mapped to look-up operations */ @nowarn("msg=parameter value evidence.* is never used") @@ -44,7 +43,7 @@ object SubSource { readElement(index).map { t => val nextIndex = readSuccessor(index) Some((nextIndex, (index, t))) - }(DirectExecutionContext) + }(ExecutionContext.parasitic) } } } diff --git a/ledger/ledger-api-common/src/test/lib/scala/com/digitalasset/platform/akkastreams/FutureTimeouts.scala b/ledger/ledger-api-common/src/test/lib/scala/com/digitalasset/platform/akkastreams/FutureTimeouts.scala index ea3dd4a834..24e8997cc2 100644 --- a/ledger/ledger-api-common/src/test/lib/scala/com/digitalasset/platform/akkastreams/FutureTimeouts.scala +++ b/ledger/ledger-api-common/src/test/lib/scala/com/digitalasset/platform/akkastreams/FutureTimeouts.scala @@ -3,12 +3,11 @@ package com.daml.platform.akkastreams import akka.actor.ActorSystem -import com.daml.dec.DirectExecutionContext import org.scalatest.Assertion import org.scalatest.wordspec.AsyncWordSpec -import scala.concurrent.{Future, Promise, TimeoutException} import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException} import scala.util.Try import scala.util.control.NoStackTrace @@ -28,10 +27,10 @@ trait FutureTimeouts { self: AsyncWordSpec => runnable, )(system.dispatcher) - f.onComplete((_: Try[Any]) => cancellable.cancel())(DirectExecutionContext) + f.onComplete((_: Try[Any]) => cancellable.cancel())(ExecutionContext.parasitic) recoverToSucceededIf[TimeoutException]( - Future.firstCompletedOf[Any](List[Future[Any]](f, promise.future))(DirectExecutionContext) + Future.firstCompletedOf[Any](List[Future[Any]](f, promise.future))(ExecutionContext.parasitic) ) } } diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala index 1920922925..fc48c89cba 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala @@ -8,7 +8,6 @@ import java.util.concurrent.Executors import javax.sql.DataSource import com.daml.concurrent.{ExecutionContext, Future} -import com.daml.dec.DirectExecutionContext import com.daml.ledger.on.sql.Database._ import com.daml.ledger.on.sql.queries._ import com.daml.ledger.participant.state.kvutils.KVOffsetBuilder @@ -143,7 +142,7 @@ object Database { implicit val writerConnectionPool: ConnectionPool[Writer] = new ConnectionPool(writerDataSource) implicit val adminConnectionPool: ConnectionPool[Migrator] = - new ConnectionPool(adminDataSource)(DirectExecutionContext) + new ConnectionPool(adminDataSource)(ExecutionContext.parasitic) new UninitializedDatabase(system, offsetBuilder, metrics) } } @@ -169,7 +168,7 @@ object Database { implicit val readerWriterConnectionPool: ConnectionPool[Reader with Writer] = new ConnectionPool(readerWriterDataSource) implicit val adminConnectionPool: ConnectionPool[Migrator] = - new ConnectionPool(adminDataSource)(DirectExecutionContext) + new ConnectionPool(adminDataSource)(ExecutionContext.parasitic) new UninitializedDatabase(system, offsetBuilder, metrics) } } diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Timed.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Timed.scala index 156771f497..c24033c8e2 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Timed.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Timed.scala @@ -9,9 +9,8 @@ import akka.Done import akka.stream.scaladsl.{Keep, Source} import com.codahale.metrics.{Counter, Meter, Timer} import com.daml.concurrent -import com.daml.dec.DirectExecutionContext -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} object Timed { @@ -56,25 +55,25 @@ object Timed { def future[T](timer: Timer, future: => Future[T]): Future[T] = { val ctx = timer.time() val result = future - result.onComplete(_ => ctx.stop())(DirectExecutionContext) + result.onComplete(_ => ctx.stop())(ExecutionContext.parasitic) result } def future[EC, T](timer: Timer, future: => concurrent.Future[EC, T]): concurrent.Future[EC, T] = { val ctx = timer.time() val result = future - result.onComplete(_ => ctx.stop())(DirectExecutionContext) + result.onComplete(_ => ctx.stop())(concurrent.ExecutionContext.parasitic) result } def trackedFuture[T](counter: Counter, future: => Future[T]): Future[T] = { counter.inc() - future.andThen { case _ => counter.dec() }(DirectExecutionContext) + future.andThen { case _ => counter.dec() }(ExecutionContext.parasitic) } def trackedFuture[T](meter: Meter, future: => Future[T]): Future[T] = { meter.mark(+1) - future.andThen { case _ => meter.mark(-1) }(DirectExecutionContext) + future.andThen { case _ => meter.mark(-1) }(ExecutionContext.parasitic) } def timedAndTrackedFuture[T](timer: Timer, counter: Counter, future: => Future[T]): Future[T] = { @@ -90,7 +89,7 @@ object Timed { source .watchTermination()(Keep.both[Mat, Future[Done]]) .mapMaterializedValue { case (mat, done) => - done.onComplete(_ => ctx.stop())(DirectExecutionContext) + done.onComplete(_ => ctx.stop())(ExecutionContext.parasitic) mat } } diff --git a/ledger/metrics/src/main/scala/com/daml/telemetry/TelemetryContext.scala b/ledger/metrics/src/main/scala/com/daml/telemetry/TelemetryContext.scala index 2b4d1d57f5..c887b0a702 100644 --- a/ledger/metrics/src/main/scala/com/daml/telemetry/TelemetryContext.scala +++ b/ledger/metrics/src/main/scala/com/daml/telemetry/TelemetryContext.scala @@ -5,11 +5,10 @@ package com.daml.telemetry import java.util.{HashMap => jHashMap, Map => jMap} -import com.daml.dec.DirectExecutionContext import io.opentelemetry.api.trace.{Span, Tracer} import io.opentelemetry.context.Context -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} trait TelemetryContext { @@ -112,7 +111,7 @@ protected class DefaultTelemetryContext(protected val tracer: Tracer, protected subSpan.end() case Success(_) => subSpan.end() - }(DirectExecutionContext) + }(ExecutionContext.parasitic) } override def runInNewSpan[T]( diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala index c4b9c705df..92f2364120 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala @@ -7,7 +7,6 @@ import akka.stream.scaladsl.{Flow, Keep, Sink} import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult} import akka.{Done, NotUsed} import com.codahale.metrics.{Counter, Timer} -import com.daml.dec.DirectExecutionContext import com.daml.error.DamlContextualizedErrorLogger import com.daml.ledger.client.services.commands.CommandSubmission import com.daml.ledger.client.services.commands.CommandTrackerFlow.Materialized @@ -146,8 +145,8 @@ private[services] object QueueBackedTracker { errorFactories.trackerFailure(msg = promiseCancellationDescription)(errorLogger) ) ) - })(DirectExecutionContext) - }(DirectExecutionContext) + })(ExecutionContext.parasitic) + }(ExecutionContext.parasitic) new QueueBackedTracker(queue, done, errorFactories) } diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerMap.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerMap.scala index 68b01c5762..7bfc131d5c 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerMap.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerMap.scala @@ -8,7 +8,6 @@ import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicReference import akka.stream.Materializer -import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.v1.commands.Commands import com.daml.ledger.client.services.commands.CommandSubmission import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{ @@ -27,9 +26,9 @@ import scala.util.{Failure, Success} * A tracker tracker, if you will. * * @param retentionPeriod The minimum duration for which to retain ready-but-idling trackers. - * @param getKey A function to compute the tracker key from the commands. - * @param newTracker A function to construct a new tracker. - * Called when there is no tracker for the given key. + * @param getKey A function to compute the tracker key from the commands. + * @param newTracker A function to construct a new tracker. + * Called when there is no tracker for the given key. */ private[services] final class TrackerMap[Key]( retentionPeriod: Duration, @@ -149,9 +148,13 @@ private[services] object TrackerMap { } private sealed trait AsyncResourceState[+T <: AutoCloseable] + private final case object Waiting extends AsyncResourceState[Nothing] + private final case class Ready[T <: AutoCloseable](resource: T) extends AsyncResourceState[T] + private final case object Closed extends AsyncResourceState[Nothing] + private final case class Failed(exception: Throwable) extends AsyncResourceState[Nothing] /** A holder for an AutoCloseable that can be opened and closed async. @@ -170,7 +173,7 @@ private[services] object TrackerMap { state.set(Ready(resource)) case Failure(exception) => state.set(Failed(exception)) - }(DirectExecutionContext) + }(ExecutionContext.parasitic) private[TrackerMap] def currentState: AsyncResourceState[T] = state.get() @@ -188,7 +191,7 @@ private[services] object TrackerMap { case Waiting => try { Await.result( - future.transform(Success(_))(DirectExecutionContext), + future.transform(Success(_))(ExecutionContext.parasitic), 10.seconds, ) match { case Success(resource) => resource.close() diff --git a/ledger/participant-integration-api/src/main/scala/platform/index/LedgerBackedIndexService.scala b/ledger/participant-integration-api/src/main/scala/platform/index/LedgerBackedIndexService.scala index 64f4578f71..0df223598f 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/index/LedgerBackedIndexService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/index/LedgerBackedIndexService.scala @@ -6,7 +6,6 @@ package com.daml.platform.index import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf.Archive -import com.daml.dec.{DirectExecutionContext => DEC} import com.daml.error.DamlContextualizedErrorLogger import com.daml.ledger.api.domain import com.daml.ledger.api.domain.ConfigurationEntry.Accepted @@ -48,7 +47,7 @@ import com.daml.platform.store.entries.PartyLedgerEntry import com.daml.telemetry.{SpanAttribute, Spans} import scalaz.syntax.tag.ToTagOps -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} private[platform] final class LedgerBackedIndexService( ledger: ReadOnlyLedger, @@ -287,7 +286,9 @@ private[platform] final class LedgerBackedIndexService( ): Future[Option[(LedgerOffset.Absolute, Configuration)]] = ledger .lookupLedgerConfiguration() - .map(_.map { case (offset, config) => (toAbsolute(offset), config) })(DEC) + .map( + _.map { case (offset, config) => (toAbsolute(offset), config) } + )(ExecutionContext.parasitic) /** Looks up the current configuration, if set, and continues to stream configuration changes. */ diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/BaseLedger.scala b/ledger/participant-integration-api/src/main/scala/platform/store/BaseLedger.scala index d03d399a9e..10c10b472f 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/BaseLedger.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/BaseLedger.scala @@ -6,7 +6,6 @@ package com.daml.platform.store import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf -import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.domain import com.daml.ledger.api.domain.{ApplicationId, CommandId, LedgerId} import com.daml.ledger.api.health.HealthStatus @@ -48,8 +47,6 @@ private[platform] abstract class BaseLedger( dispatcher: Dispatcher[Offset], ) extends ReadOnlyLedger { - implicit private val DEC: ExecutionContext = DirectExecutionContext - override def currentHealth(): HealthStatus = ledgerDao.currentHealth() override def lookupKey(key: GlobalKey, forParties: Set[Ref.Party])(implicit @@ -166,9 +163,7 @@ private[platform] abstract class BaseLedger( .getLfArchive(packageId) .flatMap(archiveO => Future.fromTry(Try(archiveO.map(archive => Decode.assertDecodeArchive(archive)._2))) - )( - DEC - ) + )(ExecutionContext.parasitic) override def packageEntries(startExclusive: Offset)(implicit loggingContext: LoggingContext diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/PaginatingAsyncStream.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/PaginatingAsyncStream.scala index 0958a898de..15fdf230e6 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/PaginatingAsyncStream.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/PaginatingAsyncStream.scala @@ -5,9 +5,8 @@ package com.daml.platform.store.appendonlydao import akka.NotUsed import akka.stream.scaladsl.Source -import com.daml.dec.DirectExecutionContext -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} private[platform] object PaginatingAsyncStream { @@ -26,7 +25,7 @@ private[platform] object PaginatingAsyncStream { * This is not designed to page through results using the "seek method": * https://use-the-index-luke.com/sql/partial-results/fetch-next-page * - * @param pageSize number of items to retrieve per call + * @param pageSize number of items to retrieve per call * @param queryPage takes the offset from which to start the next page and returns that page * @tparam T the type of the items returned in each call */ @@ -39,7 +38,7 @@ private[platform] object PaginatingAsyncStream { val resultSize = result.size.toLong val newQueryOffset = if (resultSize < pageSize) None else Some(queryOffset + pageSize) Some(newQueryOffset -> result) - }(DirectExecutionContext) + }(ExecutionContext.parasitic) } .flatMapConcat(Source(_)) } @@ -57,10 +56,10 @@ private[platform] object PaginatingAsyncStream { * lookup calls. * * @param startFromOffset initial offset - * @param getOffset function that returns a position/offset from the element of type [[T]] - * @param query a function that fetches results starting from provided offset + * @param getOffset function that returns a position/offset from the element of type [[T]] + * @param query a function that fetches results starting from provided offset * @tparam Off the type of the offset - * @tparam T the type of the items returned in each call + * @tparam T the type of the items returned in each call */ def streamFrom[Off, T](startFromOffset: Off, getOffset: T => Off)( query: Off => Future[Vector[T]] @@ -73,9 +72,7 @@ private[platform] object PaginatingAsyncStream { query(offset).map { result => val nextPageOffset: Option[Off] = result.lastOption.map(getOffset) Some((nextPageOffset, result)) - }( - DirectExecutionContext - ) // run in the same thread as the query, avoid context switch for a cheap operation + }(ExecutionContext.parasitic) } .flatMapConcat(Source(_)) } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ACSReader.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ACSReader.scala index c882c9dd57..84a60e58d4 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ACSReader.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ACSReader.scala @@ -6,7 +6,6 @@ package com.daml.platform.store.appendonlydao.events import akka.NotUsed import akka.stream.scaladsl.Source import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult} -import com.daml.dec.DirectExecutionContext import com.daml.error.definitions.LedgerApiErrors import com.daml.error.definitions.LedgerApiErrors.ParticipantBackpressure import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger} @@ -20,7 +19,7 @@ import com.daml.platform.store.utils.ConcurrencyLimiter import scala.annotation.tailrec import scala.collection.mutable -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} trait ACSReader { def acsStream( @@ -189,7 +188,7 @@ private[events] object FilterTableACSReader { work(task).map { case (result, nextTask) => queueState.finishTask(nextTask) task -> result - }(DirectExecutionContext) + }(ExecutionContext.parasitic) } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/interning/StringInterningView.scala b/ledger/participant-integration-api/src/main/scala/platform/store/interning/StringInterningView.scala index 6c3f17a26c..53297ba2ae 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/interning/StringInterningView.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/interning/StringInterningView.scala @@ -3,11 +3,10 @@ package com.daml.platform.store.interning -import com.daml.dec.DirectExecutionContext import com.daml.lf.data.Ref import com.daml.logging.LoggingContext -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} class DomainStringIterators( val parties: Iterator[String], @@ -100,7 +99,7 @@ class StringInterningView(loadPrefixedEntries: LoadStringInterningEntries) Future.unit } else { loadPrefixedEntries(raw.lastId, lastStringInterningId)(loggingContext) - .map(updateView)(DirectExecutionContext) + .map(updateView)(ExecutionContext.parasitic) } private def updateView(newEntries: Iterable[(Int, String)]): Unit = synchronized { diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/BatchingQueue.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/BatchingQueue.scala index 2a26a9f19e..5bfeb2a90d 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/BatchingQueue.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/BatchingQueue.scala @@ -7,14 +7,13 @@ import java.util.concurrent.atomic.AtomicReference import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete} import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult} -import com.daml.dec.DirectExecutionContext import com.daml.ledger.participant.state.kvutils.wire.DamlSubmissionBatch import com.daml.ledger.participant.state.v2.SubmissionResult import com.google.rpc.code.Code import com.google.rpc.status.Status -import scala.concurrent.Future import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} object BatchingQueue { type CommitBatchFunction = @@ -154,7 +153,7 @@ case class DefaultBatchingQueue( RunningBatchingQueueState.Complete, ) () - }(DirectExecutionContext) + }(ExecutionContext.parasitic) } } } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/LedgerStateAccess.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/LedgerStateAccess.scala index 24070cfe62..3d35f19e50 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/LedgerStateAccess.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/LedgerStateAccess.scala @@ -3,7 +3,6 @@ package com.daml.ledger.validator -import com.daml.dec.DirectExecutionContext import com.daml.ledger.participant.state.kvutils.Raw import com.daml.logging.LoggingContext import com.daml.metrics.{Metrics, Timed} @@ -103,7 +102,7 @@ abstract class BatchingLedgerStateOperations[LogResult] extends LedgerStateOpera executionContext: ExecutionContext, loggingContext: LoggingContext, ): Future[Option[Raw.Envelope]] = - readState(Seq(key)).map(_.head)(DirectExecutionContext) + readState(Seq(key)).map(_.head)(ExecutionContext.parasitic) override final def writeState( key: Raw.StateKey, diff --git a/ledger/participant-state/kvutils/tools/BUILD.bazel b/ledger/participant-state/kvutils/tools/BUILD.bazel index fec9fb3e04..17e107b12e 100644 --- a/ledger/participant-state/kvutils/tools/BUILD.bazel +++ b/ledger/participant-state/kvutils/tools/BUILD.bazel @@ -41,7 +41,6 @@ da_scala_library( "//ledger/participant-integration-api", "//ledger/participant-state", "//ledger/participant-state/kvutils", - "//libs-scala/concurrent", "//libs-scala/contextualized-logging", "//libs-scala/resources", "//libs-scala/resources-akka", diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityChecker.scala b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityChecker.scala index 3c62282f98..765ee0d609 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityChecker.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityChecker.scala @@ -5,11 +5,11 @@ package com.daml.ledger.participant.state.kvutils.tools.integritycheck import java.io.PrintWriter import java.util.concurrent.{Executors, TimeUnit} + import akka.actor.ActorSystem import akka.stream.Materializer import akka.stream.scaladsl.{Sink, Source} import com.codahale.metrics.{ConsoleReporter, MetricRegistry} -import com.daml.dec.DirectExecutionContext import com.daml.ledger.participant.state.kvutils.export.{ LedgerDataImporter, ProtobufBasedLedgerDataImporter, @@ -20,13 +20,7 @@ import com.daml.lf.data.Ref import com.daml.logging.LoggingContext import com.daml.logging.LoggingContext.newLoggingContext import com.daml.metrics.Metrics -import com.daml.platform.indexer.{ - Indexer, - IndexerConfig, - IndexerStartupMode, - JdbcIndexer, - StandaloneIndexerServer, -} +import com.daml.platform.indexer._ import com.daml.platform.store.LfValueTranslationCache import scala.concurrent.duration.Duration @@ -327,7 +321,7 @@ object IntegrityChecker { case Failure(exception) => exception.printStackTrace() sys.exit(1) - }(DirectExecutionContext) + } } } diff --git a/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala b/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala index a4f12fbe45..ee20446021 100644 --- a/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala +++ b/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala @@ -8,7 +8,6 @@ import akka.stream.Materializer import com.codahale.metrics.MetricRegistry import com.daml.api.util.TimeProvider import com.daml.buildinfo.BuildInfo -import com.daml.dec.DirectExecutionContext import com.daml.error.ErrorCodesVersionSwitcher import com.daml.ledger.api.auth.interceptor.AuthorizationInterceptor import com.daml.ledger.api.auth.{AuthService, AuthServiceWildcard, Authorizer} @@ -199,7 +198,7 @@ final class SandboxServer( // Only used in testing; hopefully we can get rid of it soon. def port: Port = - Await.result(portF(DirectExecutionContext), AsyncTolerance) + Await.result(portF(ExecutionContext.parasitic), AsyncTolerance) def portF(implicit executionContext: ExecutionContext): Future[Port] = apiServer.map(_.port) @@ -493,8 +492,7 @@ final class SandboxServer( } override def close(): Unit = { - implicit val executionContext: ExecutionContext = DirectExecutionContext - Await.result(sandboxState.flatMap(_.release()), AsyncTolerance) + Await.result(sandboxState.flatMap(_.release())(ExecutionContext.parasitic), AsyncTolerance) } private def writePortFile(port: Port)(implicit executionContext: ExecutionContext): Future[Unit] = diff --git a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/sql/SqlLedger.scala b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/sql/SqlLedger.scala index c49cd6a9aa..cecd169711 100644 --- a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/sql/SqlLedger.scala +++ b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/sql/SqlLedger.scala @@ -9,7 +9,6 @@ import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete} import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult} import com.daml.api.util.TimeProvider import com.daml.daml_lf_dev.DamlLf.Archive -import com.daml.dec.{DirectExecutionContext => DEC} import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger} import com.daml.grpc.GrpcStatus import com.daml.ledger.api.domain @@ -288,9 +287,10 @@ private[sandbox] object SqlLedger { ledgerDao .storePackageEntry(newLedgerEnd, packages, None) - .transform(_ => (), e => sys.error("Failed to copy initial packages: " + e.getMessage))( - DEC - ) + .transform( + _ => (), + e => sys.error("Failed to copy initial packages: " + e.getMessage), + )(ExecutionContext.parasitic) } else { Future.unit } @@ -391,7 +391,7 @@ private[sandbox] object SqlLedger { })(queue => Future.successful(queue.complete())) private def persistAll(queue: Queue[Offset => Future[Unit]]): Future[Unit] = { - implicit val executionContext: ExecutionContext = DEC + implicit val executionContext: ExecutionContext = ExecutionContext.parasitic val startOffset = SandboxOffset.fromOffset(dispatcher.getHead()) // This will attempt to run the SQL queries concurrently, but there is no parallelism here, // so they will still run sequentially. @@ -416,7 +416,7 @@ private[sandbox] object SqlLedger { .failed .foreach { throwable => logger.error("Persistence queue has been closed with a failure.", throwable) - }(DEC) + }(ExecutionContext.parasitic) } } @@ -512,7 +512,7 @@ private final class SqlLedger( _.map(_ => ()).recover { case NonFatal(t) => logger.error(s"Failed to persist entry with offset: ${offset.toApiString}", t) } - )(DEC) + )(ExecutionContext.parasitic) }(errorLogger) } @@ -548,7 +548,7 @@ private final class SqlLedger( "Failed to enqueue submission" )(f) Failure(protobuf.StatusProto.toStatusRuntimeException(failedStatus)) - }(DEC) + }(ExecutionContext.parasitic) override def publishPartyAllocation( submissionId: Ref.SubmissionId, @@ -570,7 +570,7 @@ private final class SqlLedger( PartyDetails(party, displayName, isLocal = true), ), ) - .map(_ => ())(DEC) + .map(_ => ())(ExecutionContext.parasitic) .recover { case t => //recovering from the failure so the persistence stream doesn't die logger.error( @@ -578,14 +578,14 @@ private final class SqlLedger( t, ) () - }(DEC) + }(ExecutionContext.parasitic) case _ => logger.warn( s"Ignoring duplicate party submission with ID $party for submissionId ${Some(submissionId)}" ) Future.unit - }(DEC) + }(ExecutionContext.parasitic) }(errorLogger) } @@ -609,12 +609,12 @@ private final class SqlLedger( PackageLedgerEntry.PackageUploadAccepted(submissionId, timeProvider.getCurrentTimestamp) ), ) - .map(_ => ())(DEC) + .map(_ => ())(ExecutionContext.parasitic) .recover { case t => //recovering from the failure so the persistence stream doesn't die logger.error(s"Failed to persist packages with offset: ${offset.toApiString}", t) () - }(DEC) + }(ExecutionContext.parasitic) }(errorLogger) } @@ -644,7 +644,7 @@ private final class SqlLedger( // database transaction. // NOTE(RA): Since the new configuration can be rejected inside storeConfigurationEntry, // we look up the current configuration again to see if it was stored successfully. - implicit val ec: ExecutionContext = DEC + implicit val ec: ExecutionContext = ExecutionContext.parasitic for { response <- ledgerDao.storeConfigurationEntry( offset, @@ -661,12 +661,12 @@ private final class SqlLedger( } storeF - .map(_ => ())(DEC) + .map(_ => ())(ExecutionContext.parasitic) .recover { case t => //recovering from the failure so the persistence stream doesn't die logger.error(s"Failed to persist configuration with offset: $offset", t) () - }(DEC) + }(ExecutionContext.parasitic) }(errorLogger) } } diff --git a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/ScenarioLoadingITDivulgence.scala b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/ScenarioLoadingITDivulgence.scala index 476bba5e43..528af67dfe 100644 --- a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/ScenarioLoadingITDivulgence.scala +++ b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/ScenarioLoadingITDivulgence.scala @@ -9,11 +9,10 @@ import com.daml.ledger.api.testing.utils.{SuiteResourceManagementAroundEach, Moc import com.daml.ledger.api.v1.active_contracts_service.ActiveContractsServiceGrpc import com.daml.ledger.api.v1.transaction_filter._ import com.daml.ledger.client.services.acs.ActiveContractSetClient -import com.daml.dec.DirectExecutionContext import com.daml.platform.sandbox.services.{SandboxFixture, TestCommands} import org.scalatest.concurrent.ScalaFutures -import org.scalatest.time.{Millis, Span} import org.scalatest.matchers.should.Matchers +import org.scalatest.time.{Millis, Span} import org.scalatest.wordspec.AnyWordSpec @SuppressWarnings(Array("org.wartremover.warts.StringPlusAny")) @@ -40,8 +39,6 @@ class ScenarioLoadingITDivulgence .getActiveContracts(transactionFilter) .runWith(Sink.seq) - implicit val ec = DirectExecutionContext - "ScenarioLoading" when { "running a divulgence scenario" should { "not fail" in { diff --git a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/command/CommandStaticTimeIT.scala b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/command/CommandStaticTimeIT.scala index 3059c9c464..6073632bc7 100644 --- a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/command/CommandStaticTimeIT.scala +++ b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/command/CommandStaticTimeIT.scala @@ -6,7 +6,6 @@ package com.daml.platform.sandbox.services.command import java.util.concurrent.atomic.AtomicInteger import com.daml.api.util.TimeProvider -import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.testing.utils.{MockMessages, SuiteResourceManagementAroundAll} import com.daml.ledger.api.v1.command_completion_service.CommandCompletionServiceGrpc import com.daml.ledger.api.v1.command_submission_service.{ @@ -27,7 +26,7 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec import scalaz.syntax.tag._ -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.util.control.NonFatal final class CommandStaticTimeIT @@ -46,10 +45,12 @@ final class CommandStaticTimeIT private lazy val unwrappedLedgerId = ledgerId().unwrap - private def createCommandClient(): Future[CommandClient] = + private def createCommandClient()(implicit + executionContext: ExecutionContext + ): Future[CommandClient] = StaticTime .updatedVia(TimeServiceGrpc.stub(channel), unwrappedLedgerId) - .recover { case NonFatal(_) => TimeProvider.UTC }(DirectExecutionContext) + .recover { case NonFatal(_) => TimeProvider.UTC } .map(_ => new CommandClient( CommandSubmissionServiceGrpc.stub(channel), @@ -62,7 +63,7 @@ final class CommandStaticTimeIT defaultDeduplicationTime = java.time.Duration.ofSeconds(30), ), ) - )(DirectExecutionContext) + ) private lazy val submitRequest: SubmitRequest = MockMessages.submitRequest.update( diff --git a/ledger/sandbox-common/BUILD.bazel b/ledger/sandbox-common/BUILD.bazel index df825bcec0..56fb977c8f 100644 --- a/ledger/sandbox-common/BUILD.bazel +++ b/ledger/sandbox-common/BUILD.bazel @@ -42,7 +42,6 @@ da_scala_library( "//ledger/metrics", "//ledger/participant-integration-api", "//libs-scala/build-info", - "//libs-scala/concurrent", "//libs-scala/contextualized-logging", "//libs-scala/logging-entries", "//libs-scala/ports", diff --git a/ledger/sandbox-common/src/main/scala/platform/sandbox/services/SandboxResetService.scala b/ledger/sandbox-common/src/main/scala/platform/sandbox/services/SandboxResetService.scala index c788345649..23cd06e3e4 100644 --- a/ledger/sandbox-common/src/main/scala/platform/sandbox/services/SandboxResetService.scala +++ b/ledger/sandbox-common/src/main/scala/platform/sandbox/services/SandboxResetService.scala @@ -4,7 +4,7 @@ package com.daml.platform.sandbox.services import java.util.concurrent.atomic.AtomicBoolean -import com.daml.dec.{DirectExecutionContext => DE} + import com.daml.error.DamlContextualizedErrorLogger import com.daml.ledger.api.auth.Authorizer import com.daml.ledger.api.domain.LedgerId @@ -15,7 +15,7 @@ import com.google.protobuf.empty.Empty import io.grpc.ServerCall.Listener import io.grpc._ -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} class SandboxResetService( ledgerId: LedgerId, @@ -34,7 +34,7 @@ class SandboxResetService( private val resetInitialized = new AtomicBoolean(false) override def bindService(): ServerServiceDefinition = - ResetServiceGrpc.bindService(this, DE) + ResetServiceGrpc.bindService(this, ExecutionContext.parasitic) override def reset(request: ResetRequest): Future[Empty] = authorizer.requireAdminClaims(doReset)(request) @@ -65,7 +65,10 @@ class SandboxResetService( request.ledgerId, errorFactories.ledgerIdMismatch(ledgerId, LedgerId(request.ledgerId), None), ) - .fold(Future.failed[Empty], _ => actuallyReset().map(_ => Empty())(DE)) + .fold( + Future.failed[Empty], + _ => actuallyReset().map(_ => Empty())(ExecutionContext.parasitic), + ) private def actuallyReset() = { logger.info("Initiating server reset.") diff --git a/ledger/sandbox-perf/BUILD.bazel b/ledger/sandbox-perf/BUILD.bazel index bff33ec1d0..0af114c329 100644 --- a/ledger/sandbox-perf/BUILD.bazel +++ b/ledger/sandbox-perf/BUILD.bazel @@ -41,7 +41,6 @@ da_scala_library( "//ledger/sandbox-common:sandbox-common-scala-tests-lib", "//ledger/test-common", "//ledger/test-common:dar-files-default-lib", - "//libs-scala/concurrent", "//libs-scala/ports", "//libs-scala/postgresql-testing", "//libs-scala/resources", diff --git a/ledger/sandbox-perf/src/perf/lib/scala/com/digitalasset/platform/sandbox/perf/TestHelper.scala b/ledger/sandbox-perf/src/perf/lib/scala/com/digitalasset/platform/sandbox/perf/TestHelper.scala index 513dcffd26..d27460de95 100644 --- a/ledger/sandbox-perf/src/perf/lib/scala/com/digitalasset/platform/sandbox/perf/TestHelper.scala +++ b/ledger/sandbox-perf/src/perf/lib/scala/com/digitalasset/platform/sandbox/perf/TestHelper.scala @@ -7,7 +7,6 @@ import java.io.File import java.util.UUID import akka.stream.scaladsl.{Sink, Source} -import com.daml.lf.data.Ref.PackageId import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest import com.daml.ledger.api.v1.commands.{Command, Commands} @@ -15,11 +14,11 @@ import com.daml.ledger.api.v1.event.CreatedEvent import com.daml.ledger.api.v1.transaction_filter.{Filters, TransactionFilter} import com.daml.ledger.api.v1.value.{Identifier, Value} import com.daml.ledger.client.services.acs.ActiveContractSetClient -import com.daml.dec.DirectExecutionContext +import com.daml.lf.data.Ref.PackageId import com.daml.platform.sandbox.perf.util.DarUtil -import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} trait TestHelper { @@ -31,12 +30,11 @@ trait TestHelper { val applicationId: String = "app1" val party = "party" - val rangeOfIntsTemplateId = - Identifier( - packageId = largeTxPackageId, - moduleName = "LargeTransaction", - entityName = "RangeOfInts", - ) + val rangeOfIntsTemplateId = Identifier( + packageId = largeTxPackageId, + moduleName = "LargeTransaction", + entityName = "RangeOfInts", + ) val listUtilTemplateId = Identifier( packageId = largeTxPackageId, @@ -67,7 +65,7 @@ trait TestHelper { applicationId = applicationId, commandId = commandId, party = party, - commands = Seq(Command(command)), + commands = Seq(Command.of(command)), ) SubmitAndWaitRequest(Some(commands)) } @@ -115,7 +113,7 @@ trait TestHelper { workflowId: String = "", ): Future[Unit] = { val request: SubmitAndWaitRequest = submitAndWaitRequest(command, commandId, workflowId) - state.ledger.commandService.submitAndWait(request).map(_ => ())(DirectExecutionContext) + state.ledger.commandService.submitAndWait(request).map(_ => ())(ExecutionContext.parasitic) } def activeContractIds( diff --git a/libs-scala/concurrent/BUILD.bazel b/libs-scala/concurrent/BUILD.bazel index 0504aba0ed..5104102098 100644 --- a/libs-scala/concurrent/BUILD.bazel +++ b/libs-scala/concurrent/BUILD.bazel @@ -27,9 +27,6 @@ da_scala_library( visibility = [ "//visibility:public", ], - deps = [ - "@maven//:org_slf4j_slf4j_api", - ], ) da_scala_test( diff --git a/libs-scala/concurrent/src/main/scala/concurrent/package.scala b/libs-scala/concurrent/src/main/scala/concurrent/package.scala index 011175d9d9..be807ac984 100644 --- a/libs-scala/concurrent/src/main/scala/concurrent/package.scala +++ b/libs-scala/concurrent/src/main/scala/concurrent/package.scala @@ -21,9 +21,9 @@ import scala.{concurrent => sc} * declare which ExecutionContext their operations are in. * * For Scala 2.12, you must pass `-Xsource:2.13` to scalac for methods and - * conversions to be automatically found. You must also `import - * scalaz.syntax.bind._` or similar for Future methods like `map`, `flatMap`, - * and so on. + * conversions to be automatically found. You must also + * `import scalaz.syntax.bind._` or similar for Future methods like `map`, + * `flatMap`, and so on. * * There are no constraints on the `EC` type variable; you need only declare * types you wish to use for it that are sufficient for describing the domains @@ -111,5 +111,8 @@ package concurrent { def fromExecutorService[EC](e: ExecutorService): ExecutionContext[EC] = apply(sc.ExecutionContext.fromExecutorService(e)) + + val parasitic: ExecutionContext[Nothing] = + ExecutionContext(sc.ExecutionContext.parasitic) } } diff --git a/libs-scala/concurrent/src/main/scala/dec/DirectExecutionContextInternal.scala b/libs-scala/concurrent/src/main/scala/dec/DirectExecutionContextInternal.scala deleted file mode 100644 index 2c24688a2a..0000000000 --- a/libs-scala/concurrent/src/main/scala/dec/DirectExecutionContextInternal.scala +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.dec - -import org.slf4j.LoggerFactory - -import scala.concurrent.ExecutionContext - -// Starting from Scala 2.13 this can deleted and replaced by `parasitic` -private[dec] object DirectExecutionContextInternal extends ExecutionContext { - - private[this] val logger = LoggerFactory.getLogger(this.getClass) - - override final def execute(runnable: Runnable): Unit = - runnable.run() - - override final def reportFailure(cause: Throwable): Unit = - logger.error("Unhandled exception", cause) - -} diff --git a/libs-scala/concurrent/src/main/scala/dec/package.scala b/libs-scala/concurrent/src/main/scala/dec/package.scala deleted file mode 100644 index af96efd876..0000000000 --- a/libs-scala/concurrent/src/main/scala/dec/package.scala +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml - -import com.daml.concurrent.ExecutionContext - -package object dec { - - // Starting from Scala 2.13 this can deleted and replaced by `parasitic` - val DirectExecutionContext: ExecutionContext[Nothing] = - ExecutionContext(DirectExecutionContextInternal) - -} diff --git a/triggers/service/BUILD.bazel b/triggers/service/BUILD.bazel index e12855e4d1..461e842f8b 100644 --- a/triggers/service/BUILD.bazel +++ b/triggers/service/BUILD.bazel @@ -90,8 +90,6 @@ scala_binary_deps = [ binary_deps = [ ":trigger-service", - "//libs-scala/ports", - "//libs-scala/concurrent", "//daml-lf/archive:daml_lf_archive_reader", "//daml-lf/archive:daml_lf_1.dev_archive_proto_java", "//daml-lf/data", @@ -100,6 +98,7 @@ binary_deps = [ "//ledger/ledger-api-common", "//libs-scala/contextualized-logging", "//libs-scala/db-utils", + "//libs-scala/ports", "//libs-scala/scala-utils", "//triggers/service/auth:middleware-api", "@maven//:org_slf4j_slf4j_api", diff --git a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/ServiceMain.scala b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/ServiceMain.scala index 22f27c2de4..4167cbc2a1 100644 --- a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/ServiceMain.scala +++ b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/ServiceMain.scala @@ -5,14 +5,13 @@ package com.daml.lf.engine.trigger import java.util.UUID -import akka.actor.typed.{ActorRef, ActorSystem, Scheduler} import akka.actor.typed.scaladsl.AskPattern._ +import akka.actor.typed.{ActorRef, ActorSystem, Scheduler} import akka.http.scaladsl.Http.ServerBinding import akka.http.scaladsl.model.Uri import akka.util.Timeout import com.daml.auth.middleware.api.{Client => AuthClient} import com.daml.daml_lf_dev.DamlLf -import com.daml.dec.DirectExecutionContext import com.daml.dbutils.JdbcConfig import com.daml.lf.archive.{Dar, DarReader} import com.daml.lf.data.Ref.PackageId @@ -20,16 +19,16 @@ import com.daml.lf.engine.trigger.dao.DbTriggerDao import com.daml.lf.speedy.Compiler import com.daml.logging.ContextualizedLogger import com.daml.ports.{Port, PortFiles} -import com.daml.scalautil.Statement.discard import com.daml.runtime.JdbcDrivers +import com.daml.scalautil.Statement.discard +import scalaz.std.either._ +import scalaz.std.list._ +import scalaz.syntax.traverse._ -import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} import scala.sys.ShutdownHookThread import scala.util.{Failure, Success, Try} -import scalaz.syntax.traverse._ -import scalaz.std.list._ -import scalaz.std.either._ object ServiceMain { @@ -135,8 +134,8 @@ object ServiceMain { case Some(c) => Try( Await.result( - DbTriggerDao(c)(DirectExecutionContext) - .initialize(config.allowExistingSchema)(DirectExecutionContext), + DbTriggerDao(c)(ExecutionContext.parasitic) + .initialize(config.allowExistingSchema)(ExecutionContext.parasitic), Duration(30, SECONDS), ) ) match {