mirror of
https://github.com/facebook/sapling.git
synced 2024-10-11 01:07:15 +03:00
mononoke/glusterblob: add fanout over the whole cluster
Summary: Rather than just talking to a single host, make connections to as many machines as possible, and randomly fan out operations to them. Reviewed By: lukaspiatkowski Differential Revision: D13915510 fbshipit-source-id: fb8a8c2c0a0abaa9ba0a39d4702c0d81fbe590f6
This commit is contained in:
parent
322c029423
commit
ec2eb2e57c
@ -18,7 +18,6 @@ use std::sync::Arc;
|
||||
use bytes::{BigEndian, ByteOrder};
|
||||
use cloned::cloned;
|
||||
use failure_ext::{format_err, Error};
|
||||
use futures::future;
|
||||
use futures::prelude::*;
|
||||
use futures_ext::{BoxFuture, FutureExt};
|
||||
use futures_stats::Timed;
|
||||
@ -59,12 +58,18 @@ struct GlusterBlobMetadata {
|
||||
xxhash64: Option<u64>,
|
||||
}
|
||||
|
||||
pub struct Glusterblob {
|
||||
/// Connection to a single Gluster node
|
||||
struct GlusterCtxt {
|
||||
ctxt: AsyncNfsContext,
|
||||
host: String,
|
||||
stats: Arc<GlusterStats>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Glusterblob {
|
||||
ctxts: Vec<GlusterCtxt>,
|
||||
export: String,
|
||||
basepath: PathBuf,
|
||||
stats: Arc<GlusterStats>,
|
||||
}
|
||||
|
||||
impl Glusterblob {
|
||||
@ -122,27 +127,41 @@ impl Glusterblob {
|
||||
}
|
||||
.into_future();
|
||||
|
||||
hosts.and_then(move |hosts| {
|
||||
let ctxts = hosts.into_iter().map({
|
||||
let ctxts = hosts
|
||||
.and_then({
|
||||
cloned!(export);
|
||||
move |host| AsyncNfsContext::mount(&host, &*export).map(|ctxt| (ctxt, host))
|
||||
move |hosts| {
|
||||
let conns = hosts.into_iter().map({
|
||||
cloned!(export);
|
||||
move |host| {
|
||||
AsyncNfsContext::mount(&host, &*export)
|
||||
.and_then(|ctxt| ctxt.set_auth(UID, GID).map(|()| ctxt))
|
||||
.map(|ctxt| GlusterCtxt {
|
||||
ctxt,
|
||||
host: host.clone(),
|
||||
stats: Arc::new(GlusterStats::new(host)),
|
||||
})
|
||||
}
|
||||
});
|
||||
futures::future::join_all(conns.map(|conn| conn.then(|res| Ok(res))))
|
||||
}
|
||||
})
|
||||
.then(|res| match res {
|
||||
Err(_) => panic!("error?"),
|
||||
Ok(ctxts) => {
|
||||
if ctxts.is_empty() {
|
||||
Err(failure_ext::err_msg("No successful connections"))
|
||||
} else {
|
||||
Ok(ctxts.into_iter().filter_map(Result::ok).collect())
|
||||
}
|
||||
}
|
||||
});
|
||||
future::select_ok(ctxts)
|
||||
.map(|((ctxt, host), _)| (ctxt, host)) // don't care about unchosen hosts
|
||||
.and_then(|(ctxt, host)| ctxt.set_auth(UID, GID).map(move |()| (ctxt, host)))
|
||||
.from_err() // io::Error -> Error
|
||||
.map(move |(ctxt, host)| Glusterblob {
|
||||
ctxt,
|
||||
host: host.clone(),
|
||||
export,
|
||||
basepath,
|
||||
stats: Arc::new(GlusterStats::new(host)),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_hostname(&self) -> &str {
|
||||
&*self.host
|
||||
ctxts.map(move |ctxts| Glusterblob {
|
||||
ctxts,
|
||||
export,
|
||||
basepath,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_export(&self) -> &str {
|
||||
@ -153,6 +172,14 @@ impl Glusterblob {
|
||||
&*self.basepath
|
||||
}
|
||||
|
||||
fn pick_context(&self) -> (AsyncNfsContext, Arc<GlusterStats>) {
|
||||
let conn = self
|
||||
.ctxts
|
||||
.choose(&mut rand::thread_rng())
|
||||
.expect("No contexts");
|
||||
(conn.ctxt.clone(), conn.stats.clone())
|
||||
}
|
||||
|
||||
/// Return the path to a dir for a given key
|
||||
fn keydir(&self, key: &str) -> PathBuf {
|
||||
let hash = name_xxhash(key);
|
||||
@ -189,20 +216,19 @@ impl Glusterblob {
|
||||
fn create_keydir(&self, key: &str) -> impl Future<Item = PathBuf, Error = io::Error> {
|
||||
let path = self.keydir(key);
|
||||
|
||||
let (ctxt, _) = self.pick_context();
|
||||
|
||||
// stat first to check if its there (don't worry about perms or even if its actually a dir)
|
||||
self.ctxt
|
||||
.stat64(path.clone())
|
||||
.then(missing_is_none)
|
||||
.and_then({
|
||||
cloned!(self.ctxt, path);
|
||||
move |found| match found {
|
||||
Some(_) => Ok(path).into_future().left_future(),
|
||||
None => ctxt
|
||||
.mkpath(path.clone(), DIRMODE)
|
||||
.map(move |()| path)
|
||||
.right_future(),
|
||||
}
|
||||
})
|
||||
ctxt.stat64(path.clone()).then(missing_is_none).and_then({
|
||||
cloned!(ctxt, path);
|
||||
move |found| match found {
|
||||
Some(_) => Ok(path).into_future().left_future(),
|
||||
None => ctxt
|
||||
.mkpath(path.clone(), DIRMODE)
|
||||
.map(move |()| path)
|
||||
.right_future(),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn data_xxhash(data: &BlobstoreBytes) -> u64 {
|
||||
@ -212,12 +238,10 @@ impl Glusterblob {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Glusterblob {
|
||||
impl fmt::Debug for GlusterCtxt {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt.debug_struct("Glusterblob")
|
||||
fmt.debug_struct("GlusterCtxt")
|
||||
.field("host", &self.host)
|
||||
.field("export", &self.host)
|
||||
.field("basepath", &self.basepath.display())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@ -229,12 +253,13 @@ impl Blobstore for Glusterblob {
|
||||
let datapath = path.join(Self::keyfile(&*key));
|
||||
let metapath = path.join(Self::metafile(&*key));
|
||||
|
||||
let (ctxt, stats) = self.pick_context();
|
||||
|
||||
// Open path; if it doesn't exist then succeed with None, otherwise return the failure.
|
||||
// If it opens OK, then stat to get the size of the file, and try to read it in a single
|
||||
// read. Fail if its a short (or long) read.
|
||||
// TODO: do multiple reads to get whole file.
|
||||
let data = self
|
||||
.ctxt
|
||||
let data = ctxt
|
||||
.open(datapath, OFlag::O_RDONLY)
|
||||
.then(missing_is_none)
|
||||
.and_then(|found| match found {
|
||||
@ -257,8 +282,7 @@ impl Blobstore for Glusterblob {
|
||||
None => Ok(None).into_future().left_future(),
|
||||
});
|
||||
|
||||
let meta = self
|
||||
.ctxt
|
||||
let meta = ctxt
|
||||
.open(metapath, OFlag::O_RDONLY)
|
||||
.then(missing_is_none)
|
||||
.and_then(|found| match found {
|
||||
@ -300,7 +324,6 @@ impl Blobstore for Glusterblob {
|
||||
}
|
||||
})
|
||||
.timed({
|
||||
cloned!(self.stats);
|
||||
move |futst, res| {
|
||||
match res {
|
||||
Ok(_) => stats
|
||||
@ -319,9 +342,11 @@ impl Blobstore for Glusterblob {
|
||||
/// for the same key, the implementation may return any `value` it's been given in response
|
||||
/// to a `get` for that `key`.
|
||||
fn put(&self, _ctx: CoreContext, key: String, value: BlobstoreBytes) -> BoxFuture<(), Error> {
|
||||
let (ctxt, stats) = self.pick_context();
|
||||
|
||||
self.create_keydir(&*key)
|
||||
.and_then({
|
||||
cloned!(self.ctxt);
|
||||
cloned!(ctxt);
|
||||
move |path| {
|
||||
let tmpfile = path.join(Self::tmpfile(&*key, "data"));
|
||||
let file = path.join(Self::keyfile(&*key));
|
||||
@ -410,7 +435,6 @@ impl Blobstore for Glusterblob {
|
||||
}
|
||||
})
|
||||
.timed({
|
||||
cloned!(self.stats);
|
||||
move |futst, res| {
|
||||
match res {
|
||||
Ok(_) => stats
|
||||
@ -434,14 +458,15 @@ impl Blobstore for Glusterblob {
|
||||
let datapath = path.join(Self::keyfile(&*key));
|
||||
let metapath = path.join(Self::metafile(&*key));
|
||||
|
||||
let check_data = self.ctxt.stat64(datapath).then(missing_is_none);
|
||||
let check_meta = self.ctxt.stat64(metapath).then(missing_is_none);
|
||||
let (ctxt, stats) = self.pick_context();
|
||||
|
||||
let check_data = ctxt.stat64(datapath).then(missing_is_none);
|
||||
let check_meta = ctxt.stat64(metapath).then(missing_is_none);
|
||||
|
||||
check_data
|
||||
.join(check_meta)
|
||||
.map(|(data, meta)| data.is_some() && meta.is_some())
|
||||
.timed({
|
||||
cloned!(self.stats);
|
||||
move |futst, res| {
|
||||
match res {
|
||||
Ok(_) => stats
|
||||
|
Loading…
Reference in New Issue
Block a user