Edits are processed in the order of submission (#8787)

This is a quick fix to a long standing problem of
`org.enso.interpreter.service.error.FailedToApplyEditsException` which would prevent backend from processing any more changes, rendering GUI (and backend) virtually useless.
Edits are submitted for (background) processing in the order they are handled. However the order of execution of such tasks is not guaranteed. Most of the time edits are processed in the same order as their requests but when they don't, files get quickly out of sync.

Related to #8770.

# Important Notes
I'm not a fan of this change because it essentially blocks all open/file requests until all edits are processed and we already have logic to deal with that appropriately. Moreover those tasks can and should be processed independently. Since we already had the single thread executor present to ensure correct synchronization of open/file/push commands, we are simply adding edit commands to the list.

Ideally we want to have a specialized executor that executes tasks within the same group sequentially but groups of tasks can be executed in parallel, thus ensuring sufficient throughput. The latter will take much longer and will require significant rewrite of the command execution.

Added tests that would previously fail due to non-deterministic execution.
This commit is contained in:
Hubert Plociniczak 2024-01-23 00:05:41 +01:00 committed by GitHub
parent 5f1333a519
commit dfe867a9cd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 446 additions and 84 deletions

View File

@ -1707,7 +1707,12 @@ lazy val runtime = (project in file("engine/runtime"))
(LocalProject(
"runtime-compiler"
) / Compile / productDirectories).value ++
(LocalProject("refactoring-utils") / Compile / productDirectories).value
(LocalProject(
"refactoring-utils"
) / Compile / productDirectories).value ++
(LocalProject(
"runtime-instrument-common"
) / Test / productDirectories).value
// Patch test-classes into the runtime module. This is standard way to deal with the
// split package problem in unit tests. For example, Maven's surefire plugin does this.
val testClassesDir = (Test / productDirectories).value.head
@ -1727,7 +1732,8 @@ lazy val runtime = (project in file("engine/runtime"))
runtimeModName -> Seq(
"ALL-UNNAMED",
testInstrumentsModName,
"truffle.tck.tests"
"truffle.tck.tests",
"org.openide.util.lookup.RELEASE180"
),
testInstrumentsModName -> Seq(runtimeModName)
)
@ -1926,7 +1932,7 @@ lazy val `runtime-instrument-runtime-server` =
instrumentationSettings
)
.dependsOn(LocalProject("runtime"))
.dependsOn(`runtime-instrument-common`)
.dependsOn(`runtime-instrument-common` % "test->test;compile->compile")
/** A "meta" project that exists solely to provide logic for assembling the `runtime.jar` fat Jar.
* We do not want to put this task into any other existing project, as it internally copies some

View File

@ -53,6 +53,16 @@ public class RuntimeOptions {
INTERPRETER_SEQUENTIAL_COMMAND_EXECUTION)
.build();
public static final String INTERPRETER_RANDOM_DELAYED_COMMAND_EXECUTION =
interpreterOptionName("randomDelayedCommandExecution");
public static final OptionKey<Boolean> INTERPRETER_RANDOM_DELAYED_COMMAND_EXECUTION_KEY =
new OptionKey<>(false);
public static final OptionDescriptor INTERPRETER_RANDOM_DELAYED_COMMAND_EXECUTION_DESCRIPTOR =
OptionDescriptor.newBuilder(
INTERPRETER_RANDOM_DELAYED_COMMAND_EXECUTION_KEY,
INTERPRETER_RANDOM_DELAYED_COMMAND_EXECUTION)
.build();
public static final String JOB_PARALLELISM = interpreterOptionName("jobParallelism");
public static final OptionKey<Integer> JOB_PARALLELISM_KEY = new OptionKey<>(1);
public static final OptionDescriptor JOB_PARALLELISM_DESCRIPTOR =
@ -138,6 +148,7 @@ public class RuntimeOptions {
LANGUAGE_HOME_OVERRIDE_DESCRIPTOR,
EDITION_OVERRIDE_DESCRIPTOR,
INTERPRETER_SEQUENTIAL_COMMAND_EXECUTION_DESCRIPTOR,
INTERPRETER_RANDOM_DELAYED_COMMAND_EXECUTION_DESCRIPTOR,
JOB_PARALLELISM_DESCRIPTOR,
DISABLE_IR_CACHES_DESCRIPTOR,
PREINITIALIZE_DESCRIPTOR,

View File

@ -10,6 +10,7 @@ open module org.enso.runtime {
requires static org.slf4j;
uses org.slf4j.spi.SLF4JServiceProvider;
uses org.enso.interpreter.instrument.HandlerFactory;
provides com.oracle.truffle.api.provider.TruffleLanguageProvider with
org.enso.interpreter.EnsoLanguageProvider,

View File

@ -0,0 +1,5 @@
package org.enso.interpreter.instrument;
public abstract class HandlerFactory {
public abstract Handler create();
}

View File

@ -0,0 +1,69 @@
package org.enso.interpreter.instrument
import org.enso.lockmanager.client.{
RuntimeServerConnectionEndpoint,
RuntimeServerRequestHandler
}
import org.enso.polyglot.runtime.Runtime.{Api, ApiRequest, ApiResponse}
import org.graalvm.polyglot.io.MessageEndpoint
import java.nio.ByteBuffer
import scala.concurrent.Future
/** A message endpoint implementation. */
class Endpoint(handler: Handler)
extends MessageEndpoint
with RuntimeServerConnectionEndpoint {
/** A helper endpoint that is used for handling requests sent to the Language
* Server.
*/
private val reverseRequestEndpoint = new RuntimeServerRequestHandler {
override def sendToClient(request: Api.Request): Unit =
client.sendBinary(Api.serialize(request))
}
var client: MessageEndpoint = _
/** Sets the client end of the connection, after it has been established.
*
* @param ep the client endpoint.
*/
def setClient(ep: MessageEndpoint): Unit = client = ep
/** Sends a response to the connected client.
*
* @param msg the message to send.
*/
def sendToClient(msg: Api.Response): Unit =
client.sendBinary(Api.serialize(msg))
/** Sends a notification to the runtime.
*
* Can be used to start a command processing in the background.
*
* @param msg the message to send.
*/
def sendToSelf(msg: Api.Request): Unit =
handler.onMessage(msg)
/** Sends a request to the connected client and expects a reply. */
override def sendRequest(msg: ApiRequest): Future[ApiResponse] =
reverseRequestEndpoint.sendRequest(msg)
override def sendText(text: String): Unit = {}
override def sendBinary(data: ByteBuffer): Unit =
Api.deserializeApiEnvelope(data).foreach {
case request: Api.Request =>
handler.onMessage(request)
case response: Api.Response =>
reverseRequestEndpoint.onResponseReceived(response)
}
override def sendPing(data: ByteBuffer): Unit = client.sendPong(data)
override def sendPong(data: ByteBuffer): Unit = {}
override def sendClose(): Unit = {}
}

