diff --git a/bookmarks/filebookmarks/src/lib.rs b/bookmarks/filebookmarks/src/lib.rs index 22bace7678..a61d16c782 100644 --- a/bookmarks/filebookmarks/src/lib.rs +++ b/bookmarks/filebookmarks/src/lib.rs @@ -8,53 +8,38 @@ extern crate bookmarks; -extern crate bincode; #[macro_use] extern crate error_chain; extern crate futures; extern crate futures_cpupool; -extern crate nix; extern crate percent_encoding; -extern crate rand; extern crate serde; #[cfg(test)] extern crate tempdir; +extern crate filekv; extern crate futures_ext; extern crate storage_types; -use std::collections::HashMap; -use std::fs::{self, File, OpenOptions}; -use std::io::{self, SeekFrom}; -use std::io::prelude::*; -use std::marker::PhantomData; -use std::os::unix::io::AsRawFd; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::str; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; -use bincode::{deserialize, serialize, Infinite}; -use futures::{Async, Poll}; -use futures::future::{poll_fn, Future, IntoFuture}; -use futures::stream::{self, Stream}; +use futures::{Future, Stream}; use futures_cpupool::CpuPool; -use nix::fcntl::{self, FlockArg}; -use nix::sys::stat; use percent_encoding::{percent_decode, percent_encode, DEFAULT_ENCODE_SET}; use serde::Serialize; use serde::de::DeserializeOwned; use bookmarks::{Bookmarks, BookmarksMut}; +use filekv::FileKV; use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt}; -use storage_types::{version_random, Version}; +use storage_types::Version; mod errors { - error_chain!{ - foreign_links { - Bincode(::bincode::Error); - De(::serde::de::value::Error); - Io(::std::io::Error); - Nix(::nix::Error); + error_chain! { + links { + FileKV(::filekv::Error, ::filekv::ErrorKind); } } } @@ -68,51 +53,45 @@ static PREFIX: &'static str = "bookmark:"; /// to a thread pool to avoid blocking the main thread. File accesses between these threads /// are synchronized by a global map of per-path locks. 
pub struct FileBookmarks { - base: PathBuf, - pool: Arc, - locks: Mutex>>>, - _marker: PhantomData, + kv: FileKV, } -impl FileBookmarks { +impl FileBookmarks +where + V: Send + Clone + Serialize + DeserializeOwned + 'static, +{ + #[inline] pub fn open>(path: P) -> Result { - Self::open_with_pool(path, Arc::new(CpuPool::new_num_cpus())) - } - - pub fn open_with_pool>(path: P, pool: Arc) -> Result { - if !path.as_ref().is_dir() { - bail!("'{}' is not a directory", path.as_ref().to_string_lossy()); - } - Ok(FileBookmarks { - base: path.as_ref().to_path_buf(), - pool: pool, - locks: Mutex::new(HashMap::new()), - _marker: PhantomData, + kv: FileKV::open(path, PREFIX)?, }) } + #[inline] + pub fn open_with_pool>(path: P, pool: Arc) -> Result { + Ok(FileBookmarks { + kv: FileKV::open_with_pool(path, PREFIX, pool)?, + }) + } + + #[inline] pub fn create>(path: P) -> Result { - Self::create_with_pool(path, Arc::new(CpuPool::new_num_cpus())) + Ok(FileBookmarks { + kv: FileKV::create(path, PREFIX)?, + }) } + #[inline] pub fn create_with_pool>(path: P, pool: Arc) -> Result { - let path = path.as_ref(); - fs::create_dir_all(path)?; - Self::open_with_pool(path, pool) + Ok(FileBookmarks { + kv: FileKV::create_with_pool(path, PREFIX, pool)?, + }) } +} - /// Return a Mutex protecting the path to the file corresponding to the given key. - /// Ensures that file accesses across multiple threads in the pool are syncrhonized. 
- fn get_path_mutex(&self, key: &AsRef<[u8]>) -> Result>> { - let key_string = percent_encode(key.as_ref(), DEFAULT_ENCODE_SET).to_string(); - let mut map = self.locks.lock().expect("Lock poisoned"); - let mutex = map.entry(key_string.clone()).or_insert_with(|| { - let path = self.base.join(format!("{}{}", PREFIX, key_string)); - Arc::new(Mutex::new(path)) - }); - Ok((*mutex).clone()) - } +#[inline] +fn encode_key(key: &AsRef<[u8]>) -> String { + percent_encode(key.as_ref(), DEFAULT_ENCODE_SET).to_string() } impl Bookmarks for FileBookmarks @@ -125,42 +104,17 @@ where type Get = BoxFuture, Self::Error>; type Keys = BoxStream, Self::Error>; + #[inline] fn get(&self, key: &AsRef<[u8]>) -> Self::Get { - let pool = self.pool.clone(); - self.get_path_mutex(key) - .into_future() - .and_then(move |mutex| { - let future = poll_fn(move || poll_get::(&mutex)); - pool.spawn(future) - }) - .boxify() + self.kv.get(encode_key(key)).from_err().boxify() } fn keys(&self) -> Self::Keys { - // XXX: This traversal of the directory entries is unsynchronized and depends on - // platform-specific behavior with respect to the underlying directory entries. - // As a result, concurrent writes from other threads may produce strange results here. 
- let names = fs::read_dir(&self.base).map(|entries| { - entries - .map(|result| { - result - .map_err(From::from) - .map(|entry| entry.file_name().to_string_lossy().into_owned()) - }) - .filter(|result| match result { - &Ok(ref name) => name.starts_with(PREFIX), - &Err(_) => true, - }) - .map(|result| { - result.and_then(|name| { - Ok(percent_decode(&name[PREFIX.len()..].as_bytes()).collect()) - }) - }) - }); - match names { - Ok(v) => stream::iter_ok(v).and_then(|x| x).boxify(), - Err(e) => stream::once(Err(e.into())).boxify(), - } + self.kv + .keys() + .and_then(|name| Ok(percent_decode(&name[..].as_bytes()).collect())) + .from_err() + .boxify() } } @@ -170,178 +124,20 @@ where { type Set = BoxFuture, Self::Error>; + #[inline] fn set(&self, key: &AsRef<[u8]>, value: &Self::Value, version: &Version) -> Self::Set { - let pool = self.pool.clone(); - let value = value.clone(); - let version = version.clone(); - self.get_path_mutex(key) - .into_future() - .and_then(move |mutex| { - let future = poll_fn(move || poll_set(&mutex, &value, &version)); - pool.spawn(future) - }) + self.kv + .set(encode_key(key), value, version) + .from_err() .boxify() } + #[inline] fn delete(&self, key: &AsRef<[u8]>, version: &Version) -> Self::Set { - let pool = self.pool.clone(); - let version = version.clone(); - self.get_path_mutex(key) - .into_future() - .and_then(move |mutex| { - let future = poll_fn(move || poll_delete(&mutex, &version)); - pool.spawn(future) - }) - .boxify() + self.kv.delete(encode_key(key), version).from_err().boxify() } } -/// Synchronous implementation of the get operation for the bookmark store. Intended to -/// be used in conjunction with poll_fn() and a CpuPool to dispatch it onto a thread pool. -fn poll_get(path_mutex: &Arc>) -> Poll, Error> -where - V: DeserializeOwned, -{ - let path = path_mutex.lock().expect("Lock poisoned"); - - let result = match File::open(&*path) { - Ok(mut file) => { - // Block until we get an advisory lock on this file. 
- let fd = file.as_raw_fd(); - fcntl::flock(fd, FlockArg::LockShared)?; - - // Ensure file wasn't deleted between opening and locking. - if stat::fstat(fd)?.st_nlink > 0 { - let mut buf = Vec::new(); - let _ = file.read_to_end(&mut buf)?; - Ok(Some(deserialize(&buf)?)) - } else { - Ok(None) - } - } - Err(e) => { - // Return None instead of an Error if the file doesn't exist. - match e.kind() { - io::ErrorKind::NotFound => Ok(None), - _ => Err(e.into()), - } - } - }; - - result.map(Async::Ready) -} - -/// Synchronous implementation of the set operation for the bookmark store. Intended to -/// be used in conjunction with poll_fn() and a CpuPool to dispatch it onto a thread pool. -fn poll_set( - path_mutex: &Arc>, - value: &V, - version: &Version, -) -> Poll, Error> -where - V: Serialize, -{ - let path = path_mutex.lock().expect("Lock poisoned"); - let mut options = OpenOptions::new(); - options.read(true).write(true); - - // If we expect the file to not exist, disallow opening an existing file. - if *version == Version::absent() { - options.create_new(true); - } - - let result = match options.open(&*path) { - Ok(mut file) => { - // Block until we get an advisory lock on this file. - let fd = file.as_raw_fd(); - fcntl::flock(fd, FlockArg::LockExclusive)?; - - // Read version. - let file_version = if *version == Version::absent() { - Version::absent() - } else { - let mut buf = Vec::new(); - let _ = file.read_to_end(&mut buf)?; - deserialize::<(String, Version)>(&buf)?.1 - }; - - // Write out new value if versions match. - if file_version == *version { - let new_version = version_random(); - let out = serialize(&(value, new_version), Infinite)?; - file.seek(SeekFrom::Start(0))?; - file.set_len(0)?; - file.write_all(&out)?; - Ok(Some(new_version)) - } else { - Ok(None) - } - } - Err(e) => { - // We can only get EEXIST if the version was specified as absent but - // the file exists. This is a version mismatch, so return None accordingly. 
- match e.kind() { - io::ErrorKind::AlreadyExists => Ok(None), - _ => Err(e.into()), - } - } - }; - - result.map(Async::Ready) -} - -/// Synchronous implementation of the delete operation for the bookmark store. Intended to -/// be used in conjunction with poll_fn() and a CpuPool to dispatch it onto a thread pool. -fn poll_delete( - path_mutex: &Arc>, - version: &Version, -) -> Poll, Error> { - let path = path_mutex.lock().expect("Lock poisoned"); - - let result = match File::open(&*path) { - Ok(mut file) => { - // Block until we get an advisory lock on this file. - let fd = file.as_raw_fd(); - fcntl::flock(fd, FlockArg::LockExclusive)?; - - // Read version. - let mut buf = Vec::new(); - let _ = file.read_to_end(&mut buf)?; - let file_version = deserialize::<(String, Version)>(&buf)?.1; - - // Unlink files if version matches, reporting success if the file - // has already been deleted by another thread or process. - if file_version == *version { - fs::remove_file(&*path).or_else(|e| match e.kind() { - io::ErrorKind::NotFound => Ok(()), - _ => Err(e), - })?; - Ok(Some(Version::absent())) - } else { - Ok(None) - } - } - Err(e) => { - // Check for absent version if the file doesn't exist. - match e.kind() { - io::ErrorKind::NotFound => { - if *version == Version::absent() { - // Report successful deletion of non-existent bookmark. - Ok(Some(Version::absent())) - } else { - // Version mismatch. - Ok(None) - } - } - _ => Err(e.into()), - } - } - }; - - result.map(Async::Ready) -} - - #[cfg(test)] mod test { use super::*; diff --git a/storage/filekv/src/lib.rs b/storage/filekv/src/lib.rs new file mode 100644 index 0000000000..ccc7d48c9c --- /dev/null +++ b/storage/filekv/src/lib.rs @@ -0,0 +1,447 @@ +// Copyright (c) 2017-present, Facebook, Inc. +// All Rights Reserved. +// +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2 or any later version. + +//! A file-based key-value store. 
Uses `flock(2)` to guarantee cross-process consistency for reads +//! and writes. + +#![deny(warnings)] +#![feature(conservative_impl_trait)] + +extern crate bincode; +#[macro_use] +extern crate error_chain; +extern crate futures; +extern crate futures_cpupool; +extern crate nix; +extern crate rand; +extern crate serde; +#[cfg(test)] +extern crate tempdir; + +extern crate futures_ext; +extern crate storage_types; + +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::fs::{self, File, OpenOptions}; +use std::io::{self, SeekFrom}; +use std::io::prelude::*; +use std::marker::PhantomData; +use std::os::unix::io::AsRawFd; +use std::path::{Path, PathBuf}; +use std::str; +use std::sync::{Arc, Mutex}; + +use bincode::{deserialize, serialize, Infinite}; +use futures::{Async, Poll}; +use futures::future::{poll_fn, Future, IntoFuture}; +use futures::stream::{self, Stream}; +use futures_cpupool::CpuPool; +use nix::fcntl::{self, FlockArg}; +use nix::sys::stat; +use serde::Serialize; +use serde::de::DeserializeOwned; + +use futures_ext::{BoxStream, StreamExt}; +use storage_types::{version_random, Version}; + +mod errors { + error_chain! { + foreign_links { + Bincode(::bincode::Error); + De(::serde::de::value::Error); + Io(::std::io::Error); + Nix(::nix::Error); + } + } +} +pub use errors::*; + +/// A basic file-based persistent key-value store. +/// +/// Key-value pairs are stored as files in the specified base directory. File operations are +/// dispatched to a thread pool to avoid blocking the main thread. File accesses between these +/// threads are synchronized by a global map of per-path locks. 
+pub struct FileKV { + base: PathBuf, + prefix: String, + pool: Arc, + locks: Mutex>>>, + _marker: PhantomData, +} + +impl FileKV +where + V: Send + Clone + Serialize + DeserializeOwned + 'static, +{ + pub fn open(path: P, prefix: S) -> Result + where + P: AsRef, + S: Into, + { + Self::open_with_pool(path, prefix, Arc::new(CpuPool::new_num_cpus())) + } + + pub fn open_with_pool(path: P, prefix: S, pool: Arc) -> Result + where + P: AsRef, + S: Into, + { + if !path.as_ref().is_dir() { + bail!("'{}' is not a directory", path.as_ref().to_string_lossy()); + } + + Ok(FileKV { + base: path.as_ref().to_path_buf(), + prefix: prefix.into(), + pool: pool, + locks: Mutex::new(HashMap::new()), + _marker: PhantomData, + }) + } + + pub fn create(path: P, prefix: S) -> Result + where + P: AsRef, + S: Into, + { + Self::create_with_pool(path, prefix, Arc::new(CpuPool::new_num_cpus())) + } + + pub fn create_with_pool(path: P, prefix: S, pool: Arc) -> Result + where + P: AsRef, + S: Into, + { + let path = path.as_ref(); + fs::create_dir_all(path)?; + Self::open_with_pool(path, prefix, pool) + } + + /// Return a Mutex protecting the path to the file corresponding to the given key. + /// Ensures that file accesses across multiple threads in the pool are synchronized. 
+ fn get_path_mutex>(&self, key: Q) -> Result>> { + let mut map = self.locks.lock().expect("Lock poisoned"); + match map.entry(key.into()) { + Entry::Occupied(occupied) => { + let mutex = occupied.get(); + Ok((*mutex).clone()) + } + Entry::Vacant(vacant) => { + let path = self.base.join(format!("{}{}", self.prefix, vacant.key())); + let mutex = vacant.insert(Arc::new(Mutex::new(path))); + Ok((*mutex).clone()) + } + } + } + + pub fn get>( + &self, + key: Q, + ) -> impl Future, Error = Error> { + let pool = self.pool.clone(); + self.get_path_mutex(key) + .into_future() + .and_then(move |mutex| { + let future = poll_fn(move || poll_get::(&mutex)); + pool.spawn(future) + }) + } + + pub fn keys(&self) -> BoxStream { + // XXX: This traversal of the directory entries is unsynchronized and depends on + // platform-specific behavior with respect to the underlying directory entries. + // As a result, concurrent writes from other threads may produce strange results here. + + let prefix = self.prefix.clone(); + let prefix_len = prefix.len(); + + let names = fs::read_dir(&self.base).map(|entries| { + entries + .map(|result| { + result + .map_err(From::from) + .map(|entry| entry.file_name().to_string_lossy().into_owned()) + }) + .filter(move |result| match result { + &Ok(ref name) => name.starts_with(&prefix), + &Err(_) => true, + }) + .map(move |result| { + result.and_then(|name| Ok(name[prefix_len..].into())) + }) + }); + match names { + Ok(v) => stream::iter_ok(v).and_then(|x| x).boxify(), + Err(e) => stream::once(Err(e.into())).boxify(), + } + } + + pub fn set>( + &self, + key: Q, + value: &V, + version: &Version, + ) -> impl Future, Error = Error> { + let pool = self.pool.clone(); + let value = value.clone(); + let version = version.clone(); + self.get_path_mutex(key) + .into_future() + .and_then(move |mutex| { + let future = poll_fn(move || poll_set(&mutex, &value, &version)); + pool.spawn(future) + }) + } + + // Convenience function for creating new keys (since initial 
version is always "absent"). + #[inline] + pub fn set_new>( + &self, + key: Q, + value: &V, + ) -> impl Future, Error = Error> { + self.set(key, value, &Version::absent()) + } + + + pub fn delete>( + &self, + key: Q, + version: &Version, + ) -> impl Future, Error = Error> { + let pool = self.pool.clone(); + let version = version.clone(); + self.get_path_mutex(key) + .into_future() + .and_then(move |mutex| { + let future = poll_fn(move || poll_delete(&mutex, &version)); + pool.spawn(future) + }) + } +} + +/// Synchronous implementation of the get operation for the key-value store. Intended to +/// be used in conjunction with poll_fn() and a CpuPool to dispatch it onto a thread pool. +fn poll_get(path_mutex: &Arc>) -> Poll, Error> +where + V: DeserializeOwned, +{ + let path = path_mutex.lock().expect("Lock poisoned"); + + let result = match File::open(&*path) { + Ok(mut file) => { + // Block until we get an advisory lock on this file. + let fd = file.as_raw_fd(); + fcntl::flock(fd, FlockArg::LockShared)?; + + // Ensure file wasn't deleted between opening and locking. + if stat::fstat(fd)?.st_nlink > 0 { + let mut buf = Vec::new(); + let _ = file.read_to_end(&mut buf)?; + Ok(Some(deserialize(&buf)?)) + } else { + Ok(None) + } + } + Err(e) => { + // Return None instead of an Error if the file doesn't exist. + match e.kind() { + io::ErrorKind::NotFound => Ok(None), + _ => Err(e.into()), + } + } + }; + + result.map(Async::Ready) +} + +/// Synchronous implementation of the set operation for the key-value store. Intended to +/// be used in conjunction with poll_fn() and a CpuPool to dispatch it onto a thread pool. +fn poll_set( + path_mutex: &Arc>, + value: &V, + version: &Version, +) -> Poll, Error> +where + V: Serialize, +{ + let path = path_mutex.lock().expect("Lock poisoned"); + let mut options = OpenOptions::new(); + options.read(true).write(true); + + // If we expect the file to not exist, disallow opening an existing file. 
+ if *version == Version::absent() { + options.create_new(true); + } + + let result = match options.open(&*path) { + Ok(mut file) => { + // Block until we get an advisory lock on this file. + let fd = file.as_raw_fd(); + fcntl::flock(fd, FlockArg::LockExclusive)?; + + // Read version. + let file_version = if *version == Version::absent() { + Version::absent() + } else { + let mut buf = Vec::new(); + let _ = file.read_to_end(&mut buf)?; + deserialize::<(String, Version)>(&buf)?.1 + }; + + // Write out new value if versions match. + if file_version == *version { + let new_version = version_random(); + let out = serialize(&(value, new_version), Infinite)?; + file.seek(SeekFrom::Start(0))?; + file.set_len(0)?; + file.write_all(&out)?; + Ok(Some(new_version)) + } else { + Ok(None) + } + } + Err(e) => { + // We can only get EEXIST if the version was specified as absent but + // the file exists. This is a version mismatch, so return None accordingly. + match e.kind() { + io::ErrorKind::AlreadyExists => Ok(None), + _ => Err(e.into()), + } + } + }; + + result.map(Async::Ready) +} + +/// Synchronous implementation of the delete operation for the key-value store. Intended to +/// be used in conjunction with poll_fn() and a CpuPool to dispatch it onto a thread pool. +fn poll_delete( + path_mutex: &Arc>, + version: &Version, +) -> Poll, Error> { + let path = path_mutex.lock().expect("Lock poisoned"); + + let result = match File::open(&*path) { + Ok(mut file) => { + // Block until we get an advisory lock on this file. + let fd = file.as_raw_fd(); + fcntl::flock(fd, FlockArg::LockExclusive)?; + + // Read version. + let mut buf = Vec::new(); + let _ = file.read_to_end(&mut buf)?; + let file_version = deserialize::<(String, Version)>(&buf)?.1; + + // Unlink files if version matches, reporting success if the file + // has already been deleted by another thread or process. 
+ if file_version == *version { + fs::remove_file(&*path).or_else(|e| match e.kind() { + io::ErrorKind::NotFound => Ok(()), + _ => Err(e), + })?; + Ok(Some(Version::absent())) + } else { + Ok(None) + } + } + Err(e) => { + // Check for absent version if the file doesn't exist. + match e.kind() { + io::ErrorKind::NotFound => { + if *version == Version::absent() { + // Report successful deletion of non-existent key. + Ok(Some(Version::absent())) + } else { + // Version mismatch. + Ok(None) + } + } + _ => Err(e.into()), + } + } + }; + + result.map(Async::Ready) +} + +#[cfg(test)] +mod test { + use super::*; + use futures::{Future, Stream}; + use tempdir::TempDir; + + #[test] + fn basic() { + let tmp = TempDir::new("filekv_basic").unwrap(); + let kv = FileKV::open(tmp.path(), "kv:").unwrap(); + + let foo = "foo"; + let one = "1".to_string(); + let two = "2".to_string(); + let three = "3".to_string(); + + assert_eq!(kv.get(foo).wait().unwrap(), None); + + let absent = Version::absent(); + let foo_v1 = kv.set(foo, &one, &absent).wait().unwrap().unwrap(); + assert_eq!(kv.get(foo).wait().unwrap(), Some((one.clone(), foo_v1))); + + let foo_v2 = kv.set(foo, &two, &foo_v1).wait().unwrap().unwrap(); + + // Should fail due to version mismatch. + assert_eq!(kv.set(foo, &three, &foo_v1).wait().unwrap(), None); + + assert_eq!(kv.delete(foo, &foo_v2).wait().unwrap().unwrap(), absent); + assert_eq!(kv.get(foo).wait().unwrap(), None); + + // Even though the key doesn't exist, this should fail with a version mismatch. + assert_eq!(kv.delete(foo, &foo_v2).wait().unwrap(), None); + + // Deleting it with the absent version should work. 
+ assert_eq!(kv.delete(foo, &absent).wait().unwrap().unwrap(), absent); + } + + #[test] + fn persistence() { + let tmp = TempDir::new("filebookmarks_heads_persistence").unwrap(); + let foo = "foo"; + let bar = "bar".to_string(); + + let version; + { + let kv = FileKV::open(tmp.path(), "kv:").unwrap(); + version = kv.set_new(foo, &bar).wait().unwrap().unwrap(); + } + + let kv = FileKV::open(tmp.path(), "kv:").unwrap(); + assert_eq!(kv.get(foo).wait().unwrap(), Some((bar, version))); + } + + #[test] + fn list() { + let tmp = TempDir::new("filebookmarks_heads_basic").unwrap(); + let kv = FileKV::open(tmp.path(), "kv:").unwrap(); + + let one = "1"; + let two = "2"; + let three = "3"; + + let _ = kv.set_new(one, &"foo".to_string()).wait().unwrap().unwrap(); + let _ = kv.set_new(two, &"bar".to_string()).wait().unwrap().unwrap(); + let _ = kv.set_new(three, &"baz".to_string()) + .wait() + .unwrap() + .unwrap(); + + let mut result = kv.keys().collect().wait().unwrap(); + result.sort(); + + let expected = vec![one, two, three]; + assert_eq!(result, expected); + } +}