pythreading: integrate RGenerator with Python GC

Summary:
The `RGenerator` and `RGeneratorIter` types keep Python objects internally.
Let's be a good citizen and tell CPython about the object graph so that CPython
can properly garbage-collect related objects.

Reviewed By: muirdm

Differential Revision: D45898262

fbshipit-source-id: da2b04ec6f7928c486f3f691885bf2ed8cb06055
This commit is contained in:
Jun Wu 2023-05-16 20:05:57 -07:00 committed by Facebook GitHub Bot
parent fb329a2898
commit 05c699b80a

View File

@ -8,6 +8,8 @@
#![allow(non_camel_case_types)]
use std::cell::Cell;
use std::cell::RefCell;
use std::mem;
use std::sync::Condvar;
use std::sync::Mutex;
use std::thread;
@ -324,11 +326,21 @@ py_class!(class bug29988wrapper |py| {
// Reentrant generator. Can be iterated multiple times by using the `iter()` method.
py_class!(class RGenerator |py| {
// Main iterator (non-reentrant).
data iternext: Mutex<PyObject>;
// Main generator (non-reentrant).
//
// The generator cannot run from multiple threads. It will raise
// `ValueError: generator already executing` if doing so. One of the goals of
// `RGenerator` is to use internal locking so `RGenerator` can "run" from
// multiple threads.
//
// Uses `RefCell` instead of `Mutex` so `__traverse__` does not deadlock.
data iternext: RefCell<Option<PyObject>>;
// Mutex to ensure there is only one iternext running.
data iternext_mutex: Mutex<()>;
// Items produced by iter.
data iterlist: PyList;
data iterlist: RefCell<Option<PyList>>;
// Whether iteration was completed.
data itercompleted: Cell<bool>;
@ -339,7 +351,7 @@ py_class!(class RGenerator |py| {
/// Obtains an iterator that iterates from the beginning.
// `skip` is stored as the new iterator's starting `index`, i.e. the position
// in the shared item list from which it begins yielding.
def iter(&self, skip: usize = 0) -> PyResult<RGeneratorIter> {
RGeneratorIter::create_instance(py, self.clone_ref(py), Cell::new(skip))
// The iterator holds its parent in a `RefCell<Option<..>>` slot so the
// reference can be dropped later by `RGeneratorIter::__clear__`.
RGeneratorIter::create_instance(py, RefCell::new(Some(self.clone_ref(py))), Cell::new(skip))
}
/// Iterate to the end of the original generator.
@ -347,19 +359,52 @@ py_class!(class RGenerator |py| {
if self.itercompleted(py).get() {
Ok(0)
} else {
let iter = self.iter(py, self.iterlist(py).len(py))?;
let iter = ObjectProtocol::iter(iter.as_object(), py)?;
Ok(iter.count())
let iterlist = self.iterlist(py).borrow();
if let Some(iterlist) = iterlist.as_ref() {
let iter = self.iter(py, iterlist.len(py))?;
let iter = ObjectProtocol::iter(iter.as_object(), py)?;
Ok(iter.count())
} else {
Err(unavailable(py))
}
}
}
// Hands back the list of items produced so far (via `clone_ref`).
// Once the `iterlist` slot has been emptied by `__clear__`, this fails with
// the `unavailable` error instead.
def list(&self) -> PyResult<PyList> {
Ok(self.iterlist(py).clone_ref(py))
let iterlist = self.iterlist(py).borrow();
if let Some(iterlist) = iterlist.as_ref() {
Ok(iterlist.clone_ref(py))
} else {
Err(unavailable(py))
}
}
def completed(&self) -> PyResult<bool> {
    // Snapshot of the completion flag; it is switched on when a call to the
    // wrapped `iternext` returns an error (possibly StopIteration).
    let done = self.itercompleted(py).get();
    Ok(done)
}
def __traverse__(&self, visit) {
    // Report every Python object owned by this instance to the GC visitor.
    // Each `RefCell` borrow is a temporary that ends with its `if let`.

    // The list of items produced so far.
    if let Some(items) = self.iterlist(py).borrow().as_ref() {
        visit.call(items)?;
    }

    // The bound "next" callable of the wrapped generator.
    if let Some(next_fn) = self.iternext(py).borrow().as_ref() {
        visit.call(next_fn)?;
    }

    Ok(())
}
def __clear__(&self) {
    // Break reference cycles by dropping the Python objects held in the slots.
    //
    // Each value is moved out of its slot and the `RefCell` borrow is ended
    // (the `RefMut` is a temporary of the `let` statement) *before* the
    // reference is released. Releasing a Python reference can run arbitrary
    // code; if that code touches this object again it must find an empty,
    // un-borrowed slot rather than trigger a `RefCell` borrow panic.
    let items = mem::take(&mut *self.iterlist(py).borrow_mut());
    items.release_ref(py);

    let next_fn = mem::take(&mut *self.iternext(py).borrow_mut());
    next_fn.release_ref(py);
}
});
impl RGenerator {
@ -369,12 +414,22 @@ impl RGenerator {
Err(_) => iter.getattr(py, "next")?,
Ok(next) => next,
};
Self::create_instance(py, Mutex::new(next), PyList::new(py, &[]), Cell::new(false))
Self::create_instance(
py,
RefCell::new(Some(next)),
Mutex::new(()),
RefCell::new(Some(PyList::new(py, &[]))),
Cell::new(false),
)
}
}
/// Builds the `ValueError` returned when a field slot is found empty, which
/// happens after `__clear__` has taken the Python objects out of it.
fn unavailable(py: Python) -> PyErr {
    let message = "RGenerator: cannot access fields after garbage collect";
    PyErr::new::<exc::ValueError, _>(py, message)
}
py_class!(class RGeneratorIter |py| {
data rgen: RGenerator;
data rgen: RefCell<Option<RGenerator>>;
data index: Cell<usize>;
def __iter__(&self) -> PyResult<Self> {
@ -382,46 +437,82 @@ py_class!(class RGeneratorIter |py| {
}
// Produces the next item for this iterator. Returns the `unavailable` error
// when either the `rgen` slot or the generator's `iternext` slot is empty.
def __next__(&self) -> PyResult<Option<PyObject>> {
let rgen = self.rgen(py).borrow();
let rgen = match &*rgen {
Some(rgen) => rgen,
None => return Err(unavailable(py)),
};
// Ensure that "__next__" is atomic by locking.
// Cannot rely on Python GIL because iternext.call(py) might release it.
let mutex = self.rgen(py).iternext(py);
if let Ok(locked) = mutex.try_lock() {
self.next_internal(py, &locked)
let mutex = rgen.iternext_mutex(py);
// Keep the `try_lock` result in a named binding: matching with `Ok(_)` does
// not move the guard, so on success the mutex stays held until `locked`
// goes out of scope at the end of this method.
let locked = mutex.try_lock();
if let Ok(_) = locked {
if let Some(next) = &*rgen.iternext(py).borrow() {
self.next_internal(py, next)
} else {
Err(unavailable(py))
}
} else {
// NOTE(review): a poisoned mutex also ends up in this branch; the
// `lock().unwrap()` below would then panic or block — confirm acceptable.
// Release the Python GIL to give other threads a chance to release the mutex.
let locked = py.allow_threads(|| mutex.lock().unwrap());
self.next_internal(py, &locked)
let _locked = py.allow_threads(|| mutex.lock().unwrap());
if let Some(next) = &*rgen.iternext(py).borrow() {
self.next_internal(py, &next)
} else {
Err(unavailable(py))
}
}
}
def __traverse__(&self, visit) {
    // The parent generator is the only Python object this iterator holds;
    // report it to the GC visitor if the slot is still populated.
    if let Some(parent) = self.rgen(py).borrow().as_ref() {
        visit.call(parent)?;
    }
    Ok(())
}
def __clear__(&self) {
    // Drop the reference to the parent generator so cycles can be collected.
    //
    // The value is moved out and the `RefCell` borrow ends with the `let`
    // statement, so the reference is released with no borrow outstanding.
    // Releasing a Python reference can run arbitrary code, which must not
    // find this slot still mutably borrowed.
    let parent = mem::take(&mut *self.rgen(py).borrow_mut());
    parent.release_ref(py);
}
});
impl RGeneratorIter {
// Returns the item at this iterator's current `index`, pulling more items
// from `iternext` into the generator's shared list while the list is too
// short and iteration has not been marked completed. On success the index
// is advanced by one; the result is `None` when the list still has no item
// at `index` after the loop.
//
// If `iternext.call` fails, `itercompleted` is set and the error is
// returned as-is without advancing the index.
//
// The caller should use locking to ensure `iternext` is not being called
// from another thread.
//
// NOTE(review): the `rgen` and `iterlist` `RefCell` borrows stay active
// across `iternext.call`, which runs Python code. A `__clear__` on the same
// object during that call would hit `borrow_mut()` while borrowed — confirm
// this cannot happen.
fn next_internal(&self, py: Python, iternext: &PyObject) -> PyResult<Option<PyObject>> {
let rgen = self.rgen(py);
let index = self.index(py).get();
while rgen.iterlist(py).len(py) <= index && !rgen.itercompleted(py).get() {
match iternext.call(py, NoArgs, None) {
Ok(item) => {
rgen.iterlist(py).append(py, item);
}
Err(err) => {
// Could be StopIteration.
rgen.itercompleted(py).set(true);
return Err(err);
}
};
}
let result = if rgen.iterlist(py).len(py) > index {
Some(rgen.iterlist(py).get_item(py, index))
} else {
None
let rgen = self.rgen(py).borrow();
let rgen = match &*rgen {
Some(rgen) => rgen,
None => return Err(unavailable(py)),
};
let index = self.index(py).get();
let iterlist = rgen.iterlist(py).borrow();
if let Some(iterlist) = &*iterlist {
while iterlist.len(py) <= index && !rgen.itercompleted(py).get() {
match iternext.call(py, NoArgs, None) {
Ok(item) => {
iterlist.append(py, item);
}
Err(err) => {
// Could be StopIteration.
rgen.itercompleted(py).set(true);
return Err(err);
}
};
}
self.index(py).set(index + 1);
Ok(result)
let result = if iterlist.len(py) > index {
Some(iterlist.get_item(py, index))
} else {
None
};
self.index(py).set(index + 1);
Ok(result)
} else {
Err(unavailable(py))
}
}
}