When there are no pendingItems for a KEEP_ALIVE period, stop RequestQueue observer thread (#10890)

This commit is contained in:
Jaroslav Tulach 2024-08-27 10:10:07 +02:00 committed by GitHub
parent bd3ebc5000
commit 1500849c32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 220 additions and 80 deletions

View File

@ -0,0 +1,35 @@
package org.enso.interpreter.test.instrument;
import static org.junit.Assert.fail;
import java.io.OutputStream;
import java.io.PrintStream;
import org.junit.Test;
public final class RuntimeManagementCleanupTest {
@Test
public void cleanUp() throws InterruptedException {
var cnt = 0;
for (var i = 1; i < 100; i++) {
cnt = 0;
var err = i % 10 == 0 ? System.err : new PrintStream(OutputStream.nullOutputStream());
for (var threadStack : Thread.getAllStackTraces().entrySet()) {
if (threadStack.getKey().getName().startsWith("Thread-")) {
err.println(threadStack.getKey().getName());
for (var frame : threadStack.getValue()) {
err.println(" " + frame);
}
cnt++;
}
}
err.println("Found " + cnt + " suspicious threads");
if (cnt == 0) {
return;
}
System.gc();
Thread.sleep(100);
}
fail("There are still " + cnt + " suspicious threads");
}
}

View File

@ -7,23 +7,49 @@ import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.enso.interpreter.runtime.data.ManagedResource;
/** Allows the context to attach garbage collection hooks on the removal of certain objects. */
public final class ResourceManager {
private final EnsoContext context;
private volatile boolean isClosed = false;
private volatile Thread workerThread;
private final ProcessItems worker = new ProcessItems();
/** Amount of milliseconds to wait for another resource when none is pending. */
private static final long KEEP_ALIVE = 1000;
/** Queue with resources eligible for finalization */
private final ReferenceQueue<ManagedResource> referenceQueue = new ReferenceQueue<>();
private final ConcurrentMap<PhantomReference<ManagedResource>, Item> items =
new ConcurrentHashMap<>();
/**
* All the items that were issued, but haven't yet arrived at {@link #referenceQueue} for
* finalization.
*
* <p>@GuardedBy("this")
*/
private final List<Item> pendingItems = new ArrayList<>();
private final EnsoContext context;
/**
* Indicates this manager is closed. If assigned to {@code processor} field, this manager is
* considered closed.
*/
private final ProcessItems CLOSED = new ProcessItems(null);
/**
* The current processor (with own worker {@link Thread}) that is supposed to await {@link
* #removeNextQueuedItem(ProcessItems)} and process them by properly invoking their finalizers
* using a safepoint action.
*
* <p>If this field is set to {@link #CLOSED}, then this resource manager is considered to be
* closed.
*
* <p>@GuardedBy("this")
*/
private ProcessItems processor;
/**
* Creates a new instance of Resource Manager.
@ -60,11 +86,6 @@ public final class ResourceManager {
}
}
@CompilerDirectives.TruffleBoundary
private Item removeFromItems(PhantomReference<ManagedResource> it) {
return items.remove(it);
}
/**
* Manually and unconditionally finalizes the resource. Ignores the parking mechanism, assuming
* the user now has full control over the resource.
@ -100,20 +121,15 @@ public final class ResourceManager {
* @return a wrapper object, containing the resource and serving as a reachability probe
*/
@CompilerDirectives.TruffleBoundary
public ManagedResource register(Object object, Object function) {
if (isClosed) {
public synchronized ManagedResource register(Object object, Object function) {
if (CLOSED == processor) {
throw EnsoContext.get(null)
.raiseAssertionPanic(
null, "Can't register new resources after resource manager is closed.", null);
}
if (workerThread == null || !workerThread.isAlive()) {
worker.setKilled(false);
workerThread = context.createThread(true, worker);
workerThread.start();
}
var resource = new ManagedResource(object, r -> new Item(r, object, function, referenceQueue));
var ref = (Item) resource.getPhantomReference();
items.put(ref, ref);
addPendingItem(ref);
return resource;
}
@ -125,25 +141,92 @@ public final class ResourceManager {
* will be run in it.
*/
public void shutdown() {
isClosed = true;
worker.setKilled(true);
if (workerThread != null) {
while (true) {
try {
workerThread.interrupt();
workerThread.join();
break;
} catch (InterruptedException ignored) {
}
Item[] toFinalize;
ProcessItems lastProcessor;
synchronized (this) {
if (processor == CLOSED) {
// already shut(-ting) down
return;
}
toFinalize = pendingItems.toArray(Item[]::new);
lastProcessor = processor;
processor = CLOSED;
}
for (PhantomReference<ManagedResource> key : items.keySet()) {
Item it = removeFromItems(key);
if (it != null) {
if (lastProcessor != null) {
for (var it : lastProcessor.awaitShutdown()) {
// Finalize unconditionally all other threads are dead by now.
it.finalizeNow(context);
}
}
for (var it : toFinalize) {
// Finalize unconditionally these items weren't picked by any thread.
it.finalizeNow(context);
}
}
/**
* Adds pending item into the existing processor. If there is no processor, it allocates new and
* starts its processing thread.
*/
@CompilerDirectives.TruffleBoundary
private synchronized void addPendingItem(Item item) {
if (processor == null) {
processor = new ProcessItems(r -> context.createThread(true, r));
}
pendingItems.add(item);
}
@CompilerDirectives.TruffleBoundary
private synchronized void removeFromItems(PhantomReference<ManagedResource> it) {
if (it instanceof Item item) {
pendingItems.remove(item);
if (pendingItems.isEmpty() && processor != null) {
processor.awake();
}
}
}
/**
* Awaits next item in the queue, if any.
*
* @param p the processor that queries
* @return item from queue or {@code null} if {@code p} processor may be eligible for a shutdown
*/
@CompilerDirectives.TruffleBoundary
private Reference<? extends ManagedResource> removeNextQueuedItem(ProcessItems p) {
boolean empty;
synchronized (this) {
if (processor != p) {
return null;
}
assert Thread.currentThread() == p.workerThread;
assert p.isActive();
empty = pendingItems.isEmpty();
}
try {
if (empty) {
return referenceQueue.remove(KEEP_ALIVE);
} else {
return referenceQueue.remove();
}
} catch (InterruptedException ex) {
return null;
}
}
/** If the current processor has no pending items, it deactivates it. */
@CompilerDirectives.TruffleBoundary
private synchronized void shutdownProcessorIfNoPending(ProcessItems p) {
assert Thread.currentThread() == p.workerThread;
if (processor == p && pendingItems.isEmpty()) {
processor = null;
}
}
/** Verifies whether given processor is still active. */
@CompilerDirectives.TruffleBoundary
private synchronized boolean isActive(ProcessItems p) {
return processor == p;
}
/**
@ -152,26 +235,38 @@ public final class ResourceManager {
* in {@link #perform} method inside of Enso execution context.
*/
private final class ProcessItems extends ThreadLocalAction implements Runnable {
/**
* @GuardedBy("pendingItems")
*/
private final List<Item> pendingItems = new ArrayList<>();
private final Thread workerThread;
/**
* @GuardedBy("pendingItems")
* @GuardedBy("toFinalize")
*/
private Future<Void> request;
private final List<Item> toFinalize = new ArrayList<>();
private volatile boolean killed = false;
/**
* @GuardedBy("toFinalize")
*/
private Future<Void> safepointRequest;
ProcessItems() {
ProcessItems(Function<Runnable, Thread> threadFactory) {
super(false, false, true);
if (threadFactory != null) {
this.workerThread = threadFactory.apply(this);
this.workerThread.start();
} else {
this.workerThread = null;
}
}
/** Is this processor still active */
final boolean isActive() {
assert workerThread != null;
return ResourceManager.this.isActive(this);
}
/**
* Runs at a safe point in middle of regular Enso program execution. Gathers all available
* {@link #pendingItems} and runs their finalizers. Removes all processed items from {@link
* #pendingItems}. If there are any remaining, continues processing them. Otherwise finishes.
* {@link #toFinalize} and runs their finalizers. Removes all processed items from {@link
* #toFinalize}. If there are any remaining, continues processing them. Otherwise finishes.
*
* @param access not used for anything
*/
@ -180,27 +275,27 @@ public final class ResourceManager {
var isMyThreadChoosen = false;
for (; ; ) {
Item[] toProcess;
synchronized (pendingItems) {
synchronized (toFinalize) {
if (!isMyThreadChoosen) {
if (request == null || request.isCancelled()) {
// some thread is already handing the request
if (safepointRequest == null || safepointRequest.isCancelled()) {
// some thread is already handing the safepointRequest
return;
} else {
// I am choosen and I will loop and process pendingItems
// I am choosen and I will loop and process lastProcessor
// until they are available
isMyThreadChoosen = true;
// signal others this request has choosen thread
request.cancel(false);
// signal others this safepointRequest has choosen thread
safepointRequest.cancel(false);
}
}
if (pendingItems.isEmpty()) {
if (toFinalize.isEmpty()) {
// nothing to process anymore,
// signal request is finished and new one shall be scheduled
request = null;
// signal safepointRequest is finished and new one shall be scheduled
safepointRequest = null;
return;
}
toProcess = pendingItems.toArray(Item[]::new);
pendingItems.clear();
toProcess = toFinalize.toArray(Item[]::new);
toFinalize.clear();
}
for (var it : toProcess) {
it.finalizeNow(context);
@ -212,44 +307,54 @@ public final class ResourceManager {
/**
* Running in its own thread. Waiting for {@link #referenceQueue} to be populated with GCed
* items. Scheduling {@link #perform} action at safe points while passing the {@link Item}s to
* it via {@link #pendingItems}.
* it via {@link #toFinalize}.
*/
@Override
public void run() {
while (true) {
try {
Reference<? extends ManagedResource> ref = referenceQueue.remove();
if (!killed) {
if (ref instanceof Item it) {
it.flaggedForFinalization.set(true);
synchronized (pendingItems) {
if (request == null) {
request = context.submitThreadLocal(null, this);
}
pendingItems.add(it);
}
if (!isActive()) {
return;
}
var ref = removeNextQueuedItem(this);
if (ref instanceof Item it) {
it.flaggedForFinalization.set(true);
synchronized (toFinalize) {
if (safepointRequest == null) {
safepointRequest = context.submitThreadLocal(null, this);
}
toFinalize.add(it);
}
if (killed) {
return;
}
} catch (InterruptedException e) {
if (killed) {
return;
}
removeFromItems(it);
} else {
shutdownProcessorIfNoPending(this);
}
}
}
/** Awakes the associated {@link #workerThread} by interrupting it. */
void awake() {
if (workerThread != null) {
workerThread.interrupt();
}
}
/**
* Sets the killed flag of this thread. This flag being set to {@code true} will force it to
* stop execution at the soonest possible safe point. Other than setting this flag, the thread
* should also be interrupted to read it, in case it is blocked on an interruptible operation.
* Awaits shutdown of the worker thread. Can only be called when this processor is no longer
* active.
*
* @param killed whether the thread should stop execution upon reading the flag.
* @return the list of items that deserve to be finalized
*/
void setKilled(boolean killed) {
this.killed = killed;
Collection<Item> awaitShutdown() {
assert !isActive() : "Ready to shutdown";
assert Thread.currentThread() != workerThread : "Cannot shutdown own thread";
awake();
while (workerThread.isAlive()) {
try {
workerThread.join();
} catch (InterruptedException ex) {
}
}
return toFinalize;
}
}