Re-sync with internal repository

Lukas Piatkowski 2019-12-10 02:53:58 -08:00 committed by Lukasz Piatkowski
parent 3a3d0df5be
commit fcc72578ee
6 changed files with 0 additions and 1925 deletions

@ -1,16 +0,0 @@
[package]
name = "asyncmemo"
version = "0.0.1"
authors = ["Facebook"]
license = "GPLv2+"
edition = "2018"
[dependencies]
bytes = "0.4.5"
futures = "0.1.17"
heapsize = "0.4.2"
lazy_static = "1.0.2"
linked-hash-map = "0.5.1"
parking_lot = "0.6.1"
futures_ext = { path = "../futures_ext" }

@ -1,238 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License found in the LICENSE file in the root
* directory of this source tree.
*/
use std::hash::Hash;
use crate::weight::Weight;
use linked_hash_map::LinkedHashMap;
#[derive(Debug, Clone)]
pub struct BoundedHash<K, V>
where
K: Eq + Hash,
{
hash: LinkedHashMap<K, V>,
entrylimit: usize, // max number of entries
weightlimit: usize, // max weight of entries
entrysizes: usize, // sum of (completed) entry weights
}
impl<K, V> BoundedHash<K, V>
where
K: Eq + Hash,
V: Weight,
{
pub fn new(entrylimit: usize, weightlimit: usize) -> Self {
BoundedHash {
hash: LinkedHashMap::new(),
entrysizes: 0,
entrylimit,
weightlimit,
}
}
pub fn total_weight(&self) -> usize {
self.entrysizes
}
#[inline]
pub fn len(&self) -> usize {
self.hash.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.hash.is_empty()
}
fn remove_one(&mut self, v: &V) {
self.entrysizes -= v.get_weight();
}
/// Trim an entry with LRU policy
fn trim_one(&mut self) -> bool {
match self.hash.pop_front() {
Some((_k, v)) => {
self.remove_one(&v);
true
}
None => false,
}
}
/// Trim enough entries to make room for `additional` new ones.
pub fn trim_entries(&mut self, additional: usize) -> bool {
if additional > self.entrylimit {
return false;
}
let limit = self.entrylimit - additional;
while self.hash.len() > limit {
if !self.trim_one() {
break;
}
}
true
}
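// Worked example: with entrylimit = 10 and 10 entries present, trim_entries(1)
// computes limit = 9 and evicts the single oldest entry, making room for one
// insertion; trim_entries(11) returns false, since no amount of eviction can
// make room for more than entrylimit entries.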
/// Trim enough weight to make room for `additional` new weight.
pub fn trim_weight(&mut self, additional: usize) -> bool {
if additional > self.weightlimit {
return false;
}
let limit = self.weightlimit - additional;
while self.total_weight() > limit {
if !self.trim_one() {
break;
}
}
true
}
/// Trim back to desired limits
pub fn trim(&mut self) {
self.trim_entries(0);
self.trim_weight(0);
}
pub fn clear(&mut self) {
self.hash.clear();
self.entrysizes = 0;
}
/// Remove a specific key, returning its value if it was present, after updating the weight
pub fn remove(&mut self, key: &K) -> Option<V> {
self.hash.remove(key).map(|v| {
self.remove_one(&v);
v
})
}
/// Insert new entry, updating weights
///
/// Insert fails if there isn't capacity for the new entry, returning the key and value.
pub fn insert(&mut self, k: K, v: V) -> Result<Option<V>, (K, V)> {
// Remove the key if it's already in the hash
let oldv = self.hash.remove(&k);
if let Some(ref removed) = oldv {
self.entrysizes -= removed.get_weight();
}
if !self.trim_entries(1) {
// seems unlikely, but anyway
return Err((k, v));
}
let vw = v.get_weight();
if !self.trim_weight(vw) {
return Err((k, v));
}
self.entrysizes += vw;
self.hash.insert(k, v);
Ok(oldv)
}
#[cfg(test)]
#[inline]
pub fn get(&self, key: &K) -> Option<&V> {
self.hash.get(key)
}
#[inline]
pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
self.hash.get_mut(key)
}
}
#[cfg(test)]
mod test {
use super::*;
use std::ops::{Deref, DerefMut};
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
struct Weighted<T>(T, usize);
impl<T> Weight for Weighted<T> {
fn get_weight(&self) -> usize {
self.1
}
}
impl<T> Deref for Weighted<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for Weighted<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
#[test]
fn simple() {
let mut c = BoundedHash::new(10, 1000);
{
let ok = c.insert("hello", Weighted("world", 100)).is_ok();
assert!(ok, "insert failed");
assert_eq!(c.total_weight(), 100);
assert_eq!(c.len(), 1);
}
{
let v = c.get(&"hello").expect("get failed");
assert_eq!(v, &Weighted("world", 100));
}
{
let ok = c.remove(&"hello").is_some();
assert!(ok, "remove failed");
assert_eq!(c.total_weight(), 0);
assert_eq!(c.len(), 0);
}
}
#[test]
fn toobig() {
let mut c = BoundedHash::new(10, 1000);
let ok = c.insert("hello", Weighted("world", 100)).is_ok();
assert!(ok, "insert failed");
assert_eq!(c.total_weight(), 100);
assert_eq!(c.len(), 1);
let err = c.insert("bubble", Weighted("lead", 1001)).is_err();
assert!(err, "insert worked?");
assert_eq!(c.total_weight(), 100);
assert_eq!(c.len(), 1);
let ok = c.insert("bubble", Weighted("balloon", 880)).is_ok();
assert!(ok, "insert failed?");
assert_eq!(c.total_weight(), 100 + 880);
assert_eq!(c.len(), 2);
}
}

@ -1,603 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License found in the LICENSE file in the root
* directory of this source tree.
*/
//! Asynchronous Memoizing Cache
//!
//! This crate implements a cache to memoize results calculated by some async process.
//!
//! The primary access method is `Asyncmemo::get()`. If the result has been previously calculated
//! and cached, then the result is directly returned from the cache.
//!
//! Otherwise, an implementation of `Filler` is invoked to produce a new result. Since the
//! process constructing the result is async, each query for the result will poll the process
//! and will update the cache when it finishes.
//!
//! If a value is requested from the cache while the value is being computed, then that task
//! will be added to a notification list and will be woken when the computation changes state
//! (either completes, fails, or makes progress in its async state machine).
//!
//! The `fill()` function returns an instance of `Future`, and as such can fail. In that case
//! no result will be cached, and the error will be returned to the task that's currently
//! calling that future's `poll()`. Other waiting tasks will be woken, but they'll simply get
//! a cache miss and will attempt to compute the result again. There is no negative caching
//! or rate limiting, so a process prone to failure can instead "succeed" with a sentinel
//! value representing the failure, which the application can handle with its own logic.
//!
//! TODO: add interface to allow multiple implementations of the underlying cache, to allow
//! eviction and other policies to be controlled.
//!
//! TODO: entry invalidation interface
#![deny(warnings)]
use std::collections::hash_map::DefaultHasher;
use std::fmt::{self, Debug};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::thread;
use std::usize;
use futures::future::{IntoFuture, Shared, SharedError, SharedItem};
use futures::{Async, Future, Poll};
use parking_lot::Mutex;
use stats::prelude::*;
use futures_ext::BoxFuture;
#[cfg(test)]
mod test;
mod boundedhash;
mod weight;
use crate::boundedhash::BoundedHash;
pub use crate::weight::Weight;
define_stats! {
prefix = "asyncmemo";
memo_futures_estimate: dynamic_timeseries(
"memo_futures_estimate.{}", (tag: &'static str); AVG),
total_weight: dynamic_timeseries(
"per_shard.total_weight.{}", (tag: &'static str); AVG),
entry_num: dynamic_timeseries(
"per_shard.entry_num.{}", (tag: &'static str); AVG),
}
const SHARD_NUM: usize = 1000;
/// Asynchronous memoizing cache for async processes
///
/// The cache requires an instance of an implementation of the `Filler` trait
/// to generate new results.
pub struct Asyncmemo<F>
where
F: Filler,
F::Key: Eq + Hash,
{
stats_tag: &'static str,
inner: Arc<AsyncmemoInner<F>>,
}
impl<F> Debug for Asyncmemo<F>
where
F: Filler,
F::Key: Eq + Hash + Debug,
<<F as Filler>::Value as IntoFuture>::Future: Debug,
<<F as Filler>::Value as IntoFuture>::Item: Debug,
<<F as Filler>::Value as IntoFuture>::Error: Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Asyncmemo")
.field("stats_tag", &self.stats_tag)
.field("inner", &self.inner)
.finish()
}
}
/// Generate a result for the cache.
///
/// The function implemented by `fill()` should be referentially transparent - the output
/// should only depend on the value of the `key` parameter.
/// It may fail - the failure will be propagated to one of the callers, and the result won't
/// be cached.
pub trait Filler: Sized {
type Key: Eq + Hash;
type Value: IntoFuture + 'static;
fn fill(&self, cache: &Asyncmemo<Self>, key: &Self::Key) -> Self::Value;
}
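// A minimal sketch (illustrative, not part of the original file) of a Filler
// implementation: `Result` implements `IntoFuture`, so a synchronous
// computation can be returned directly. This mirrors the `Upperer` filler used
// in the tests.
//
// struct Uppercaser;
//
// impl Filler for Uppercaser {
//     type Key = String;
//     type Value = Result<String, ()>;
//     fn fill(&self, _cache: &Asyncmemo<Self>, key: &Self::Key) -> Self::Value {
//         Ok(key.to_uppercase())
//     }
// }
//
// let cache = Asyncmemo::with_limits("example", Uppercaser, 1024, 1024 * 1024);
// assert_eq!(cache.get("hello").wait(), Ok("HELLO".to_string())); // filler runs once
// assert_eq!(cache.get("hello").wait(), Ok("HELLO".to_string())); // served from cache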
type FillerSlot<F> = Slot<
<<<F as Filler>::Value as IntoFuture>::Future as Future>::Item,
<<<F as Filler>::Value as IntoFuture>::Future as Future>::Error,
>;
// We really want a type bound on F, but currently that emits an annoying E0122 warning
type CacheHash<F> = BoundedHash<<F as Filler>::Key, FillerSlot<F>>;
struct AsyncmemoInner<F>
where
F: Filler,
F::Key: Eq + Hash,
{
hash_vec: Vec<Mutex<CacheHash<F>>>,
filler: F,
}
impl<F> Debug for AsyncmemoInner<F>
where
F: Filler,
F::Key: Eq + Hash + Debug,
<<F as Filler>::Value as IntoFuture>::Future: Debug,
<<F as Filler>::Value as IntoFuture>::Error: Debug,
<<F as Filler>::Value as IntoFuture>::Item: Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut fmt_struct = fmt.debug_struct("AsyncmemoInner");
for (idx, hash) in self.hash_vec.iter().enumerate() {
let hash = hash.lock();
fmt_struct.field(&format!("hash_vec[{}]", idx), &*hash);
}
fmt_struct.finish()
}
}
// The user-supplied future is wrapped into SharedAsyncmemoFuture so that the internal future can
// be polled by many MemoFutures at once. Note that the error type of the Shared future is
// SharedError. This type derefs to the underlying error, but not all underlying errors implement
// Clone (for example, failure's Error is not cloneable).
// That means we have two options: return SharedError to the user (undesirable) or use some
// hack to overcome this restriction. We've chosen the latter - see SharedAsyncmemoError
// below.
struct SharedAsyncmemoFuture<Item, Error> {
// The future can only be None while it is being dropped (see the Drop implementation)
future: Option<Shared<BoxFuture<Item, SharedAsyncmemoError<Error>>>>,
}
impl<Item, Error> SharedAsyncmemoFuture<Item, Error> {
fn new(future: Shared<BoxFuture<Item, SharedAsyncmemoError<Error>>>) -> Self {
SharedAsyncmemoFuture {
future: Some(future),
}
}
}
impl<Item, Error> Future for SharedAsyncmemoFuture<Item, Error> {
type Item = SharedItem<Item>;
type Error = SharedError<SharedAsyncmemoError<Error>>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.future {
Some(ref mut future) => future.poll(),
None => panic!("unexpected state"),
}
}
}
impl<Item, Error> Clone for SharedAsyncmemoFuture<Item, Error> {
fn clone(&self) -> Self {
match self.future {
Some(ref future) => SharedAsyncmemoFuture::new(future.clone()),
None => panic!("unexpected state"),
}
}
}
impl<Item, Error> Drop for SharedAsyncmemoFuture<Item, Error> {
fn drop(&mut self) {
if thread::panicking() {
// Shared future grabs a lock during the Drop. The lock is poisoned when thread
// panics, and that aborts the process. In turn that causes #[should_panic] tests
// to fail. Work around the problem by forgetting the shared future if the thread is
// panicking.
let future = self.future.take().unwrap();
std::mem::forget(future);
}
}
}
// Asyncmemo doesn't do negative caching. So the first MemoFuture that polls the underlying errored
// SharedFuture grabs the lock and replaces Some(err) with None. This first future then returns
// error to the user, but others user Filler to start new Future instead.
type SharedAsyncmemoError<Error> = Mutex<Option<Error>>;
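// A small sketch (not from the original file) of the take-once semantics this
// alias provides: the first caller of lock().take() obtains the error, and
// every later caller observes None, i.e. the error was already consumed.
//
// let err: SharedAsyncmemoError<&str> = Mutex::new(Some("boom"));
// assert_eq!(err.lock().take(), Some("boom")); // first poller gets the error
// assert_eq!(err.lock().take(), None);         // later pollers fall back to the Filler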
// Result of polling SharedAsyncmemoFuture: either a normal poll result
// (i.e. Ready, NotReady, Err), or an indication that the error was already taken by another future
enum SharedAsyncmemoFuturePoll<Item, Error> {
PollResult(Poll<Item, Error>),
MovedError,
}
fn wrap_filler_future<Fut: Future + Send + 'static>(
fut: Fut,
) -> SharedAsyncmemoFuture<<Fut as Future>::Item, <Fut as Future>::Error> {
let fut: BoxFuture<<Fut as Future>::Item, SharedAsyncmemoError<<Fut as Future>::Error>> =
Box::new(fut.map_err(|err| Mutex::new(Some(err))));
SharedAsyncmemoFuture::new(fut.shared())
}
enum Slot<Item, Error> {
Waiting(SharedAsyncmemoFuture<Item, Error>), // waiting for entry to become available
Complete(Item), // got value
}
impl<Item, Error> Debug for Slot<Item, Error> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
&Slot::Waiting(..) => fmt.write_str("Waiting"),
&Slot::Complete(..) => fmt.write_str("Complete"),
}
}
}
impl<Item, Error> Weight for Slot<Item, Error>
where
Item: Weight,
{
fn get_weight(&self) -> usize {
match self {
&Slot::Waiting(..) => 0,
&Slot::Complete(ref v) => v.get_weight(),
}
}
}
/// Pending result from a cache lookup
pub struct MemoFuture<F>
where
F: Filler,
F::Key: Eq + Hash,
{
cache: Asyncmemo<F>,
key: F::Key,
internal_future: Option<
SharedAsyncmemoFuture<
<<F::Value as IntoFuture>::Future as Future>::Item,
<<F::Value as IntoFuture>::Future as Future>::Error,
>,
>,
}
impl<F> Debug for MemoFuture<F>
where
F: Filler,
F::Key: Eq + Hash + Debug,
<<F as Filler>::Value as IntoFuture>::Future: Debug,
<<F as Filler>::Value as IntoFuture>::Item: Debug,
<<F as Filler>::Value as IntoFuture>::Error: Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("MemoFuture")
.field("cache", &self.cache)
.field("key", &self.key)
.finish()
}
}
impl<F> MemoFuture<F>
where
F: Filler,
F::Key: Eq + Hash + Weight + Clone,
<F::Value as IntoFuture>::Future: Send,
<F::Value as IntoFuture>::Item: Weight + Clone,
{
// Return the current state of a slot, if present
fn slot_present(&self) -> Option<FillerSlot<F>> {
let mut hash = self.cache.inner.hash_vec[self.cache.get_shard(&self.key)].lock();
self.report_stats(&*hash);
if let Some(entry) = hash.get_mut(&self.key) {
match entry {
// straightforward cache hit
&mut Slot::Complete(ref val) => return Some(Slot::Complete(val.clone())),
// In-flight future
&mut Slot::Waiting(ref fut) => return Some(Slot::Waiting(fut.clone())),
}
}
None
}
fn slot_remove(&self) {
let mut hash = self.cache.inner.hash_vec[self.cache.get_shard(&self.key)].lock();
let _ = hash.remove(&self.key);
self.report_stats(&*hash);
}
fn slot_insert(&self, slot: FillerSlot<F>) {
let mut hash = self.cache.inner.hash_vec[self.cache.get_shard(&self.key)].lock();
match hash.insert(self.key.clone(), slot) {
Err((_k, _v)) => {
// failed to insert for capacity reasons; remove entry we're not going to use
// XXX retry once?
hash.remove(&self.key);
}
Ok(Some(val @ Slot::Complete(_))) => {
// If we just kicked out a complete value, put it back, since at best
// we're replacing a complete value with another one (which should be
// identical), but at worst we could be making it regress. This could only
// happen if an in-progress slot got evicted and the computation restarted.
let _ = hash.insert(self.key.clone(), val);
// trim cache if that made it oversized
hash.trim();
}
Ok(Some(_)) | Ok(None) => (), // nothing (interesting) there
}
self.report_stats(&*hash);
}
fn report_stats(&self, hash: &CacheHash<F>) {
STATS::memo_futures_estimate.add_value(
Arc::strong_count(&self.cache.inner) as i64,
(self.cache.stats_tag,),
);
STATS::total_weight.add_value(hash.total_weight() as i64, (self.cache.stats_tag,));
STATS::entry_num.add_value(hash.len() as i64, (self.cache.stats_tag,));
}
fn poll_real_future(
&mut self,
mut real_future: SharedAsyncmemoFuture<<Self as Future>::Item, <Self as Future>::Error>,
) -> SharedAsyncmemoFuturePoll<<Self as Future>::Item, <Self as Future>::Error> {
match real_future.poll() {
Err(err) => {
self.slot_remove();
match err.lock().take() {
Some(err) => SharedAsyncmemoFuturePoll::PollResult(Err(err)),
None => SharedAsyncmemoFuturePoll::MovedError,
}
}
Ok(Async::NotReady) => {
self.slot_insert(Slot::Waiting(real_future.clone()));
self.internal_future = Some(real_future);
SharedAsyncmemoFuturePoll::PollResult(Ok(Async::NotReady))
}
Ok(Async::Ready(val)) => {
self.slot_insert(Slot::Complete((*val).clone()));
SharedAsyncmemoFuturePoll::PollResult(Ok(Async::Ready((*val).clone())))
}
}
}
fn handle(&mut self) -> Poll<<Self as Future>::Item, <Self as Future>::Error> {
// This is a 3-step process:
// 1) Poll internal future if it is present. Internal future is present only if we have
// polled this MemoFuture before. Continue if the error was already moved away.
// 2) Search for the future in the cache and poll. Continue if we can't find it or if the
// error was moved away.
// 3) Get a future from the filler and poll it. Note that in this case the error can't be
// moved away, because this future is not yet shared with any other MemoFuture.
let internal_future = self.internal_future.take();
if let Some(internal_future) = internal_future {
if let SharedAsyncmemoFuturePoll::PollResult(poll) =
self.poll_real_future(internal_future)
{
return poll;
}
// There was an Error, but another future has already replaced it with None.
// In that case we want to start the future again.
}
// First check to see if we already have a slot for this key and process it accordingly.
match self.slot_present() {
None => (), // nothing there for this key
Some(Slot::Complete(v)) => return Ok(Async::Ready(v)),
Some(Slot::Waiting(fut)) => {
if let SharedAsyncmemoFuturePoll::PollResult(poll) = self.poll_real_future(fut) {
return poll;
}
// There was an Error, but another future has already replaced it with None.
// In that case we want to start the future again.
}
};
// No usable slot was present. Construct the future via the filler and
// start running it.
let filler = self
.cache
.inner
.filler
.fill(&self.cache, &self.key)
.into_future();
let fut = wrap_filler_future(filler);
if let SharedAsyncmemoFuturePoll::PollResult(poll) = self.poll_real_future(fut) {
return poll;
}
panic!("internal error: just created future's error was already removed");
}
}
impl<F> Future for MemoFuture<F>
where
F: Filler,
F::Key: Eq + Hash + Weight + Clone,
<F::Value as IntoFuture>::Future: Send,
<F::Value as IntoFuture>::Item: Weight + Clone,
{
type Item = <F::Value as IntoFuture>::Item;
type Error = <F::Value as IntoFuture>::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.handle()
}
}
impl<F> Asyncmemo<F>
where
F: Filler,
F::Key: Hash + Eq + Weight,
<F::Value as IntoFuture>::Item: Weight,
{
/// Construct a new bounded cache. It enforces two distinct limits:
/// - entrylimit - the max number of entries
/// - weightlimit - the max abstract "weight" of the entries (both keys and values)
///
/// Weight is typically memory use.
pub fn with_limits(
stats_tag: &'static str,
fill: F,
entrylimit: usize,
weightlimit: usize,
) -> Self {
Self::with_limits_and_shards(stats_tag, fill, entrylimit, weightlimit, SHARD_NUM)
}
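// A usage sketch (`MyFiller` is a placeholder for any Filler implementation):
// a cache bounded to 10_000 entries and roughly 64 MiB of key/value weight.
// Note that both limits are divided evenly across the SHARD_NUM shards.
//
// let cache = Asyncmemo::with_limits("my_cache", MyFiller, 10_000, 64 * 1024 * 1024);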
fn with_limits_and_shards(
stats_tag: &'static str,
fill: F,
entrylimit: usize,
weightlimit: usize,
shards: usize,
) -> Self {
assert!(entrylimit > 0);
assert!(weightlimit > 0);
let hash_vec = {
let entrylimit = entrylimit / shards;
let weightlimit = weightlimit / shards;
let mut hash_vec = Vec::new();
for _ in 0..shards {
hash_vec.push(Mutex::new(BoundedHash::new(entrylimit, weightlimit)))
}
hash_vec
};
let inner = AsyncmemoInner {
hash_vec,
filler: fill,
};
Asyncmemo {
stats_tag,
inner: Arc::new(inner),
}
}
/// Construct an unbounded cache.
///
/// This is pretty dangerous for any non-toy use.
pub fn new_unbounded(stats_tag: &'static str, fill: F, shards: usize) -> Self {
Self::with_limits_and_shards(stats_tag, fill, usize::MAX, usize::MAX, shards)
}
/// Look up a result for a particular key/arg
///
/// The future will either complete immediately if the result is already
/// known, or will wait for an in-progress result (perhaps failing), or
/// initiate new process to generate a result.
///
/// This only caches successful results - it does not cache errors as a
/// negative cache.
pub fn get<K: Into<F::Key>>(&self, key: K) -> MemoFuture<F> {
MemoFuture {
cache: self.clone(),
key: key.into(),
internal_future: None,
}
}
/// Check to see if we have a cached result already, and thus that
/// get will return quickly.
///
/// Be wary of time-of-check to time-of-use changes:
/// `if key_present_in_cache(key) { get(key) }` is an anti-pattern, as the key can be
/// evicted before the `get`, and there could be a fetch in progress that will make
/// `get` fast.
pub fn key_present_in_cache<K: Into<F::Key>>(&self, key: K) -> bool {
let key = key.into();
let mut locked = self.inner.hash_vec[self.get_shard(&key)].lock();
match locked.get_mut(&key) {
Some(Slot::Complete(_)) => true,
_ => false,
}
}
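// A sketch of the anti-pattern described above versus the preferred form:
//
// // Racy: the entry can be evicted between the check and the get.
// if cache.key_present_in_cache("key") {
//     let value = cache.get("key").wait();
// }
//
// // Preferred: just call get(); it already completes quickly on a cache hit.
// let value = cache.get("key").wait();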
/// Invalidate a specific key
pub fn invalidate<K: Into<F::Key>>(&self, key: K) {
let key = key.into();
let mut locked = self.inner.hash_vec[self.get_shard(&key)].lock();
let _ = locked.remove(&key);
}
/// Reset the cache. This removes all results (complete and in-progress) from the cache.
/// This drops the futures of in-progress entries, which should propagate cancellation
/// if necessary.
pub fn clear(&self) {
for hash in &self.inner.hash_vec {
let mut locked = hash.lock();
locked.clear()
}
}
/// Trim cache size to limits.
pub fn trim(&self) {
for hash in &self.inner.hash_vec {
let mut locked = hash.lock();
locked.trim_entries(0);
locked.trim_weight(0);
}
}
/// Return number of entries in cache.
pub fn len(&self) -> usize {
let mut len = 0;
for hash in &self.inner.hash_vec {
let hash = hash.lock();
len += hash.len();
}
len
}
/// Return current "weight" of the cache entries
pub fn total_weight(&self) -> usize {
let mut total = 0;
for hash in &self.inner.hash_vec {
let hash = hash.lock();
total += hash.total_weight()
}
total
}
/// Return true if cache is empty.
pub fn is_empty(&self) -> bool {
let mut is_empty = true;
for hash in &self.inner.hash_vec {
let hash = hash.lock();
is_empty = is_empty && hash.is_empty();
}
is_empty
}
fn get_shard<K: Hash>(&self, key: &K) -> usize {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
(hasher.finish() % (self.inner.hash_vec.len() as u64)) as usize
}
}
impl<F> Clone for Asyncmemo<F>
where
F: Filler,
F::Key: Eq + Hash,
{
fn clone(&self) -> Self {
Asyncmemo {
stats_tag: self.stats_tag.clone(),
inner: self.inner.clone(),
}
}
}

@ -1,913 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License found in the LICENSE file in the root
* directory of this source tree.
*/
use super::*;
use futures::executor::{spawn, Notify, NotifyHandle, Spawn};
use futures_ext::FutureExt;
use std::cell::RefCell;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
use std::usize;
// A simple operation that also keeps track of the number of times it was invoked
struct Upperer<'a>(&'a AtomicUsize);
impl<'a> Filler for Upperer<'a> {
type Key = String;
type Value = Result<String, ()>;
fn fill(&self, _cache: &Asyncmemo<Self>, key: &Self::Key) -> Self::Value {
self.0.fetch_add(1, Ordering::Relaxed);
Ok(key.to_uppercase())
}
}
#[test]
fn simple() {
let count = AtomicUsize::new(0);
let c = Asyncmemo::new_unbounded("test", Upperer(&count), 1);
assert!(c.is_empty());
assert_eq!(c.len(), 0);
assert_eq!(count.load(Ordering::Relaxed), 0);
let v = c.get("foo").wait().unwrap();
assert_eq!(v, "FOO");
assert_eq!(count.load(Ordering::Relaxed), 1);
assert!(!c.is_empty());
assert_eq!(c.len(), 1);
let v = c.get("foo").wait().unwrap();
assert_eq!(v, "FOO");
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(c.len(), 1);
let v = c.get("bar").wait().unwrap();
assert_eq!(v, "BAR");
assert_eq!(count.load(Ordering::Relaxed), 2);
assert_eq!(c.len(), 2);
}
#[test]
fn clear() {
let count = AtomicUsize::new(0);
let c = Asyncmemo::new_unbounded("test", Upperer(&count), 1);
assert!(c.is_empty());
assert_eq!(c.len(), 0);
assert_eq!(count.load(Ordering::Relaxed), 0);
let v = c.get("foo").wait().unwrap();
assert_eq!(v, "FOO");
assert_eq!(count.load(Ordering::Relaxed), 1);
assert!(!c.is_empty());
assert_eq!(c.len(), 1);
c.clear();
assert!(c.is_empty());
assert_eq!(c.len(), 0);
let v = c.get("foo").wait().unwrap();
assert_eq!(v, "FOO");
assert_eq!(count.load(Ordering::Relaxed), 2);
assert!(!c.is_empty());
assert_eq!(c.len(), 1);
}
#[test]
fn size_limit() {
let count = AtomicUsize::new(0);
let c = Asyncmemo::with_limits_and_shards("test", Upperer(&count), 3, usize::MAX, 1);
assert_eq!(c.len(), 0);
let v1 = c.get("hello").wait().unwrap();
assert_eq!(v1, "HELLO", "c={:#?}", c);
assert_eq!(c.len(), 1, "c={:#?}", c);
let v2 = c.get("goodbye").wait().unwrap();
assert_eq!(v2, "GOODBYE", "c={:#?}", c);
assert_eq!(c.len(), 2, "c={:#?}", c);
let v3 = c.get("world").wait().unwrap();
assert_eq!(v3, "WORLD", "c={:#?}", c);
assert_eq!(c.len(), 3, "c={:#?}", c);
let v4 = c.get("ungulate").wait().unwrap();
assert_eq!(v4, "UNGULATE", "c={:#?}", c);
assert_eq!(c.len(), 3, "c={:#?}", c);
}
#[test]
fn weight_limit_simple() {
let count = AtomicUsize::new(0);
let c = Asyncmemo::with_limits_and_shards("test", Upperer(&count), 3, 100, 1);
assert_eq!(c.len(), 0);
let v1 = c.get("hello").wait().unwrap();
assert_eq!(v1, "HELLO", "c={:#?}", c);
assert_eq!(c.len(), 1, "c={:#?}", c);
// Note - this test can fail if "HELLO" was allocated differently
// inside asyncmemo or in the test. If that is the case, fix the test or disable it.
let expected_weight = String::from("HELLO").get_weight();
assert_eq!(c.total_weight(), expected_weight, "c={:#?}", c);
}
#[test]
fn weight_limit_eviction() {
let count = AtomicUsize::new(0);
let c = Asyncmemo::with_limits_and_shards("test", Upperer(&count), 1, usize::MAX, 1);
assert_eq!(c.len(), 0);
let v1 = c.get("hello").wait().unwrap();
assert_eq!(v1, "HELLO", "c={:#?}", c);
assert_eq!(c.len(), 1, "c={:#?}", c);
let expected_weight = String::from("HELLO").get_weight();
assert_eq!(c.total_weight(), expected_weight, "c={:#?}", c);
let v1 = c.get("hell").wait().unwrap();
assert_eq!(v1, "HELL", "c={:#?}", c);
assert_eq!(c.len(), 1, "c={:#?}", c);
let expected_weight = String::from("HELL").get_weight();
assert_eq!(c.total_weight(), expected_weight, "c={:#?}", c);
}
#[derive(Debug)]
struct Delay<V> {
remains: usize,
v: Option<Result<V, ()>>,
}
impl<V> Future for Delay<V> {
type Item = V;
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.remains == 0 {
match self.v.take() {
None => Err(()),
Some(Ok(v)) => Ok(Async::Ready(v)),
Some(Err(e)) => Err(e),
}
} else {
self.remains -= 1;
Ok(Async::NotReady)
}
}
}
#[derive(Debug)]
struct Delayed<'a>(&'a AtomicUsize, usize);
impl<'a> Filler for Delayed<'a> {
type Key = String;
type Value = Delay<String>;
fn fill(&self, _cache: &Asyncmemo<Self>, key: &Self::Key) -> Self::Value {
self.0.fetch_add(1, Ordering::Relaxed);
Delay {
remains: self.1,
v: Some(Ok(key.to_uppercase())),
}
}
}
struct DummyNotify {}
impl Notify for DummyNotify {
fn notify(&self, _id: usize) {}
}
#[test]
fn delayed() {
let count = AtomicUsize::new(0);
let c = Asyncmemo::new_unbounded("test", Delayed(&count, 5), 1);
let notify_handle = NotifyHandle::from(Arc::new(DummyNotify {}));
let dummy_id = 0;
assert!(c.is_empty());
assert_eq!(c.len(), 0);
assert_eq!(count.load(Ordering::Relaxed), 0);
let mut v = spawn(c.get("foo"));
assert_eq!(count.load(Ordering::Relaxed), 0);
assert_eq!(
v.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::Ready("FOO".into())),
"v={:#?}",
v
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::Ready("FOO".into())),
"v={:#?}",
v
);
assert_eq!(count.load(Ordering::Relaxed), 1);
}
struct Fib(Arc<AtomicUsize>);
impl Filler for Fib {
type Key = u32;
type Value = BoxFuture<u32, ()>;
fn fill(&self, cache: &Asyncmemo<Self>, key: &u32) -> Self::Value {
self.0.fetch_add(1, Ordering::Relaxed);
let key = *key;
if key == 1 {
let f = Delay::<u32> {
remains: 1,
v: Some(Ok(1)),
};
Box::new(f) as BoxFuture<u32, ()>
} else {
let f = cache.get(key - 1).and_then(move |f| Delay {
remains: 1,
v: Some(Ok(key + f)),
});
Box::new(f) as BoxFuture<u32, ()>
}
}
}
#[test]
fn fibonacci() {
let count = Arc::new(AtomicUsize::new(0));
let c = Asyncmemo::new_unbounded("test", Fib(count.clone()), 1);
let notify_handle = NotifyHandle::from(Arc::new(DummyNotify {}));
let dummy_id = 0;
{
let mut fib = spawn(c.get(1u32));
assert_eq!(
fib.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady)
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
fib.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::Ready(1))
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
fib.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::Ready(1))
);
assert_eq!(count.load(Ordering::Relaxed), 1);
println!(
"1: fib.poll()={:?}",
fib.poll_future_notify(&notify_handle, dummy_id)
);
}
{
let mut fib = spawn(c.get(1u32));
assert_eq!(
fib.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::Ready(1))
);
assert_eq!(count.load(Ordering::Relaxed), 1);
println!(
"1: fib.poll()={:?}",
fib.poll_future_notify(&notify_handle, dummy_id)
);
}
{
let mut fib = spawn(c.get(2u32));
let res = fib.poll_future_notify(&notify_handle, dummy_id);
println!("2: fib.poll()={:?}", res);
assert_eq!(res, Ok(Async::NotReady));
assert_eq!(count.load(Ordering::Relaxed), 2);
let res = fib.poll_future_notify(&notify_handle, dummy_id);
println!("2: fib.poll()={:?}", res);
assert_eq!(res, Ok(Async::Ready(3)));
assert_eq!(count.load(Ordering::Relaxed), 2);
let res = fib.poll_future_notify(&notify_handle, dummy_id);
println!("2: fib.poll()={:?}", res);
assert_eq!(res, Ok(Async::Ready(3)));
assert_eq!(count.load(Ordering::Relaxed), 2);
}
{
println!("future 4");
let mut fib = spawn(c.get(4u32));
let res = fib.poll_future_notify(&notify_handle, dummy_id);
println!("4: fib.poll()={:?}", res);
assert_eq!(res, Ok(Async::NotReady));
assert_eq!(count.load(Ordering::Relaxed), 4);
let res = fib.poll_future_notify(&notify_handle, dummy_id);
println!("4: fib.poll()={:?}", res);
assert_eq!(res, Ok(Async::Ready(10)));
assert_eq!(count.load(Ordering::Relaxed), 4);
let res = fib.poll_future_notify(&notify_handle, dummy_id);
println!("4: fib.poll()={:?}", res);
assert_eq!(res, Ok(Async::Ready(10)));
assert_eq!(count.load(Ordering::Relaxed), 4);
}
}
#[derive(Debug)]
struct Fails<'a>(&'a AtomicUsize);
impl<'a> Filler for Fails<'a> {
type Key = String;
type Value = Delay<String>;
fn fill(&self, _cache: &Asyncmemo<Self>, _: &Self::Key) -> Self::Value {
self.0.fetch_add(1, Ordering::Relaxed);
Delay {
remains: 3,
v: Some(Err(())),
}
}
}
#[test]
fn failing() {
let count = AtomicUsize::new(0);
let c = Asyncmemo::new_unbounded("test", Fails(&count), 1);
let notify_handle = NotifyHandle::from(Arc::new(DummyNotify {}));
let dummy_id = 0;
assert!(c.is_empty());
assert_eq!(c.len(), 0);
assert_eq!(count.load(Ordering::Relaxed), 0);
let mut v = spawn(c.get("foo"));
assert_eq!(count.load(Ordering::Relaxed), 0);
assert_eq!(
v.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v.poll_future_notify(&notify_handle, dummy_id),
Err(()),
"v={:#?}",
v
);
assert_eq!(count.load(Ordering::Relaxed), 1);
// retry
assert_eq!(
v.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v
);
assert_eq!(count.load(Ordering::Relaxed), 2);
}
#[test]
fn multiwait() {
let count = AtomicUsize::new(0);
let c = Asyncmemo::new_unbounded("test", Delayed(&count, 5), 1);
let notify_handle = NotifyHandle::from(Arc::new(DummyNotify {}));
let dummy_id = 0;
assert!(c.is_empty());
assert_eq!(c.len(), 0);
let mut v1 = spawn(c.get("foo"));
assert_eq!(count.load(Ordering::Relaxed), 0);
let mut v2 = spawn(c.get("foo"));
assert_eq!(count.load(Ordering::Relaxed), 0);
// polling on either future advances the state machine until it's complete
assert_eq!(
v1.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v1
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v2.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v2
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v1.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v1
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v2.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v2
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v1.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::NotReady),
"v={:#?}",
v1
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v2.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::Ready("FOO".into())),
"v={:#?}",
v2
);
assert_eq!(count.load(Ordering::Relaxed), 1);
assert_eq!(
v1.poll_future_notify(&notify_handle, dummy_id),
Ok(Async::Ready("FOO".into())),
"v={:#?}",
v1
);
assert_eq!(count.load(Ordering::Relaxed), 1);
}
struct SimpleNotify {
pub was_notified: Mutex<bool>,
}
impl SimpleNotify {
fn new() -> Self {
SimpleNotify {
was_notified: Mutex::new(false),
}
}
}
impl Notify for SimpleNotify {
fn notify(&self, _id: usize) {
*self.was_notified.lock().unwrap() = true;
}
}
struct SpawnedFutureAndNotify<T> {
spawned: Spawn<T>,
simple_notify: Arc<SimpleNotify>,
notify_handle: NotifyHandle,
}
impl<T> SpawnedFutureAndNotify<T>
where
T: Future,
{
fn new(fut: T) -> Self {
let simple_notify = Arc::new(SimpleNotify::new());
SpawnedFutureAndNotify {
spawned: spawn(fut),
simple_notify: simple_notify.clone(),
notify_handle: NotifyHandle::from(simple_notify),
}
}
fn poll(&mut self) -> Poll<<T as Future>::Item, <T as Future>::Error> {
self.spawned.poll_future_notify(&self.notify_handle, 0)
}
fn was_notified(&self) -> bool {
*self.simple_notify.was_notified.lock().unwrap()
}
}
#[test]
fn timer_multiwait() {
let count = AtomicUsize::new(0);
let c = Asyncmemo::new_unbounded("test", Delayed(&count, 2), 1);
let mut v1 = SpawnedFutureAndNotify::new(c.get("foo"));
let mut v2 = SpawnedFutureAndNotify::new(c.get("foo"));
assert_eq!(v1.poll(), Ok(Async::NotReady));
assert_eq!(v2.poll(), Ok(Async::NotReady));
assert_eq!(v2.poll(), Ok(Async::Ready(String::from("FOO"))));
assert!(v1.was_notified());
}
struct Timered {
res: Vec<Result<String, ()>>,
cur_result: RefCell<usize>,
remains: usize,
}
impl Timered {
fn new(res: Vec<Result<String, ()>>, remains: usize) -> Self {
Timered {
res,
cur_result: RefCell::new(0),
remains,
}
}
}
impl Filler for Timered {
type Key = String;
type Value = Delay<String>;
fn fill(&self, _cache: &Asyncmemo<Self>, _key: &Self::Key) -> Self::Value {
let index = *self.cur_result.borrow();
let res = self.res.get(index).unwrap().clone();
*self.cur_result.borrow_mut() = index + 1;
Delay {
remains: self.remains,
v: Some(res),
}
}
}
#[test]
fn test_timer_future() {
let c = Asyncmemo::new_unbounded("test", Timered::new(vec![Ok("RES".into())], 2), 1);
let mut v1 = SpawnedFutureAndNotify::new(c.get("res"));
let mut v2 = SpawnedFutureAndNotify::new(c.get("res"));
assert_eq!(v1.poll(), Ok(Async::NotReady));
assert_eq!(v2.poll(), Ok(Async::NotReady));
assert_eq!(v1.poll(), Ok(Async::Ready(String::from("RES"))));
assert_eq!(v2.poll(), Ok(Async::Ready(String::from("RES"))));
assert!(v1.was_notified());
assert!(v2.was_notified());
}
#[test]
fn test_timer_future_many_futures() {
let c = Asyncmemo::new_unbounded("test", Timered::new(vec![Ok("RES".into())], 9), 1);
let mut futs: Vec<_> = (1..10)
.map(|_| {
let mut f = SpawnedFutureAndNotify::new(c.get("res"));
assert_eq!(f.poll(), Ok(Async::NotReady));
f
})
.collect();
assert_eq!(futs[0].poll(), Ok(Async::Ready(String::from("RES"))));
for f in futs {
assert!(f.was_notified());
}
}
#[test]
fn test_drop() {
{
let c = Asyncmemo::new_unbounded("test", Timered::new(vec![Ok("RES".into())], 2), 2);
let mut v1 = SpawnedFutureAndNotify::new(c.get("res"));
let mut v2 = SpawnedFutureAndNotify::new(c.get("res"));
assert_eq!(v1.poll(), Ok(Async::NotReady));
assert_eq!(v2.poll(), Ok(Async::NotReady));
std::mem::drop(v1);
assert_eq!(v2.poll(), Ok(Async::Ready(String::from("RES"))));
assert!(v2.was_notified());
}
{
// Vice-versa: drop the future that was polled second, and check that the first was notified
let c = Asyncmemo::new_unbounded("test", Timered::new(vec![Ok("RES".into())], 2), 1);
let mut v1 = SpawnedFutureAndNotify::new(c.get("res"));
let mut v2 = SpawnedFutureAndNotify::new(c.get("res"));
assert_eq!(v1.poll(), Ok(Async::NotReady));
assert_eq!(v2.poll(), Ok(Async::NotReady));
std::mem::drop(v2);
assert_eq!(v1.poll(), Ok(Async::Ready(String::from("RES"))));
assert!(v1.was_notified());
}
}
#[test]
fn test_poll_after_sporadic_failure() {
let c = Asyncmemo::new_unbounded("test", Timered::new(vec![Err(()), Ok("RES".into())], 2), 1);
let mut v1 = SpawnedFutureAndNotify::new(c.get("res"));
let mut v2 = SpawnedFutureAndNotify::new(c.get("res"));
assert_eq!(v1.poll(), Ok(Async::NotReady));
assert_eq!(v2.poll(), Ok(Async::NotReady));
// The sporadic error fails the first future; polling the second future should retry and succeed
assert_eq!(v1.poll(), Err(()));
assert!(v1.was_notified());
assert!(v2.was_notified());
assert_eq!(v2.poll(), Ok(Async::NotReady));
assert_eq!(v2.poll(), Ok(Async::NotReady));
assert_eq!(v2.poll(), Ok(Async::Ready("RES".into())));
}
struct SlowPollUpperrer {
res: Result<String, String>,
}
impl SlowPollUpperrer {
fn new(res: Result<String, String>) -> Self {
SlowPollUpperrer { res }
}
}
impl Filler for SlowPollUpperrer {
type Key = String;
type Value = futures_ext::BoxFuture<String, String>;
fn fill(&self, _cache: &Asyncmemo<Self>, _key: &Self::Key) -> Self::Value {
thread::sleep(Duration::from_millis(100));
return self.res.clone().into_future().boxify();
}
}
#[test]
fn slow_poll_success() {
// Two futures poll at roughly the same time.
// Poll is slow, so one future does the poll, another goes to the Polling state.
// Make sure that the second future is woken up after the first one succeeds
let c = Asyncmemo::new_unbounded("test", SlowPollUpperrer::new(Ok("RES".into())), 1);
let t1 = thread::spawn({
let c = c.clone();
move || {
let fut = c.get("res");
assert!(spawn(fut).wait_future().is_ok());
}
});
let t2 = thread::spawn({
let c = c.clone();
move || {
let fut = c.get("res");
assert!(spawn(fut).wait_future().is_ok());
}
});
t1.join().unwrap();
t2.join().unwrap();
}
#[test]
fn slow_poll_err() {
// Two futures poll at roughly the same time.
// Poll is slow, so one future does the poll, another goes to the Polling state.
// Make sure that the second future is woken up after the first one errors
let c = Asyncmemo::new_unbounded("test", SlowPollUpperrer::new(Err("RES".into())), 1);
fn assert_send<T: Send>(_t: &T) {}
assert_send(&c);
let t1 = thread::spawn({
let c = c.clone();
move || {
let fut = c.get("res");
assert!(spawn(fut).wait_future().is_err());
}
});
let t2 = thread::spawn({
let c = c.clone();
move || {
let fut = c.get("res");
assert!(spawn(fut).wait_future().is_err());
}
});
t1.join().unwrap();
t2.join().unwrap();
}
struct SignalBasedUpperrer {
res: Result<String, String>,
signal: Arc<AtomicBool>,
}
impl SignalBasedUpperrer {
fn new(res: Result<String, String>, signal: Arc<AtomicBool>) -> Self {
SignalBasedUpperrer { res, signal }
}
}
impl Filler for SignalBasedUpperrer {
type Key = String;
type Value = futures_ext::BoxFuture<String, String>;
fn fill(&self, _cache: &Asyncmemo<Self>, key: &Self::Key) -> Self::Value {
if !key.starts_with("skipwaiting") {
loop {
if self.signal.load(Ordering::SeqCst) {
break;
}
}
}
return self.res.clone().into_future().boxify();
}
}
#[test]
fn slow_poll_invalidate() {
let signal = Arc::new(AtomicBool::new(false));
let c = Asyncmemo::new_unbounded(
"test",
SignalBasedUpperrer::new(Ok("RES".into()), signal.clone()),
1,
);
let t1 = thread::spawn({
let c = c.clone();
move || {
let fut = c.get("res");
assert!(spawn(fut).wait_future().is_ok());
}
});
let t2 = thread::spawn({
let c = c.clone();
move || {
let fut = c.get("res");
assert!(spawn(fut).wait_future().is_ok());
}
});
// Make sure one future is in the Polling state while the other one polls.
thread::sleep(Duration::from_millis(50));
c.invalidate("res");
assert_eq!(c.len(), 0);
// Allow futures to proceed
signal.store(true, Ordering::SeqCst);
t1.join().unwrap();
t2.join().unwrap();
}
#[test]
fn slow_clear_invalidate() {
let signal = Arc::new(AtomicBool::new(false));
let c = Asyncmemo::new_unbounded(
"test",
SignalBasedUpperrer::new(Ok("RES".into()), signal.clone()),
1,
);
let t1 = thread::spawn({
let c = c.clone();
move || {
let fut = c.get("res");
assert!(spawn(fut).wait_future().is_ok());
}
});
let t2 = thread::spawn({
let c = c.clone();
move || {
let fut = c.get("res");
assert!(spawn(fut).wait_future().is_ok());
}
});
// Make sure one future is in the Polling state while the other one polls.
thread::sleep(Duration::from_millis(50));
c.clear();
assert_eq!(c.len(), 0);
// Allow futures to proceed
signal.store(true, Ordering::SeqCst);
t1.join().unwrap();
t2.join().unwrap();
}
#[test]
fn polling_hash_trimming() {
// Spawn two futures for the same key - one will be in Polling state.
// Spawn one more future that skips waiting on the signal and evicts the previous futures
// from the cache. Make sure nothing is deadlocked.
let signal = Arc::new(AtomicBool::new(false));
let longskipkey = String::from("skipwaiting");
let c = Asyncmemo::with_limits_and_shards(
"test",
SignalBasedUpperrer::new(Ok("RES".into()), signal.clone()),
1,
// Make space for exactly one future and its result
longskipkey.get_weight() + String::from("RES").get_weight(),
1,
);
let t1 = thread::spawn({
let c = c.clone();
move || {
let fut = c.get("res");
assert!(spawn(fut).wait_future().is_ok());
}
});
let t2 = thread::spawn({
let c = c.clone();
move || {
let fut = c.get("res");
assert!(spawn(fut).wait_future().is_ok());
}
});
// Make sure one future is in the Polling state while the other one polls.
thread::sleep(Duration::from_millis(50));
// Evict "res" future
let fut = c.get(longskipkey);
assert!(spawn(fut).wait_future().is_ok());
assert_eq!(c.len(), 1);
// Allow futures to proceed
signal.store(true, Ordering::SeqCst);
t1.join().unwrap();
t2.join().unwrap();
}

@ -1,98 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License found in the LICENSE file in the root
* directory of this source tree.
*/
use bytes::Bytes;
use std::mem;
use heapsize::HeapSizeOf;
/// Return the "weight" of a type.
///
/// This is an abstract value which can mean anything, but it typically
/// relates to memory consumption. The expectation is that calling `get_weight()`
/// is fairly cheap - ideally O(1).
pub trait Weight {
fn get_weight(&self) -> usize;
}
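// A sketch of implementing Weight for a user-defined type (illustrative, not
// part of the original file): count the shallow size of the struct plus the
// heap allocations reachable from it, matching the String impl below.
//
// struct CacheEntry {
//     name: String,
//     payload: Bytes,
// }
//
// impl Weight for CacheEntry {
//     fn get_weight(&self) -> usize {
//         mem::size_of::<Self>() + self.name.heap_size_of_children() + self.payload.len()
//     }
// }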
impl Weight for String {
#[inline]
fn get_weight(&self) -> usize {
mem::size_of::<Self>() + self.heap_size_of_children()
}
}
impl Weight for u32 {
#[inline]
fn get_weight(&self) -> usize {
mem::size_of::<Self>()
}
}
impl Weight for u64 {
#[inline]
fn get_weight(&self) -> usize {
mem::size_of::<Self>()
}
}
impl Weight for i32 {
#[inline]
fn get_weight(&self) -> usize {
mem::size_of::<Self>()
}
}
impl Weight for i64 {
#[inline]
fn get_weight(&self) -> usize {
mem::size_of::<Self>()
}
}
impl Weight for Bytes {
#[inline]
fn get_weight(&self) -> usize {
self.len()
}
}
impl<A, B> Weight for (A, B)
where
A: Weight,
B: Weight,
{
#[inline]
fn get_weight(&self) -> usize {
self.0.get_weight() + self.1.get_weight()
}
}
impl<A, B, C> Weight for (A, B, C)
where
A: Weight,
B: Weight,
C: Weight,
{
#[inline]
fn get_weight(&self) -> usize {
self.0.get_weight() + self.1.get_weight() + self.2.get_weight()
}
}
impl<A> Weight for Option<A>
where
A: Weight,
{
#[inline]
fn get_weight(&self) -> usize {
let inner_size = self.as_ref().map(Weight::get_weight).unwrap_or(0);
mem::size_of::<Self>() - mem::size_of::<A>() + inner_size
}
}

@ -1,57 +0,0 @@
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License found in the LICENSE file in the root
# directory of this source tree.
'''Utilities for dealing with tar files.'''
import os
import tarfile
from typing import List
def from_dirs(dirs: List[str], tar_filename: str):
    '''Create a .tar.gz file from a given list of directories.'''
    with tarfile.open(tar_filename, 'w:gz') as tar:
        for dir in dirs:
            tar.add(dir, arcname=os.path.basename(dir))


def extractall_safe(path: str, dest: str) -> str:
    '''Extract a tar safely. Raise an exception if there are any bad paths.
    Returns the subdirectory the files were extracted to.'''
    tar = tarfile.open(path)
    firstdir = _check_members(path, tar)
    tar.extractall(dest)
    return os.path.join(dest, firstdir)


def _check_members(path: str, tar: tarfile.TarFile) -> str:
    firstdir = None
    for finfo in tar:
        if _badpath(finfo.name):
            raise RuntimeError('{} has bad path: {}'.format(path, finfo.name))
        if firstdir is None:
            firstdir = _firstdir(finfo.name)
        elif firstdir != _firstdir(finfo.name):
            raise RuntimeError(
                '{}: expected path {} to begin with {}'.format(
                    path, finfo.name, firstdir
                )
            )
    if firstdir is None:
        raise RuntimeError('{}: empty tar file'.format(path))
    return firstdir


def _badpath(path: str) -> bool:
    return path.startswith('..') or path.startswith('/')


def _firstdir(rel_path: str) -> str:
    return rel_path.split('/', 1)[0]
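# A usage sketch (paths are illustrative): round-trip a directory through a
# tarball, relying on extractall_safe to reject absolute or parent-relative
# member paths.
#
# from_dirs(['/tmp/build/output'], '/tmp/output.tar.gz')
# extracted = extractall_safe('/tmp/output.tar.gz', '/tmp/unpacked')
# assert extracted == '/tmp/unpacked/output'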