diff --git a/engine/runtime-integration-tests/src/test/java/org/enso/interpreter/test/instrument/RuntimeManagementCleanupTest.java b/engine/runtime-integration-tests/src/test/java/org/enso/interpreter/test/instrument/RuntimeManagementCleanupTest.java new file mode 100644 index 0000000000..25e31b1155 --- /dev/null +++ b/engine/runtime-integration-tests/src/test/java/org/enso/interpreter/test/instrument/RuntimeManagementCleanupTest.java @@ -0,0 +1,35 @@ +package org.enso.interpreter.test.instrument; + +import static org.junit.Assert.fail; + +import java.io.OutputStream; +import java.io.PrintStream; +import org.junit.Test; + +public final class RuntimeManagementCleanupTest { + + @Test + public void cleanUp() throws InterruptedException { + var cnt = 0; + for (var i = 1; i < 100; i++) { + cnt = 0; + var err = i % 10 == 0 ? System.err : new PrintStream(OutputStream.nullOutputStream()); + for (var threadStack : Thread.getAllStackTraces().entrySet()) { + if (threadStack.getKey().getName().startsWith("Thread-")) { + err.println(threadStack.getKey().getName()); + for (var frame : threadStack.getValue()) { + err.println(" " + frame); + } + cnt++; + } + } + err.println("Found " + cnt + " suspicious threads"); + if (cnt == 0) { + return; + } + System.gc(); + Thread.sleep(100); + } + fail("There are still " + cnt + " suspicious threads"); + } +} diff --git a/engine/runtime/src/main/java/org/enso/interpreter/runtime/ResourceManager.java b/engine/runtime/src/main/java/org/enso/interpreter/runtime/ResourceManager.java index 5a9bbaad62..b8295c6b94 100644 --- a/engine/runtime/src/main/java/org/enso/interpreter/runtime/ResourceManager.java +++ b/engine/runtime/src/main/java/org/enso/interpreter/runtime/ResourceManager.java @@ -7,23 +7,49 @@ import java.lang.ref.PhantomReference; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; import java.util.ArrayList; +import java.util.Collection; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.enso.interpreter.runtime.data.ManagedResource; /** Allows the context to attach garbage collection hooks on the removal of certain objects. */ public final class ResourceManager { - private final EnsoContext context; - private volatile boolean isClosed = false; - private volatile Thread workerThread; - private final ProcessItems worker = new ProcessItems(); + /** Amount of milliseconds to wait for another resource when none is pending. */ + private static final long KEEP_ALIVE = 1000; + + /** Queue with resources eligible for finalization */ private final ReferenceQueue referenceQueue = new ReferenceQueue<>(); - private final ConcurrentMap, Item> items = - new ConcurrentHashMap<>(); + + /** + * All the items that were issued, but haven't yet arrived at {@link #referenceQueue} for + * finalization. + * + *

@GuardedBy("this") + */ + private final List pendingItems = new ArrayList<>(); + + private final EnsoContext context; + + /** + * Indicates this manager is closed. If assigned to {@code processor} field, this manager is + * considered closed. + */ + private final ProcessItems CLOSED = new ProcessItems(null); + + /** + * The current processor (with own worker {@link Thread}) that is supposed to await {@link + * #removeNextQueuedItem(ProcessItems)} and process them by properly invoking their finalizers + * using a safepoint action. + * + *

If this field is set to {@link #CLOSED}, then this resource manager is considered to be + * closed. + * + *

@GuardedBy("this") + */ + private ProcessItems processor; /** * Creates a new instance of Resource Manager. @@ -60,11 +86,6 @@ public final class ResourceManager { } } - @CompilerDirectives.TruffleBoundary - private Item removeFromItems(PhantomReference it) { - return items.remove(it); - } - /** * Manually and unconditionally finalizes the resource. Ignores the parking mechanism, assuming * the user now has full control over the resource. @@ -100,20 +121,15 @@ public final class ResourceManager { * @return a wrapper object, containing the resource and serving as a reachability probe */ @CompilerDirectives.TruffleBoundary - public ManagedResource register(Object object, Object function) { - if (isClosed) { + public synchronized ManagedResource register(Object object, Object function) { + if (CLOSED == processor) { throw EnsoContext.get(null) .raiseAssertionPanic( null, "Can't register new resources after resource manager is closed.", null); } - if (workerThread == null || !workerThread.isAlive()) { - worker.setKilled(false); - workerThread = context.createThread(true, worker); - workerThread.start(); - } var resource = new ManagedResource(object, r -> new Item(r, object, function, referenceQueue)); var ref = (Item) resource.getPhantomReference(); - items.put(ref, ref); + addPendingItem(ref); return resource; } @@ -125,25 +141,92 @@ public final class ResourceManager { * will be run in it. */ public void shutdown() { - isClosed = true; - worker.setKilled(true); - if (workerThread != null) { - while (true) { - try { - workerThread.interrupt(); - workerThread.join(); - break; - } catch (InterruptedException ignored) { - } + Item[] toFinalize; + ProcessItems lastProcessor; + synchronized (this) { + if (processor == CLOSED) { + // already shut(-ting) down + return; } + toFinalize = pendingItems.toArray(Item[]::new); + lastProcessor = processor; + processor = CLOSED; } - for (PhantomReference key : items.keySet()) { - Item it = removeFromItems(key); - if (it != null) { + if (lastProcessor != null) { + for (var it : lastProcessor.awaitShutdown()) { // Finalize unconditionally – all other threads are dead by now. it.finalizeNow(context); } } + for (var it : toFinalize) { + // Finalize unconditionally – these items weren't picked by any thread. + it.finalizeNow(context); + } + } + + /** + * Adds pending item into the existing processor. If there is no processor, it allocates new and + * starts its processing thread. + */ + @CompilerDirectives.TruffleBoundary + private synchronized void addPendingItem(Item item) { + if (processor == null) { + processor = new ProcessItems(r -> context.createThread(true, r)); + } + pendingItems.add(item); + } + + @CompilerDirectives.TruffleBoundary + private synchronized void removeFromItems(PhantomReference it) { + if (it instanceof Item item) { + pendingItems.remove(item); + if (pendingItems.isEmpty() && processor != null) { + processor.awake(); + } + } + } + + /** + * Awaits next item in the queue, if any. + * + * @param p the processor that queries + * @return item from queue or {@code null} if {@code p} processor may be eligible for a shutdown + */ + @CompilerDirectives.TruffleBoundary + private Reference removeNextQueuedItem(ProcessItems p) { + boolean empty; + synchronized (this) { + if (processor != p) { + return null; + } + assert Thread.currentThread() == p.workerThread; + assert p.isActive(); + empty = pendingItems.isEmpty(); + } + try { + if (empty) { + return referenceQueue.remove(KEEP_ALIVE); + } else { + return referenceQueue.remove(); + } + } catch (InterruptedException ex) { + return null; + } + } + + /** If the current processor has no pending items, it deactivates it. */ + @CompilerDirectives.TruffleBoundary + private synchronized void shutdownProcessorIfNoPending(ProcessItems p) { + assert Thread.currentThread() == p.workerThread; + if (processor == p && pendingItems.isEmpty()) { + processor = null; + } + } + + /** Verifies whether given processor is still active. */ + @CompilerDirectives.TruffleBoundary + private synchronized boolean isActive(ProcessItems p) { + return processor == p; } /** @@ -152,26 +235,38 @@ public final class ResourceManager { * in {@link #perform} method inside of Enso execution context. */ private final class ProcessItems extends ThreadLocalAction implements Runnable { - /** - * @GuardedBy("pendingItems") - */ - private final List pendingItems = new ArrayList<>(); + private final Thread workerThread; /** - * @GuardedBy("pendingItems") + * @GuardedBy("toFinalize") */ - private Future request; + private final List toFinalize = new ArrayList<>(); - private volatile boolean killed = false; + /** + * @GuardedBy("toFinalize") + */ + private Future safepointRequest; - ProcessItems() { + ProcessItems(Function threadFactory) { super(false, false, true); + if (threadFactory != null) { + this.workerThread = threadFactory.apply(this); + this.workerThread.start(); + } else { + this.workerThread = null; + } + } + + /** Is this processor still active */ + final boolean isActive() { + assert workerThread != null; + return ResourceManager.this.isActive(this); } /** * Runs at a safe point in middle of regular Enso program execution. Gathers all available - * {@link #pendingItems} and runs their finalizers. Removes all processed items from {@link - * #pendingItems}. If there are any remaining, continues processing them. Otherwise finishes. + * {@link #toFinalize} and runs their finalizers. Removes all processed items from {@link + * #toFinalize}. If there are any remaining, continues processing them. Otherwise finishes. * * @param access not used for anything */ @@ -180,27 +275,27 @@ public final class ResourceManager { var isMyThreadChoosen = false; for (; ; ) { Item[] toProcess; - synchronized (pendingItems) { + synchronized (toFinalize) { if (!isMyThreadChoosen) { - if (request == null || request.isCancelled()) { - // some thread is already handing the request + if (safepointRequest == null || safepointRequest.isCancelled()) { + // some thread is already handing the safepointRequest return; } else { - // I am choosen and I will loop and process pendingItems + // I am choosen and I will loop and process lastProcessor // until they are available isMyThreadChoosen = true; - // signal others this request has choosen thread - request.cancel(false); + // signal others this safepointRequest has choosen thread + safepointRequest.cancel(false); } } - if (pendingItems.isEmpty()) { + if (toFinalize.isEmpty()) { // nothing to process anymore, - // signal request is finished and new one shall be scheduled - request = null; + // signal safepointRequest is finished and new one shall be scheduled + safepointRequest = null; return; } - toProcess = pendingItems.toArray(Item[]::new); - pendingItems.clear(); + toProcess = toFinalize.toArray(Item[]::new); + toFinalize.clear(); } for (var it : toProcess) { it.finalizeNow(context); @@ -212,44 +307,54 @@ public final class ResourceManager { /** * Running in its own thread. Waiting for {@link #referenceQueue} to be populated with GCed * items. Scheduling {@link #perform} action at safe points while passing the {@link Item}s to - * it via {@link #pendingItems}. + * it via {@link #toFinalize}. */ @Override public void run() { while (true) { - try { - Reference ref = referenceQueue.remove(); - if (!killed) { - if (ref instanceof Item it) { - it.flaggedForFinalization.set(true); - synchronized (pendingItems) { - if (request == null) { - request = context.submitThreadLocal(null, this); - } - pendingItems.add(it); - } + if (!isActive()) { + return; + } + var ref = removeNextQueuedItem(this); + if (ref instanceof Item it) { + it.flaggedForFinalization.set(true); + synchronized (toFinalize) { + if (safepointRequest == null) { + safepointRequest = context.submitThreadLocal(null, this); } + toFinalize.add(it); } - if (killed) { - return; - } - } catch (InterruptedException e) { - if (killed) { - return; - } + removeFromItems(it); + } else { + shutdownProcessorIfNoPending(this); } } } + /** Awakes the associated {@link #workerThread} by interrupting it. */ + void awake() { + if (workerThread != null) { + workerThread.interrupt(); + } + } + /** - * Sets the killed flag of this thread. This flag being set to {@code true} will force it to - * stop execution at the soonest possible safe point. Other than setting this flag, the thread - * should also be interrupted to read it, in case it is blocked on an interruptible operation. + * Awaits shutdown of the worker thread. Can only be called when this processor is no longer + * active. * - * @param killed whether the thread should stop execution upon reading the flag. + * @return the list of items that deserve to be finalized */ - void setKilled(boolean killed) { - this.killed = killed; + Collection awaitShutdown() { + assert !isActive() : "Ready to shutdown"; + assert Thread.currentThread() != workerThread : "Cannot shutdown own thread"; + awake(); + while (workerThread.isAlive()) { + try { + workerThread.join(); + } catch (InterruptedException ex) { + } + } + return toFinalize; } }