workers: bulk fetch data in worker thread

Summary:
During an hg update we first prefetch all the data, then write all the
data to disk. There are cases where the prefetched data is not available during
the writing phase, in which case we fall back to fetching the files one-by-one.
This has truly atrocious performance.

Let's allow the worker threads to check for missing data then do bulk fetching
of it. In the case where the cache was completely lost for some reason, this
would reduce the number of serial fetches by 100x.

Note, the background workers already spawn their own ssh connection's, so
they're already getting some level of parallelism even when they're doing 1-by-1
fetching. That's why we aren't seeing a 100x improvement in performance.

Reviewed By: xavierd

Differential Revision: D23766424

fbshipit-source-id: d88a1e55b1c21e9cea7e50fc6dbfd8a27bd97bb0
This commit is contained in:
Durham Goode 2020-09-21 11:23:01 -07:00 committed by Facebook GitHub Bot
parent 7ae215cb32
commit 63d19e1eca

View File

@ -23,8 +23,11 @@ use crossbeam::channel::{bounded, Receiver, Sender};
use cpython_ext::{ExtractInner, PyNone, PyPath, PyPathBuf, ResultPyErrExt, Str};
use pyrevisionstore::contentstore;
use revisionstore::{ContentStore, HgIdDataStore, StoreKey, StoreResult};
use types::{HgId, Key, RepoPath, RepoPathBuf};
use revisionstore::{
datastore::RemoteDataStore, localstore::LocalStore, ContentStore, HgIdDataStore, StoreKey,
StoreResult,
};
use types::{HgId, Key, RepoPathBuf};
use vfs::{UpdateFlag, VFS};
pub fn init_module(py: Python, package: &str) -> PyResult<PyModule> {
@ -141,20 +144,14 @@ fn censor_if_needed(data: Bytes) -> Bytes {
}
}
/// Fetch the content of the passed in `hgid` and write it to `path`.
fn update(
state: &WriterState,
path: &RepoPath,
hgid: HgId,
flag: Option<UpdateFlag>,
) -> Result<usize> {
let key = Key::new(path.to_owned(), hgid);
/// Fetch the content of the passed in `key` and write it to it's path.
fn update(state: &WriterState, key: Key, flag: Option<UpdateFlag>) -> Result<usize> {
let content = state
.store
.get_file_content(&key)?
.ok_or_else(|| format_err!("Can't find key: {}", key))?;
let meta = match state.store.get_meta(StoreKey::hgid(key))? {
let meta = match state.store.get_meta(StoreKey::hgid(key.clone()))? {
StoreResult::NotFound(key) => {
return Err(format_err!("Can't find metadata for key: {:?}", key));
}
@ -168,18 +165,21 @@ fn update(
let content = censor_if_needed(content);
// Fast path: let's try to open the file directly, we'll handle the failure only if this fails.
match state.working_copy.write(path, &content, flag) {
match state.working_copy.write(&key.path, &content, flag) {
Ok(size) => Ok(size),
Err(e) => {
// Ideally, we shouldn't need to retry for some failures, but this is the slow path, any
// failures not due to a conflicting file would show up here again, so let's not worry
// about it.
state.working_copy.clear_conflicts(path).with_context(|| {
format!("Can't clear conflicts after handling error \"{:?}\"", e)
})?;
state
.working_copy
.write(path, &content, flag)
.clear_conflicts(&key.path)
.with_context(|| {
format!("Can't clear conflicts after handling error \"{:?}\"", e)
})?;
state
.working_copy
.write(&key.path, &content, flag)
.with_context(|| format!("Can't write after handling error \"{:?}\"", e))
}
}
@ -187,21 +187,36 @@ fn update(
fn threaded_writer(
state: WriterState,
chan: Receiver<Vec<(RepoPathBuf, HgId, Option<UpdateFlag>)>>,
chan: Receiver<Vec<(Key, Option<UpdateFlag>)>>,
) -> (usize, Vec<(RepoPathBuf, Option<UpdateFlag>)>) {
let mut failures = Vec::new();
let mut written = 0;
while let Ok(vec) = chan.recv() {
for (path, hgid, flag) in vec.into_iter() {
let res = update(&state, path.as_repo_path(), hgid, flag)
.with_context(|| format!("Can't update {} at {}", path, hgid));
let store_keys: Vec<_> = vec.iter().map(|(k, _)| StoreKey::hgid(k.clone())).collect();
let missing = match state.store.get_missing(&store_keys) {
Ok(missing) => missing,
Err(e) => {
tracing::warn!("{:?}", e);
let failed_inputs: Vec<_> = vec.into_iter().map(|(k, f)| (k.path, f)).collect();
failures.extend_from_slice(&failed_inputs);
continue;
}
};
if !missing.is_empty() {
// Any errors will get reported below.
let _ = state.store.prefetch(&missing);
}
for (key, flag) in vec.into_iter() {
let res = update(&state, key.clone(), flag)
.with_context(|| format!("Can't update {} at {}", key.path, key.hgid));
match res {
Ok(count) => written += count,
Err(e) => {
tracing::warn!("{:?}", e);
failures.push((path, flag));
failures.push((key.path, flag));
}
};
}
@ -227,7 +242,7 @@ impl WriterState {
}
py_class!(class writerworker |py| {
data inner: RefCell<Option<Worker<(usize, Vec<(RepoPathBuf, Option<UpdateFlag>)>), (RepoPathBuf, HgId, Option<UpdateFlag>)>>>;
data inner: RefCell<Option<Worker<(usize, Vec<(RepoPathBuf, Option<UpdateFlag>)>), (Key, Option<UpdateFlag>)>>>;
def __new__(_cls, contentstore: contentstore, root: &PyPath, numthreads: usize) -> PyResult<writerworker> {
let store = contentstore.extract_inner(py);
@ -257,7 +272,7 @@ py_class!(class writerworker |py| {
return Err(format_err!("Unknown flags: {}", flags)).map_pyerr(py);
};
self.inner(py).borrow_mut().as_mut().unwrap().push_work(py, (path, node, flags)).map_pyerr(py)?;
self.inner(py).borrow_mut().as_mut().unwrap().push_work(py, (Key::new(path, node), flags)).map_pyerr(py)?;
Ok(PyNone)
}
@ -371,7 +386,7 @@ mod tests {
datastore::{Delta, HgIdMutableDeltaStore},
testutil::make_config,
};
use types::testutil::key;
use types::{testutil::key, RepoPath};
#[test]
fn test_update_basic() -> Result<()> {
@ -391,7 +406,7 @@ mod tests {
let root = workingdir.as_ref().to_path_buf();
let state = WriterState::new(root, store)?;
let written = update(&state, &k.path, k.hgid.clone(), None)?;
let written = update(&state, k, None)?;
assert_eq!(written, 7);
@ -416,12 +431,7 @@ mod tests {
let root = workingdir.as_ref().to_path_buf();
let state = WriterState::new(root, store)?;
update(
&state,
&k.path,
k.hgid.clone(),
Some(UpdateFlag::Executable),
)?;
update(&state, k, Some(UpdateFlag::Executable))?;
let mut file = workingdir.as_ref().to_path_buf();
file.push("a");
@ -454,7 +464,7 @@ mod tests {
let root = workingdir.as_ref().to_path_buf();
let state = WriterState::new(root, store)?;
update(&state, &k.path, k.hgid.clone(), Some(UpdateFlag::Symlink))?;
update(&state, k, Some(UpdateFlag::Symlink))?;
let mut file = workingdir.as_ref().to_path_buf();
file.push("a");
@ -488,7 +498,7 @@ mod tests {
let root = workingdir.as_ref().to_path_buf();
let state = WriterState::new(root, store)?;
let written = update(&state, &k.path, k.hgid.clone(), None)?;
let written = update(&state, k, None)?;
assert_eq!(written, 7);
Ok(())
@ -518,7 +528,7 @@ mod tests {
let root = workingdir.as_ref().to_path_buf();
let state = WriterState::new(root, store)?;
let written = update(&state, &k.path, k.hgid.clone(), None)?;
let written = update(&state, k, None)?;
assert_eq!(written, 7);
let mut path = workingdir.as_ref().to_path_buf();
@ -554,7 +564,7 @@ mod tests {
let root = workingdir.as_ref().to_path_buf();
let state = WriterState::new(root, store)?;
assert!(update(&state, &k.path, k.hgid.clone(), None).is_err());
assert!(update(&state, k, None).is_err());
Ok(())
}
@ -581,7 +591,7 @@ mod tests {
let root = workingdir.as_ref().to_path_buf();
let state = WriterState::new(root, store)?;
let written = update(&state, &k.path, k.hgid.clone(), None)?;
let written = update(&state, k, None)?;
assert_eq!(written, 7);
let mut path = workingdir.as_ref().to_path_buf();
@ -803,7 +813,7 @@ mod tests {
let mut written_size = 0;
for key in keys.iter() {
written_size += update(&state, &key.path, key.hgid.clone(), None)?;
written_size += update(&state, key.clone(), None)?;
}
for key in keys.iter() {
@ -867,7 +877,7 @@ mod tests {
let state = WriterState::new(root, store)?;
for key in keys.iter() {
update(&state, &key.path, key.hgid.clone(), None)?;
update(&state, key.clone(), None)?;
}
let root = workingdir.as_ref().to_path_buf();