Prevent re-entrant execution of finalizers (#10602)

Fixes #10211 by avoiding re-entrant execution of finalizers.
This commit is contained in:
Jaroslav Tulach 2024-07-22 22:11:54 +02:00 committed by GitHub
parent 033e4ae323
commit b6bbfc5cda
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 182 additions and 79 deletions

View File

@ -274,6 +274,9 @@ impl RunContext {
// sbt:warning: java.lang.ClassNotFoundException:
// org.enso.interpreter.node.expression.builtin.bool.True
ide_ci::fs::remove_if_exists(&self.paths.repo_root.engine.runtime.target)?;
// cleanup distribution from previous build
// it is fast to assemble it again
ide_ci::fs::remove_if_exists(&self.paths.repo_root.built_distribution)?;
// We want to start this earlier, and await only before Engine build starts.
let perhaps_generate_java_from_rust_job =

View File

@ -24,6 +24,7 @@ class RuntimeManagementTest extends InterpreterTest {
"""import Standard.Base.Runtime.Thread
|import Standard.Base.IO
|import Standard.Base.Nothing
|import Standard.Base.Data.Numbers.Number
|
|foo x =
| if x == 0 then IO.println "Start." else Nothing
@ -84,8 +85,8 @@ class RuntimeManagementTest extends InterpreterTest {
if (round % 10 == 0) {
forceGC();
}
val res = eval("main a b = a * b").execute(7, 6)
assertResult(42)(res.asInt)
val res = eval("main a b = a + b").execute("Hello", "Enso")
assertResult("HelloEnso")(res.asString)
Thread.sleep(100)
totalOut ++= consumeOut
}
@ -130,6 +131,7 @@ class RuntimeManagementTest extends InterpreterTest {
|from Standard.Base.Runtime.Resource import Managed_Resource
|import Standard.Base.IO
|import Standard.Base.Nothing
|import Standard.Base.Data.Numbers.Number
|
|type Mock_File
| Value i
@ -164,6 +166,7 @@ class RuntimeManagementTest extends InterpreterTest {
|from Standard.Base.Runtime.Resource import Managed_Resource
|import Standard.Base.IO
|import Standard.Base.Nothing
|import Standard.Base.Data.Numbers.Number
|
|type Mock_File
| Value i

View File

@ -358,15 +358,19 @@ public abstract class SortVectorNode extends Node {
}
private Object attachDifferentComparatorsWarning(Object vector, List<Group> groups) {
var diffCompsMsg =
groups.stream()
.map(Group::comparator)
.map(comparator -> comparator.getQualifiedName().toString())
.collect(Collectors.joining(", "));
var text = Text.create("Different comparators: [" + diffCompsMsg + "]");
var ctx = EnsoContext.get(this);
var warn = Warning.create(ctx, text, this);
return WithWarnings.appendTo(ctx, vector, false, warn);
if (groups.size() > 1) {
var diffCompsMsg =
groups.stream()
.map(Group::comparator)
.map(comparator -> comparator.getQualifiedName().toString())
.collect(Collectors.joining(", "));
var text = Text.create("Different comparators: [" + diffCompsMsg + "]");
var ctx = EnsoContext.get(this);
var warn = Warning.create(ctx, text, this);
return WithWarnings.appendTo(ctx, vector, false, warn);
} else {
return vector;
}
}
private String getDefaultComparatorQualifiedName() {

View File

@ -6,20 +6,21 @@ import com.oracle.truffle.api.interop.InteropLibrary;
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.enso.interpreter.runtime.data.ManagedResource;
/** Allows the context to attach garbage collection hooks on the removal of certain objects. */
public class ResourceManager {
public final class ResourceManager {
private final EnsoContext context;
private volatile boolean isClosed = false;
private volatile Thread workerThread;
private final Runner worker = new Runner();
private final ProcessItems worker = new ProcessItems();
private final ReferenceQueue<ManagedResource> referenceQueue = new ReferenceQueue<>();
private final ConcurrentMap<PhantomReference<ManagedResource>, Item> items =
new ConcurrentHashMap<>();
@ -39,10 +40,9 @@ public class ResourceManager {
*
* @param resource the resource to park.
*/
@CompilerDirectives.TruffleBoundary
public void park(ManagedResource resource) {
if (resource.getPhantomReference() instanceof Item it) {
it.getParkedCount().incrementAndGet();
it.park();
}
}
@ -52,14 +52,19 @@ public class ResourceManager {
*
* @param resource the resource to unpark.
*/
@CompilerDirectives.TruffleBoundary
public void unpark(ManagedResource resource) {
if (resource.getPhantomReference() instanceof Item it) {
it.getParkedCount().decrementAndGet();
scheduleFinalizationAtSafepoint(it);
if (it.unpark(context)) {
removeFromItems(it);
}
}
}
@CompilerDirectives.TruffleBoundary
private Item removeFromItems(PhantomReference<ManagedResource> it) {
return items.remove(it);
}
/**
* Manually and unconditionally finalizes the resource. Ignores the parking mechanism, assuming
* the user now has full control over the resource.
@ -69,7 +74,7 @@ public class ResourceManager {
@CompilerDirectives.TruffleBoundary
public void close(ManagedResource resource) {
if (resource.getPhantomReference() instanceof Item it) {
items.remove(it);
removeFromItems(it);
// Unconditional finalization user controls the resource manually.
it.finalizeNow(context);
}
@ -83,39 +88,7 @@ public class ResourceManager {
*/
@CompilerDirectives.TruffleBoundary
public void take(ManagedResource resource) {
items.remove(resource.getPhantomReference());
}
private void scheduleFinalizationAtSafepoint(Item it) {
if (it.isFlaggedForFinalization().get()) {
if (it.getParkedCount().get() == 0) {
// We already know that isFlaggedForFinalization was true at some
// point and there are no other threads still parking the underlying
// value. Note that it is impossible for parked count to increase after
// the value is flagged for finalization, as parking the value requires
// a live reference. We need to check if another thread didn't reach
// here earlier to perform the finalization and reset the flag, so that
// no further attempts are made.
boolean continueFinalizing = it.isFlaggedForFinalization().compareAndSet(true, false);
if (continueFinalizing) {
var futureToCancel = new AtomicReference<Future<Void>>(null);
var performFinalizeNow =
new ThreadLocalAction(false, false, true) {
@Override
protected void perform(ThreadLocalAction.Access access) {
var tmp = futureToCancel.getAndSet(null);
if (tmp == null) {
return;
}
tmp.cancel(false);
it.finalizeNow(context);
items.remove(it);
}
};
futureToCancel.set(context.submitThreadLocal(null, performFinalizeNow));
}
}
}
removeFromItems(resource.getPhantomReference());
}
/**
@ -165,7 +138,7 @@ public class ResourceManager {
}
}
for (PhantomReference<ManagedResource> key : items.keySet()) {
Item it = items.remove(key);
Item it = removeFromItems(key);
if (it != null) {
// Finalize unconditionally all other threads are dead by now.
it.finalizeNow(context);
@ -174,12 +147,68 @@ public class ResourceManager {
}
/**
* The worker action for the underlying logic of this module. At least one such thread must be
* spawned in order for this module to be operational.
* Processes {@link Item}s eligible for GC. Plays two roles. First of all cleans {@link
* #referenceQueue} in {@link #run()} method running in its own thread. Then it invokes finalizers
* in {@link #perform} method inside of Enso execution context.
*/
private class Runner implements Runnable {
private final class ProcessItems extends ThreadLocalAction implements Runnable {
/**
* @GuardedBy("pendingItems")
*/
private final List<Item> pendingItems = new ArrayList<>();
/**
* @GuardedBy("pendingItems")
*/
private Future<Void> request;
private volatile boolean killed = false;
ProcessItems() {
super(false, false, true);
}
/**
* Runs at a safe point in middle of regular Enso program execution. Gathers all available
* {@link #pendingItems} and runs their finalizers. Removes all processed items from {@link
* #pendingItems}. If there are any remaining, continues processing them. Otherwise finishes.
*
* @param access not used for anything
*/
@Override
protected void perform(ThreadLocalAction.Access access) {
for (; ; ) {
Item[] toProcess;
synchronized (pendingItems) {
request.cancel(false);
if (pendingItems.isEmpty()) {
// nothing to process,
// signal request is finished
request = null;
return;
}
toProcess = pendingItems.toArray(Item[]::new);
// mark as being processed
pendingItems.set(0, null);
}
try {
for (var it : toProcess) {
it.finalizeNow(context);
removeFromItems(it);
}
} finally {
synchronized (pendingItems) {
pendingItems.subList(0, toProcess.length).clear();
}
}
}
}
/**
* Running in its own thread. Waiting for {@link #referenceQueue} to be populated with GCed
* items. Scheduling {@link #perform} action at safe points while passing the {@link Item}s to
* it via {@link #pendingItems}.
*/
@Override
public void run() {
while (true) {
@ -187,8 +216,13 @@ public class ResourceManager {
Reference<? extends ManagedResource> ref = referenceQueue.remove();
if (!killed) {
if (ref instanceof Item it) {
it.isFlaggedForFinalization().set(true);
scheduleFinalizationAtSafepoint(it);
it.flaggedForFinalization.set(true);
synchronized (pendingItems) {
if (request == null) {
request = context.submitThreadLocal(null, this);
}
pendingItems.add(it);
}
}
}
if (killed) {
@ -209,16 +243,32 @@ public class ResourceManager {
*
* @param killed whether the thread should stop execution upon reading the flag.
*/
public void setKilled(boolean killed) {
void setKilled(boolean killed) {
this.killed = killed;
}
}
/** A storage representation of a finalizable object handled by this system. */
private static class Item extends PhantomReference<ManagedResource> {
private static final class Item extends PhantomReference<ManagedResource> {
private final Object underlying;
private final Object finalizer;
/**
* Returns the counter of actions parking this object. The object can be safely finalized only
* if it's unreachable {@link #isFlaggedForFinalization()} and this counter is zero.
*
* @return the parking actions counter
*/
private final AtomicInteger parkedCount = new AtomicInteger();
/**
* Returns the boolean representing finalization status of this object. The object should be
* removed by the first thread that observes this flag to be set to true and the {@link
* #getParkedCount()} to be zero. If a thread intends to perform the finalization, it should set
* this flag to {@code false}.
*
* @return the finalization flag
*/
private final AtomicBoolean flaggedForFinalization = new AtomicBoolean();
/**
@ -229,7 +279,7 @@ public class ResourceManager {
* @param reference a phantom reference used for tracking the reachability status of the
* resource.
*/
public Item(
private Item(
ManagedResource referent,
Object underlying,
Object finalizer,
@ -245,7 +295,8 @@ public class ResourceManager {
*
* @param context current execution context
*/
public void finalizeNow(EnsoContext context) {
@CompilerDirectives.TruffleBoundary
private void finalizeNow(EnsoContext context) {
try {
InteropLibrary.getUncached(finalizer).execute(finalizer, underlying);
} catch (Exception e) {
@ -253,26 +304,22 @@ public class ResourceManager {
}
}
/**
* Returns the counter of actions parking this object. The object can be safely finalized only
* if it's unreachable {@link #isFlaggedForFinalization()} and this counter is zero.
*
* @return the parking actions counter
*/
public AtomicInteger getParkedCount() {
return parkedCount;
private void park() {
parkedCount.incrementAndGet();
}
/**
* Returns the boolean representing finalization status of this object. The object should be
* removed by the first thread that observes this flag to be set to true and the {@link
* #getParkedCount()} to be zero. If a thread intends to perform the finalization, it should set
* this flag to {@code false}.
*
* @return the finalization flag
* @return {@code true} if the finalizer was run
*/
public AtomicBoolean isFlaggedForFinalization() {
return flaggedForFinalization;
private boolean unpark(EnsoContext context) {
if (parkedCount.decrementAndGet() == 0) {
boolean continueFinalizing = flaggedForFinalization.compareAndSet(true, false);
if (continueFinalizing) {
finalizeNow(context);
return true;
}
}
return false;
}
}
}

View File

@ -0,0 +1,41 @@
from Standard.Base import all
import Standard.Base.Runtime.Managed_Resource.Managed_Resource
import Standard.Base.Runtime.Ref.Ref
type My_Resource
Value counter:Ref
close self =
self.counter.modify (x-> x-1)
Nothing
allocate counter:Ref =
counter.modify (+1)
Managed_Resource.register (My_Resource.Value counter) close_resource
close_resource resource = resource.close
repeat_cleanup_until_done counter =
go i =
if counter.get == 0 then Nothing else
if i % 100 == 0 then
IO.println "Still "+counter.get.to_text+" resources to clean up..."
Runtime.gc
@Tail_Call go i+1
go 1
perform_test n:Integer println =
counter = Ref.new 0
println "Allocating "+n.to_text+" resources..."
0.up_to n . each _->
My_Resource.allocate counter
println "Cleaning up..."
repeat_cleanup_until_done counter
println "All cleaned up! Remaining: "+counter.get.to_text
counter.get
main n=1000000 =
perform_test n IO.println

View File

@ -2,6 +2,7 @@ from Standard.Base import all
import Standard.Base.Data.Vector.Builder
import Standard.Base.Errors.Illegal_State.Illegal_State
import Standard.Base.Runtime.Managed_Resource.Managed_Resource
import project.Runtime.GC_Example
from Standard.Test import all
@ -57,6 +58,10 @@ add_specs suite_builder = suite_builder.group "Managed_Resource" group_builder->
r_3 = Panic.recover Any <| Managed_Resource.bracket 42 (_-> Nothing) (_-> Panic.throw "action")
r_3.catch . should_equal "action"
group_builder.specify "allocate lots of resources at once" <|
remaining = GC_Example.perform_test 100000 (_->Nothing)
remaining . should_equal 0
main filter=Nothing =
suite = Test.build suite_builder->
add_specs suite_builder