View File

@ -1,84 +1,21 @@
package org.enso.interpreter.instrument
import com.oracle.truffle.api.TruffleContext
import org.enso.interpreter.instrument.command.CommandFactory
import org.enso.interpreter.instrument.command.{
CommandFactory,
CommandFactoryImpl
}
import org.enso.interpreter.instrument.execution.{
CommandExecutionEngine,
CommandProcessor
}
import org.enso.interpreter.service.ExecutionService
import org.enso.lockmanager.client.{
RuntimeServerConnectionEndpoint,
RuntimeServerRequestHandler
}
import org.enso.polyglot.runtime.Runtime.{Api, ApiRequest, ApiResponse}
import org.graalvm.polyglot.io.MessageEndpoint
import java.nio.ByteBuffer
import scala.concurrent.Future
/** A message endpoint implementation. */
class Endpoint(handler: Handler)
extends MessageEndpoint
with RuntimeServerConnectionEndpoint {
/** A helper endpoint that is used for handling requests sent to the Language
* Server.
*/
private val reverseRequestEndpoint = new RuntimeServerRequestHandler {
override def sendToClient(request: Api.Request): Unit =
client.sendBinary(Api.serialize(request))
}
var client: MessageEndpoint = _
/** Sets the client end of the connection, after it has been established.
*
* @param ep the client endpoint.
*/
def setClient(ep: MessageEndpoint): Unit = client = ep
/** Sends a response to the connected client.
*
* @param msg the message to send.
*/
def sendToClient(msg: Api.Response): Unit =
client.sendBinary(Api.serialize(msg))
/** Sends a notification to the runtime.
*
* Can be used to start a command processing in the background.
*
* @param msg the message to send.
*/
def sendToSelf(msg: Api.Request): Unit =
handler.onMessage(msg)
/** Sends a request to the connected client and expects a reply. */
override def sendRequest(msg: ApiRequest): Future[ApiResponse] =
reverseRequestEndpoint.sendRequest(msg)
override def sendText(text: String): Unit = {}
override def sendBinary(data: ByteBuffer): Unit =
Api.deserializeApiEnvelope(data).foreach {
case request: Api.Request =>
handler.onMessage(request)
case response: Api.Response =>
reverseRequestEndpoint.onResponseReceived(response)
}
override def sendPing(data: ByteBuffer): Unit = client.sendPong(data)
override def sendPong(data: ByteBuffer): Unit = {}
override def sendClose(): Unit = {}
}
import org.enso.polyglot.runtime.Runtime.Api
/** A message handler, dispatching behaviors based on messages received
* from an instance of [[Endpoint]].
*/
final class Handler {
abstract class Handler {
val endpoint = new Endpoint(this)
val contextManager = new ExecutionContextManager
@ -136,7 +73,7 @@ final class Handler {
case request: Api.Request =>
if (localCtx != null) {
val cmd = CommandFactory.createCommand(request)
val cmd = cmdFactory.createCommand(request)
localCtx.commandProcessor.invoke(cmd)
} else {
throw new IllegalStateException(
@ -145,4 +82,10 @@ final class Handler {
}
}
}
def cmdFactory: CommandFactory
}
private class HandlerImpl extends Handler {
override def cmdFactory: CommandFactory = CommandFactoryImpl
}

View File

@ -0,0 +1,5 @@
package org.enso.interpreter.instrument
private object HandlerFactoryImpl extends HandlerFactory {
override def create(): Handler = new HandlerImpl()
}

View File

@ -27,7 +27,7 @@ class AttachVisualizationCmd(
ctx.executionService.getLogger.log(
Level.FINE,
"Attach visualization cmd for request id [{0}] and visualization id [{1}]",
Array(maybeRequestId, request.visualizationId)
Array[Object](maybeRequestId.toString, request.visualizationId)
)
ctx.endpoint.sendToClient(
Api.Response(maybeRequestId, Api.VisualizationAttached())

View File

@ -4,7 +4,7 @@ import org.enso.polyglot.runtime.Runtime.Api
/** A factory that creates a command for an API request.
*/
object CommandFactory {
class CommandFactory {
/** Creates a command that encapsulates a function request as an object.
*
@ -99,3 +99,5 @@ object CommandFactory {
}
}
object CommandFactoryImpl extends CommandFactory

View File

@ -6,23 +6,23 @@ import org.enso.interpreter.instrument.job.{EnsureCompiledJob, ExecuteJob}
import org.enso.polyglot.runtime.Runtime.Api
import java.util.logging.Level
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext
/** A command that performs edition of a file.
*
* @param request a request for editing
*/
class EditFileCmd(request: Api.EditFileNotification)
extends AsynchronousCommand(None) {
extends SynchronousCommand(None) {
/** Executes a request.
*
* @param ctx contains suppliers of services to perform a request
*/
override def executeAsynchronously(implicit
override def executeSynchronously(implicit
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] = {
): Unit = {
val logger = ctx.executionService.getLogger
val fileLockTimestamp = ctx.locking.acquireFileLock(request.path)
val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock()
@ -40,7 +40,6 @@ class EditFileCmd(request: Api.EditFileNotification)
ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path)))
executeJobs.foreach(ctx.jobProcessor.run)
}
Future.successful(())
} finally {
ctx.locking.releasePendingEditsLock()
logger.log(

View File

@ -0,0 +1,9 @@
package org.enso.interpreter.instrument;
@org.openide.util.lookup.ServiceProvider(service = HandlerFactory.class)
public class HandlerFactoryMock extends HandlerFactory {
@Override
public Handler create() {
return new MockHandler();
}
}

View File

@ -0,0 +1,14 @@
package org.enso.interpreter.instrument;
import org.enso.interpreter.instrument.command.CommandFactory;
import org.enso.interpreter.instrument.command.MockedCommandFactory$;
public class MockHandler extends Handler {
private CommandFactory _cmdFactory = MockedCommandFactory$.MODULE$;
@Override
public CommandFactory cmdFactory() {
return _cmdFactory;
}
}

View File

@ -0,0 +1,19 @@
package org.enso.interpreter.instrument.command
import org.enso.polyglot.runtime.Runtime.Api
object MockedCommandFactory extends CommandFactory {
private var editRequestCounter = 0
override def createCommand(request: Api.Request): Command = {
request.payload match {
case payload: Api.EditFileNotification =>
val cmd = new SlowEditFileCmd(payload, editRequestCounter)
editRequestCounter += 1
cmd
case _ =>
super.createCommand(request)
}
}
}

View File

@ -0,0 +1,26 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
import scala.concurrent.ExecutionContext
class SlowEditFileCmd(request: Api.EditFileNotification, counter: Int)
extends EditFileCmd(request) {
override def executeSynchronously(implicit
ctx: RuntimeContext,
ec: ExecutionContext
): Unit = {
if (
ctx.executionService.getContext.isRandomDelayedCommandExecution && counter % 2 == 0
) {
try {
Thread.sleep(2000)
} catch {
case _: InterruptedException =>
}
}
super.executeSynchronously(ctx, ec)
}
}

View File

@ -1,6 +1,7 @@
package org.enso.interpreter.instrument;
import com.oracle.truffle.api.TruffleContext;
import com.oracle.truffle.api.TruffleOptions;
import com.oracle.truffle.api.instrumentation.ContextsListener;
import com.oracle.truffle.api.instrumentation.EventBinding;
import com.oracle.truffle.api.instrumentation.TruffleInstrument;
@ -19,6 +20,7 @@ import org.graalvm.options.OptionDescriptor;
import org.graalvm.options.OptionDescriptors;
import org.graalvm.polyglot.io.MessageEndpoint;
import org.graalvm.polyglot.io.MessageTransport;
import org.openide.util.Lookup;
/**
* An instrument exposing a server for other services to connect to, in order to control the current
@ -102,14 +104,18 @@ public class RuntimeServerInstrument extends TruffleInstrument {
protected void onCreate(Env env) {
this.env = env;
env.registerService(this);
Handler handler = new Handler();
this.handler = handler;
if (TruffleOptions.AOT) {
this.handler = HandlerFactoryImpl.create();
} else {
var loadedHandler = Lookup.getDefault().lookup(HandlerFactory.class);
this.handler = loadedHandler != null ? loadedHandler.create() : HandlerFactoryImpl.create();
}
try {
MessageEndpoint client =
env.startServer(URI.create(RuntimeServerInfo.URI), handler.endpoint());
env.startServer(URI.create(RuntimeServerInfo.URI), this.handler.endpoint());
if (client != null) {
handler.endpoint().setClient(client);
this.handler.endpoint().setClient(client);
} else {
env.getLogger(RuntimeServerInstrument.class)
.warning(

View File

@ -665,6 +665,15 @@ public final class EnsoContext {
return getOption(RuntimeOptions.INTERPRETER_SEQUENTIAL_COMMAND_EXECUTION_KEY);
}
/**
* Checks value of {@link RuntimeOptions#INTERPRETER_RANDOM_DELAYED_COMMAND_EXECUTION_KEY}.
*
* @return the value of the option
*/
public boolean isRandomDelayedCommandExecution() {
return getOption(RuntimeOptions.INTERPRETER_RANDOM_DELAYED_COMMAND_EXECUTION_KEY);
}
/**
* Checks whether the suggestion indexing is enabled for external libraries.
*

View File

@ -0,0 +1,238 @@
package org.enso.interpreter.test.instrument
import org.enso.interpreter.runtime.`type`.ConstantsGen
import org.enso.polyglot.{
LanguageInfo,
RuntimeOptions,
RuntimeServerInfo,
Suggestion
}
import org.enso.polyglot.data.Tree
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.text.editing.model
import org.enso.text.editing.model.TextEdit
import org.graalvm.polyglot.Context
import org.scalatest.BeforeAndAfterEach
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import java.io.{ByteArrayOutputStream, File}
import java.nio.file.{Files, Paths}
import java.util.UUID
import java.util.logging.Level
@scala.annotation.nowarn("msg=multiarg infix syntax")
class RuntimeTextEditsTest
extends AnyFlatSpec
with Matchers
with BeforeAndAfterEach {
var context: TestContext = _
class TestContext(packageName: String)
extends InstrumentTestContext(packageName) {
val out: ByteArrayOutputStream = new ByteArrayOutputStream()
val context =
Context
.newBuilder(LanguageInfo.ID)
.allowExperimentalOptions(true)
.allowAllAccess(true)
.option(RuntimeOptions.PROJECT_ROOT, pkg.root.getAbsolutePath)
.option(RuntimeOptions.LOG_LEVEL, Level.WARNING.getName)
.option(
RuntimeOptions.INTERPRETER_SEQUENTIAL_COMMAND_EXECUTION,
"false"
)
.option(
RuntimeOptions.INTERPRETER_RANDOM_DELAYED_COMMAND_EXECUTION,
"true"
)
.option(RuntimeOptions.ENABLE_GLOBAL_SUGGESTIONS, "false")
.option(
RuntimeOptions.DISABLE_IR_CACHES,
InstrumentTestContext.DISABLE_IR_CACHE
)
.option(RuntimeServerInfo.ENABLE_OPTION, "true")
.option(RuntimeOptions.INTERACTIVE_MODE, "true")
.option(
RuntimeOptions.LANGUAGE_HOME_OVERRIDE,
Paths
.get("../../test/micro-distribution/component")
.toFile
.getAbsolutePath
)
.option(RuntimeOptions.EDITION_OVERRIDE, "0.0.0-dev")
.out(out)
.logHandler(System.err)
.serverTransport(runtimeServerEmulator.makeServerTransport)
.build()
def writeMain(contents: String): File =
Files.write(pkg.mainFile.toPath, contents.getBytes).toFile
def writeFile(file: File, contents: String): File =
Files.write(file.toPath, contents.getBytes).toFile
def writeInSrcDir(moduleName: String, contents: String): File = {
val file = new File(pkg.sourceDir, s"$moduleName.enso")
Files.write(file.toPath, contents.getBytes).toFile
}
def send(msg: Api.Request): Unit = runtimeServerEmulator.sendToRuntime(msg)
def consumeOut: List[String] = {
val result = out.toString
out.reset()
result.linesIterator.toList
}
def executionComplete(contextId: UUID): Api.Response =
Api.Response(Api.ExecutionComplete(contextId))
}
override protected def beforeEach(): Unit = {
context = new TestContext("Test")
context.init()
val Some(Api.Response(_, Api.InitializedNotification())) = context.receive
context.send(
Api.Request(UUID.randomUUID(), Api.StartBackgroundProcessing())
)
context.receive shouldEqual Some(
Api.Response(Api.BackgroundJobsStartedNotification())
)
}
override protected def afterEach(): Unit = {
if (context != null) {
context.close()
context.out.reset()
context = null
}
}
it should "send accept multiple file modification" in {
val contextId = UUID.randomUUID()
val requestId = UUID.randomUUID()
val moduleName = "Enso_Test.Test.Main"
val code =
"""from Standard.Base import all
|
|main = IO.println "Hello World!"
|""".stripMargin.linesIterator.mkString("\n")
val mainFile = context.writeMain(code)
// create context
context.send(Api.Request(requestId, Api.CreateContextRequest(contextId)))
context.receive shouldEqual Some(
Api.Response(requestId, Api.CreateContextResponse(contextId))
)
// open file
context.send(
Api.Request(requestId, Api.OpenFileRequest(mainFile, code))
)
context.receive shouldEqual Some(
Api.Response(Some(requestId), Api.OpenFileResponse)
)
// push main
context.send(
Api.Request(
requestId,
Api.PushContextRequest(
contextId,
Api.StackItem.ExplicitCall(
Api.MethodPointer(moduleName, "Enso_Test.Test.Main", "main"),
None,
Vector()
)
)
)
)
context.receiveNIgnoreExpressionUpdates(
3
) should contain theSameElementsAs Seq(
Api.Response(requestId, Api.PushContextResponse(contextId)),
Api.Response(
Api.SuggestionsDatabaseModuleUpdateNotification(
module = moduleName,
actions = Vector(Api.SuggestionsDatabaseAction.Clean(moduleName)),
exports = Vector(),
updates = Tree.Root(
Vector(
Tree.Node(
Api.SuggestionUpdate(
Suggestion.Module(
moduleName,
None
),
Api.SuggestionAction.Add()
),
Vector()
),
Tree.Node(
Api.SuggestionUpdate(
Suggestion.DefinedMethod(
None,
moduleName,
"main",
List(),
"Enso_Test.Test.Main",
ConstantsGen.ANY,
true,
None,
Seq()
),
Api.SuggestionAction.Add()
),
Vector()
)
)
)
)
),
context.executionComplete(contextId)
)
context.consumeOut shouldEqual List("Hello World!")
// Modify the file
context.send(
Api.Request(
Api.EditFileNotification(
mainFile,
Seq(
TextEdit(
model.Range(model.Position(2, 19), model.Position(2, 31)),
"Meh"
)
),
execute = false
)
)
)
// Modify the file
context.send(
Api.Request(
Api.EditFileNotification(
mainFile,
Seq(
TextEdit(
model.Range(model.Position(2, 19), model.Position(2, 22)),
"Welcome!"
)
),
execute = true
)
)
)
context.receiveNIgnoreExpressionUpdates(
1
) should contain theSameElementsAs Seq(
context.executionComplete(contextId)
)
context.consumeOut shouldEqual List("Welcome!")
}
}