factor out file-based key-value store from FileBookmarks

Summary:
This code is generally useful and will also be used for linknodes.

Note that I marked `filekv` as a copy of `filebookmarks` to preserve blame
info. Git doesn't track renames so this unfortunately won't be transferred to
the open source repo.

Reviewed By: farnz

Differential Revision: D6015776

fbshipit-source-id: 2c17e2440e25e6179dd71d16a87095ad1b346b49
This commit is contained in:
Siddharth Agarwal 2017-10-10 07:25:40 -07:00 committed by Facebook Github Bot
parent 86f4489e7c
commit 10b1993ca0
2 changed files with 495 additions and 252 deletions

View File

@ -8,53 +8,38 @@
extern crate bookmarks;
extern crate bincode;
#[macro_use]
extern crate error_chain;
extern crate futures;
extern crate futures_cpupool;
extern crate nix;
extern crate percent_encoding;
extern crate rand;
extern crate serde;
#[cfg(test)]
extern crate tempdir;
extern crate filekv;
extern crate futures_ext;
extern crate storage_types;
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, SeekFrom};
use std::io::prelude::*;
use std::marker::PhantomData;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::str;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use bincode::{deserialize, serialize, Infinite};
use futures::{Async, Poll};
use futures::future::{poll_fn, Future, IntoFuture};
use futures::stream::{self, Stream};
use futures::{Future, Stream};
use futures_cpupool::CpuPool;
use nix::fcntl::{self, FlockArg};
use nix::sys::stat;
use percent_encoding::{percent_decode, percent_encode, DEFAULT_ENCODE_SET};
use serde::Serialize;
use serde::de::DeserializeOwned;
use bookmarks::{Bookmarks, BookmarksMut};
use filekv::FileKV;
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use storage_types::{version_random, Version};
use storage_types::Version;
mod errors {
error_chain!{
foreign_links {
Bincode(::bincode::Error);
De(::serde::de::value::Error);
Io(::std::io::Error);
Nix(::nix::Error);
error_chain! {
links {
FileKV(::filekv::Error, ::filekv::ErrorKind);
}
}
}
@ -68,51 +53,45 @@ static PREFIX: &'static str = "bookmark:";
/// to a thread pool to avoid blocking the main thread. File accesses between these threads
/// are synchronized by a global map of per-path locks.
pub struct FileBookmarks<V> {
base: PathBuf,
pool: Arc<CpuPool>,
locks: Mutex<HashMap<String, Arc<Mutex<PathBuf>>>>,
_marker: PhantomData<V>,
kv: FileKV<V>,
}
impl<V> FileBookmarks<V> {
impl<V> FileBookmarks<V>
where
V: Send + Clone + Serialize + DeserializeOwned + 'static,
{
#[inline]
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
Self::open_with_pool(path, Arc::new(CpuPool::new_num_cpus()))
}
pub fn open_with_pool<P: AsRef<Path>>(path: P, pool: Arc<CpuPool>) -> Result<Self> {
if !path.as_ref().is_dir() {
bail!("'{}' is not a directory", path.as_ref().to_string_lossy());
}
Ok(FileBookmarks {
base: path.as_ref().to_path_buf(),
pool: pool,
locks: Mutex::new(HashMap::new()),
_marker: PhantomData,
kv: FileKV::open(path, PREFIX)?,
})
}
#[inline]
pub fn open_with_pool<P: AsRef<Path>>(path: P, pool: Arc<CpuPool>) -> Result<Self> {
Ok(FileBookmarks {
kv: FileKV::open_with_pool(path, PREFIX, pool)?,
})
}
#[inline]
pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
Self::create_with_pool(path, Arc::new(CpuPool::new_num_cpus()))
Ok(FileBookmarks {
kv: FileKV::create(path, PREFIX)?,
})
}
#[inline]
pub fn create_with_pool<P: AsRef<Path>>(path: P, pool: Arc<CpuPool>) -> Result<Self> {
let path = path.as_ref();
fs::create_dir_all(path)?;
Self::open_with_pool(path, pool)
Ok(FileBookmarks {
kv: FileKV::create_with_pool(path, PREFIX, pool)?,
})
}
}
/// Return a Mutex protecting the path to the file corresponding to the given key.
/// Ensures that file accesses across multiple threads in the pool are syncrhonized.
fn get_path_mutex(&self, key: &AsRef<[u8]>) -> Result<Arc<Mutex<PathBuf>>> {
let key_string = percent_encode(key.as_ref(), DEFAULT_ENCODE_SET).to_string();
let mut map = self.locks.lock().expect("Lock poisoned");
let mutex = map.entry(key_string.clone()).or_insert_with(|| {
let path = self.base.join(format!("{}{}", PREFIX, key_string));
Arc::new(Mutex::new(path))
});
Ok((*mutex).clone())
}
#[inline]
fn encode_key(key: &AsRef<[u8]>) -> String {
percent_encode(key.as_ref(), DEFAULT_ENCODE_SET).to_string()
}
impl<V> Bookmarks for FileBookmarks<V>
@ -125,42 +104,17 @@ where
type Get = BoxFuture<Option<(Self::Value, Version)>, Self::Error>;
type Keys = BoxStream<Vec<u8>, Self::Error>;
#[inline]
fn get(&self, key: &AsRef<[u8]>) -> Self::Get {
let pool = self.pool.clone();
self.get_path_mutex(key)
.into_future()
.and_then(move |mutex| {
let future = poll_fn(move || poll_get::<V>(&mutex));
pool.spawn(future)
})
.boxify()
self.kv.get(encode_key(key)).from_err().boxify()
}
fn keys(&self) -> Self::Keys {
// XXX: This traversal of the directory entries is unsynchronized and depends on
// platform-specific behavior with respect to the underlying directory entries.
// As a result, concurrent writes from other threads may produce strange results here.
let names = fs::read_dir(&self.base).map(|entries| {
entries
.map(|result| {
result
.map_err(From::from)
.map(|entry| entry.file_name().to_string_lossy().into_owned())
})
.filter(|result| match result {
&Ok(ref name) => name.starts_with(PREFIX),
&Err(_) => true,
})
.map(|result| {
result.and_then(|name| {
Ok(percent_decode(&name[PREFIX.len()..].as_bytes()).collect())
})
})
});
match names {
Ok(v) => stream::iter_ok(v).and_then(|x| x).boxify(),
Err(e) => stream::once(Err(e.into())).boxify(),
}
self.kv
.keys()
.and_then(|name| Ok(percent_decode(&name[..].as_bytes()).collect()))
.from_err()
.boxify()
}
}
@ -170,178 +124,20 @@ where
{
type Set = BoxFuture<Option<Version>, Self::Error>;
#[inline]
fn set(&self, key: &AsRef<[u8]>, value: &Self::Value, version: &Version) -> Self::Set {
let pool = self.pool.clone();
let value = value.clone();
let version = version.clone();
self.get_path_mutex(key)
.into_future()
.and_then(move |mutex| {
let future = poll_fn(move || poll_set(&mutex, &value, &version));
pool.spawn(future)
})
self.kv
.set(encode_key(key), value, version)
.from_err()
.boxify()
}
#[inline]
fn delete(&self, key: &AsRef<[u8]>, version: &Version) -> Self::Set {
let pool = self.pool.clone();
let version = version.clone();
self.get_path_mutex(key)
.into_future()
.and_then(move |mutex| {
let future = poll_fn(move || poll_delete(&mutex, &version));
pool.spawn(future)
})
.boxify()
self.kv.delete(encode_key(key), version).from_err().boxify()
}
}
/// Synchronous implementation of the get operation for the bookmark store. Intended to
/// be used in conjunction with poll_fn() and a CpuPool to dispatch it onto a thread pool.
fn poll_get<V>(path_mutex: &Arc<Mutex<PathBuf>>) -> Poll<Option<(V, Version)>, Error>
where
V: DeserializeOwned,
{
let path = path_mutex.lock().expect("Lock poisoned");
let result = match File::open(&*path) {
Ok(mut file) => {
// Block until we get an advisory lock on this file.
let fd = file.as_raw_fd();
fcntl::flock(fd, FlockArg::LockShared)?;
// Ensure file wasn't deleted between opening and locking.
if stat::fstat(fd)?.st_nlink > 0 {
let mut buf = Vec::new();
let _ = file.read_to_end(&mut buf)?;
Ok(Some(deserialize(&buf)?))
} else {
Ok(None)
}
}
Err(e) => {
// Return None instead of an Error if the file doesn't exist.
match e.kind() {
io::ErrorKind::NotFound => Ok(None),
_ => Err(e.into()),
}
}
};
result.map(Async::Ready)
}
/// Synchronous implementation of the set operation for the bookmark store. Intended to
/// be used in conjunction with poll_fn() and a CpuPool to dispatch it onto a thread pool.
fn poll_set<V>(
path_mutex: &Arc<Mutex<PathBuf>>,
value: &V,
version: &Version,
) -> Poll<Option<Version>, Error>
where
V: Serialize,
{
let path = path_mutex.lock().expect("Lock poisoned");
let mut options = OpenOptions::new();
options.read(true).write(true);
// If we expect the file to not exist, disallow opening an existing file.
if *version == Version::absent() {
options.create_new(true);
}
let result = match options.open(&*path) {
Ok(mut file) => {
// Block until we get an advisory lock on this file.
let fd = file.as_raw_fd();
fcntl::flock(fd, FlockArg::LockExclusive)?;
// Read version.
let file_version = if *version == Version::absent() {
Version::absent()
} else {
let mut buf = Vec::new();
let _ = file.read_to_end(&mut buf)?;
deserialize::<(String, Version)>(&buf)?.1
};
// Write out new value if versions match.
if file_version == *version {
let new_version = version_random();
let out = serialize(&(value, new_version), Infinite)?;
file.seek(SeekFrom::Start(0))?;
file.set_len(0)?;
file.write_all(&out)?;
Ok(Some(new_version))
} else {
Ok(None)
}
}
Err(e) => {
// We can only get EEXIST if the version was specified as absent but
// the file exists. This is a version mismatch, so return None accordingly.
match e.kind() {
io::ErrorKind::AlreadyExists => Ok(None),
_ => Err(e.into()),
}
}
};
result.map(Async::Ready)
}
/// Synchronous implementation of the delete operation for the bookmark store. Intended to
/// be used in conjunction with poll_fn() and a CpuPool to dispatch it onto a thread pool.
fn poll_delete(
path_mutex: &Arc<Mutex<PathBuf>>,
version: &Version,
) -> Poll<Option<Version>, Error> {
let path = path_mutex.lock().expect("Lock poisoned");
let result = match File::open(&*path) {
Ok(mut file) => {
// Block until we get an advisory lock on this file.
let fd = file.as_raw_fd();
fcntl::flock(fd, FlockArg::LockExclusive)?;
// Read version.
let mut buf = Vec::new();
let _ = file.read_to_end(&mut buf)?;
let file_version = deserialize::<(String, Version)>(&buf)?.1;
// Unlink files if version matches, reporting success if the file
// has already been deleted by another thread or process.
if file_version == *version {
fs::remove_file(&*path).or_else(|e| match e.kind() {
io::ErrorKind::NotFound => Ok(()),
_ => Err(e),
})?;
Ok(Some(Version::absent()))
} else {
Ok(None)
}
}
Err(e) => {
// Check for absent version if the file doesn't exist.
match e.kind() {
io::ErrorKind::NotFound => {
if *version == Version::absent() {
// Report successful deletion of non-existent bookmark.
Ok(Some(Version::absent()))
} else {
// Version mismatch.
Ok(None)
}
}
_ => Err(e.into()),
}
}
};
result.map(Async::Ready)
}
#[cfg(test)]
mod test {
use super::*;

447
storage/filekv/src/lib.rs Normal file
View File

@ -0,0 +1,447 @@
// Copyright (c) 2017-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
//! A file-based key-value store. Uses `flock(2)` to guarantee cross-process consistency for reads
//! and writes.
#![deny(warnings)]
#![feature(conservative_impl_trait)]
extern crate bincode;
#[macro_use]
extern crate error_chain;
extern crate futures;
extern crate futures_cpupool;
extern crate nix;
extern crate rand;
extern crate serde;
#[cfg(test)]
extern crate tempdir;
extern crate futures_ext;
extern crate storage_types;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::fs::{self, File, OpenOptions};
use std::io::{self, SeekFrom};
use std::io::prelude::*;
use std::marker::PhantomData;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::str;
use std::sync::{Arc, Mutex};
use bincode::{deserialize, serialize, Infinite};
use futures::{Async, Poll};
use futures::future::{poll_fn, Future, IntoFuture};
use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use nix::fcntl::{self, FlockArg};
use nix::sys::stat;
use serde::Serialize;
use serde::de::DeserializeOwned;
use futures_ext::{BoxStream, StreamExt};
use storage_types::{version_random, Version};
mod errors {
error_chain! {
foreign_links {
Bincode(::bincode::Error);
De(::serde::de::value::Error);
Io(::std::io::Error);
Nix(::nix::Error);
}
}
}
pub use errors::*;
/// A basic file-based persistent bookmark store.
///
/// Key-value pairs are stored as files in the specified base directory. File operations are
/// dispatched to a thread pool to avoid blocking the main thread. File accesses between these
/// threads are synchronized by a global map of per-path locks.
pub struct FileKV<V> {
base: PathBuf,
prefix: String,
pool: Arc<CpuPool>,
locks: Mutex<HashMap<String, Arc<Mutex<PathBuf>>>>,
_marker: PhantomData<V>,
}
impl<V> FileKV<V>
where
V: Send + Clone + Serialize + DeserializeOwned + 'static,
{
pub fn open<P, S>(path: P, prefix: S) -> Result<Self>
where
P: AsRef<Path>,
S: Into<String>,
{
Self::open_with_pool(path, prefix, Arc::new(CpuPool::new_num_cpus()))
}
pub fn open_with_pool<P, S>(path: P, prefix: S, pool: Arc<CpuPool>) -> Result<Self>
where
P: AsRef<Path>,
S: Into<String>,
{
if !path.as_ref().is_dir() {
bail!("'{}' is not a directory", path.as_ref().to_string_lossy());
}
Ok(FileKV {
base: path.as_ref().to_path_buf(),
prefix: prefix.into(),
pool: pool,
locks: Mutex::new(HashMap::new()),
_marker: PhantomData,
})
}
pub fn create<P, S>(path: P, prefix: S) -> Result<Self>
where
P: AsRef<Path>,
S: Into<String>,
{
Self::create_with_pool(path, prefix, Arc::new(CpuPool::new_num_cpus()))
}
pub fn create_with_pool<P, S>(path: P, prefix: S, pool: Arc<CpuPool>) -> Result<Self>
where
P: AsRef<Path>,
S: Into<String>,
{
let path = path.as_ref();
fs::create_dir_all(path)?;
Self::open_with_pool(path, prefix, pool)
}
/// Return a Mutex protecting the path to the file corresponding to the given key.
/// Ensures that file accesses across multiple threads in the pool are syncrhonized.
fn get_path_mutex<Q: Into<String>>(&self, key: Q) -> Result<Arc<Mutex<PathBuf>>> {
let mut map = self.locks.lock().expect("Lock poisoned");
match map.entry(key.into()) {
Entry::Occupied(occupied) => {
let mutex = occupied.get();
Ok((*mutex).clone())
}
Entry::Vacant(vacant) => {
let path = self.base.join(format!("{}{}", self.prefix, vacant.key()));
let mutex = vacant.insert(Arc::new(Mutex::new(path)));
Ok((*mutex).clone())
}
}
}
pub fn get<Q: Into<String>>(
&self,
key: Q,
) -> impl Future<Item = Option<(V, Version)>, Error = Error> {
let pool = self.pool.clone();
self.get_path_mutex(key)
.into_future()
.and_then(move |mutex| {
let future = poll_fn(move || poll_get::<V>(&mutex));
pool.spawn(future)
})
}
pub fn keys(&self) -> BoxStream<String, Error> {
// XXX: This traversal of the directory entries is unsynchronized and depends on
// platform-specific behavior with respect to the underlying directory entries.
// As a result, concurrent writes from other threads may produce strange results here.
let prefix = self.prefix.clone();
let prefix_len = prefix.len();
let names = fs::read_dir(&self.base).map(|entries| {
entries
.map(|result| {
result
.map_err(From::from)
.map(|entry| entry.file_name().to_string_lossy().into_owned())
})
.filter(move |result| match result {
&Ok(ref name) => name.starts_with(&prefix),
&Err(_) => true,
})
.map(move |result| {
result.and_then(|name| Ok(name[prefix_len..].into()))
})
});
match names {
Ok(v) => stream::iter_ok(v).and_then(|x| x).boxify(),
Err(e) => stream::once(Err(e.into())).boxify(),
}
}
pub fn set<Q: Into<String>>(
&self,
key: Q,
value: &V,
version: &Version,
) -> impl Future<Item = Option<Version>, Error = Error> {
let pool = self.pool.clone();
let value = value.clone();
let version = version.clone();
self.get_path_mutex(key)
.into_future()
.and_then(move |mutex| {
let future = poll_fn(move || poll_set(&mutex, &value, &version));
pool.spawn(future)
})
}
// Convenience function for creating new keys (since initial version is always "absent").
#[inline]
pub fn set_new<Q: Into<String>>(
&self,
key: Q,
value: &V,
) -> impl Future<Item = Option<Version>, Error = Error> {
self.set(key, value, &Version::absent())
}
pub fn delete<Q: Into<String>>(
&self,
key: Q,
version: &Version,
) -> impl Future<Item = Option<Version>, Error = Error> {
let pool = self.pool.clone();
let version = version.clone();
self.get_path_mutex(key)
.into_future()
.and_then(move |mutex| {
let future = poll_fn(move || poll_delete(&mutex, &version));
pool.spawn(future)
})
}
}
/// Synchronous implementation of the get operation for the bookmark store. Intended to
/// be used in conjunction with poll_fn() and a CpuPool to dispatch it onto a thread pool.
fn poll_get<V>(path_mutex: &Arc<Mutex<PathBuf>>) -> Poll<Option<(V, Version)>, Error>
where
V: DeserializeOwned,
{
let path = path_mutex.lock().expect("Lock poisoned");
let result = match File::open(&*path) {
Ok(mut file) => {
// Block until we get an advisory lock on this file.
let fd = file.as_raw_fd();
fcntl::flock(fd, FlockArg::LockShared)?;
// Ensure file wasn't deleted between opening and locking.
if stat::fstat(fd)?.st_nlink > 0 {
let mut buf = Vec::new();
let _ = file.read_to_end(&mut buf)?;
Ok(Some(deserialize(&buf)?))
} else {
Ok(None)
}
}
Err(e) => {
// Return None instead of an Error if the file doesn't exist.
match e.kind() {
io::ErrorKind::NotFound => Ok(None),
_ => Err(e.into()),
}
}
};
result.map(Async::Ready)
}
/// Synchronous implementation of the set operation for the bookmark store. Intended to
/// be used in conjunction with poll_fn() and a CpuPool to dispatch it onto a thread pool.
fn poll_set<V>(
path_mutex: &Arc<Mutex<PathBuf>>,
value: &V,
version: &Version,
) -> Poll<Option<Version>, Error>
where
V: Serialize,
{
let path = path_mutex.lock().expect("Lock poisoned");
let mut options = OpenOptions::new();
options.read(true).write(true);
// If we expect the file to not exist, disallow opening an existing file.
if *version == Version::absent() {
options.create_new(true);
}
let result = match options.open(&*path) {
Ok(mut file) => {
// Block until we get an advisory lock on this file.
let fd = file.as_raw_fd();
fcntl::flock(fd, FlockArg::LockExclusive)?;
// Read version.
let file_version = if *version == Version::absent() {
Version::absent()
} else {
let mut buf = Vec::new();
let _ = file.read_to_end(&mut buf)?;
deserialize::<(String, Version)>(&buf)?.1
};
// Write out new value if versions match.
if file_version == *version {
let new_version = version_random();
let out = serialize(&(value, new_version), Infinite)?;
file.seek(SeekFrom::Start(0))?;
file.set_len(0)?;
file.write_all(&out)?;
Ok(Some(new_version))
} else {
Ok(None)
}
}
Err(e) => {
// We can only get EEXIST if the version was specified as absent but
// the file exists. This is a version mismatch, so return None accordingly.
match e.kind() {
io::ErrorKind::AlreadyExists => Ok(None),
_ => Err(e.into()),
}
}
};
result.map(Async::Ready)
}
/// Synchronous implementation of the delete operation for the bookmark store. Intended to
/// be used in conjunction with poll_fn() and a CpuPool to dispatch it onto a thread pool.
fn poll_delete(
path_mutex: &Arc<Mutex<PathBuf>>,
version: &Version,
) -> Poll<Option<Version>, Error> {
let path = path_mutex.lock().expect("Lock poisoned");
let result = match File::open(&*path) {
Ok(mut file) => {
// Block until we get an advisory lock on this file.
let fd = file.as_raw_fd();
fcntl::flock(fd, FlockArg::LockExclusive)?;
// Read version.
let mut buf = Vec::new();
let _ = file.read_to_end(&mut buf)?;
let file_version = deserialize::<(String, Version)>(&buf)?.1;
// Unlink files if version matches, reporting success if the file
// has already been deleted by another thread or process.
if file_version == *version {
fs::remove_file(&*path).or_else(|e| match e.kind() {
io::ErrorKind::NotFound => Ok(()),
_ => Err(e),
})?;
Ok(Some(Version::absent()))
} else {
Ok(None)
}
}
Err(e) => {
// Check for absent version if the file doesn't exist.
match e.kind() {
io::ErrorKind::NotFound => {
if *version == Version::absent() {
// Report successful deletion of non-existent bookmark.
Ok(Some(Version::absent()))
} else {
// Version mismatch.
Ok(None)
}
}
_ => Err(e.into()),
}
}
};
result.map(Async::Ready)
}
#[cfg(test)]
mod test {
use super::*;
use futures::{Future, Stream};
use tempdir::TempDir;
#[test]
fn basic() {
let tmp = TempDir::new("filekv_basic").unwrap();
let kv = FileKV::open(tmp.path(), "kv:").unwrap();
let foo = "foo";
let one = "1".to_string();
let two = "2".to_string();
let three = "3".to_string();
assert_eq!(kv.get(foo).wait().unwrap(), None);
let absent = Version::absent();
let foo_v1 = kv.set(foo, &one, &absent).wait().unwrap().unwrap();
assert_eq!(kv.get(foo).wait().unwrap(), Some((one.clone(), foo_v1)));
let foo_v2 = kv.set(foo, &two, &foo_v1).wait().unwrap().unwrap();
// Should fail due to version mismatch.
assert_eq!(kv.set(foo, &three, &foo_v1).wait().unwrap(), None);
assert_eq!(kv.delete(foo, &foo_v2).wait().unwrap().unwrap(), absent);
assert_eq!(kv.get(foo).wait().unwrap(), None);
// Even though bookmark doesn't exist, this should fail with a version mismatch.
assert_eq!(kv.delete(foo, &foo_v2).wait().unwrap(), None);
// Deleting it with the absent version should work.
assert_eq!(kv.delete(foo, &absent).wait().unwrap().unwrap(), absent);
}
#[test]
fn persistence() {
let tmp = TempDir::new("filebookmarks_heads_persistence").unwrap();
let foo = "foo";
let bar = "bar".to_string();
let version;
{
let kv = FileKV::open(tmp.path(), "kv:").unwrap();
version = kv.set_new(foo, &bar).wait().unwrap().unwrap();
}
let kv = FileKV::open(tmp.path(), "kv:").unwrap();
assert_eq!(kv.get(foo).wait().unwrap(), Some((bar, version)));
}
#[test]
fn list() {
let tmp = TempDir::new("filebookmarks_heads_basic").unwrap();
let kv = FileKV::open(tmp.path(), "kv:").unwrap();
let one = "1";
let two = "2";
let three = "3";
let _ = kv.set_new(one, &"foo".to_string()).wait().unwrap().unwrap();
let _ = kv.set_new(two, &"bar".to_string()).wait().unwrap().unwrap();
let _ = kv.set_new(three, &"baz".to_string())
.wait()
.unwrap()
.unwrap();
let mut result = kv.keys().collect().wait().unwrap();
result.sort();
let expected = vec![one, two, three];
assert_eq!(result, expected);
}
}