mirror of
https://github.com/facebook/sapling.git
synced 2024-10-12 01:39:21 +03:00
edenapi: add method to get file history
Summary: Add a `get_history()` method to the `EdenApi` trait that queries the API server's `getfilehistory` endpoint and writes the resulting history entries to a historypack in the user's cache. Differential Revision: D14223269 fbshipit-source-id: bf69c767f5a89177c36e755250330dbbbc219e4f
This commit is contained in:
parent
0fc20f828d
commit
6181da2178
@ -11,8 +11,10 @@ hyper = "0.12.14"
|
||||
hyper-rustls = "0.15.0"
|
||||
lazy_static = "1.2.0"
|
||||
log = "0.4.6"
|
||||
percent-encoding = "1.0.1"
|
||||
revisionstore = { path = "../revisionstore" }
|
||||
rustls = "0.14.0"
|
||||
serde_json = "1.0.38"
|
||||
tokio = "0.1.11"
|
||||
types = { path = "../types" }
|
||||
url = "1.7.2"
|
||||
|
@ -5,16 +5,20 @@ use std::{
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use bytes::Bytes;
|
||||
use bytes::{Buf, Bytes, IntoBuf};
|
||||
use failure::{ensure, Error, Fallible};
|
||||
use futures::{stream::futures_unordered, Future, IntoFuture, Stream};
|
||||
use futures::{stream, Future, IntoFuture, Stream};
|
||||
use hyper::Chunk;
|
||||
use log::debug;
|
||||
use percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
|
||||
use serde_json::Deserializer;
|
||||
use tokio::runtime::Runtime;
|
||||
use url::Url;
|
||||
|
||||
use revisionstore::{DataPackVersion, Delta, Metadata, MutableDataPack, MutablePack};
|
||||
use types::Key;
|
||||
use revisionstore::{
|
||||
DataPackVersion, Delta, HistoryPackVersion, Metadata, MutableDataPack, MutableHistoryPack,
|
||||
MutablePack,
|
||||
};
|
||||
use types::{Key, PackHistoryEntry};
|
||||
use url_ext::UrlExt;
|
||||
|
||||
use crate::client::{EdenApiHttpClient, HyperClient};
|
||||
@ -22,11 +26,17 @@ use crate::client::{EdenApiHttpClient, HyperClient};
|
||||
mod paths {
|
||||
pub const HEALTH_CHECK: &str = "/health_check";
|
||||
pub const GET_FILE: &str = "gethgfile/";
|
||||
pub const GET_HISTORY: &str = "getfilehistory/";
|
||||
}
|
||||
|
||||
pub trait EdenApi {
|
||||
fn health_check(&self) -> Fallible<()>;
|
||||
fn get_files(&self, keys: impl IntoIterator<Item = Key>) -> Fallible<PathBuf>;
|
||||
fn get_history(
|
||||
&self,
|
||||
keys: impl IntoIterator<Item = Key>,
|
||||
max_depth: Option<u32>,
|
||||
) -> Fallible<PathBuf>;
|
||||
}
|
||||
|
||||
impl EdenApi for EdenApiHttpClient {
|
||||
@ -37,7 +47,7 @@ impl EdenApi for EdenApiHttpClient {
|
||||
let url = self.base_url.join(paths::HEALTH_CHECK)?.to_uri();
|
||||
|
||||
let fut = self.client.get(url).map_err(Error::from).and_then(|res| {
|
||||
debug!("Received response: {:#?}", &res);
|
||||
log::debug!("Received response: {:#?}", &res);
|
||||
let status = res.status();
|
||||
res.into_body()
|
||||
.concat2()
|
||||
@ -67,23 +77,40 @@ impl EdenApi for EdenApiHttpClient {
|
||||
let client = Arc::clone(&self.client);
|
||||
let prefix = self.repo_base_url()?.join(paths::GET_FILE)?;
|
||||
|
||||
// Construct an iterator of Futures, each representing an individual
|
||||
// getfile request.
|
||||
let get_file_futures = keys
|
||||
.into_iter()
|
||||
.map(move |key| get_file(&client, &prefix, key));
|
||||
|
||||
// Construct a Future that executes the getfiles requests concurrently,
|
||||
// returned the results in a Vec in arbitrary order.
|
||||
let work = futures_unordered(get_file_futures).collect();
|
||||
let work = stream::futures_unordered(get_file_futures).collect();
|
||||
|
||||
// Run the Futures.
|
||||
let mut runtime = Runtime::new()?;
|
||||
let files = runtime.block_on(work)?;
|
||||
|
||||
// Write the downloaded file content to disk.
|
||||
write_datapack(self.pack_cache_path(), files)
|
||||
}
|
||||
|
||||
/// Fetch the history of the specified file from the API server and write
|
||||
/// it to a historypack in the configured cache directory. Returns the path
|
||||
/// of the resulting packfile.
|
||||
fn get_history(
|
||||
&self,
|
||||
keys: impl IntoIterator<Item = Key>,
|
||||
max_depth: Option<u32>,
|
||||
) -> Fallible<PathBuf> {
|
||||
let client = Arc::clone(&self.client);
|
||||
let prefix = self.repo_base_url()?.join(paths::GET_HISTORY)?;
|
||||
|
||||
let get_history_futures = keys
|
||||
.into_iter()
|
||||
.map(move |key| get_history(&client, &prefix, key, max_depth).collect());
|
||||
|
||||
let work = stream::futures_unordered(get_history_futures).collect();
|
||||
|
||||
let mut runtime = Runtime::new()?;
|
||||
let entries = runtime.block_on(work)?.into_iter().flatten();
|
||||
|
||||
write_historypack(self.pack_cache_path(), entries)
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch an individual file from the API server by Key.
|
||||
@ -92,6 +119,7 @@ fn get_file(
|
||||
url_prefix: &Url,
|
||||
key: Key,
|
||||
) -> impl Future<Item = (Key, Bytes), Error = Error> {
|
||||
log::debug!("Fetching file content for key: {:#?}", &key);
|
||||
let filenode = key.node().to_hex();
|
||||
url_prefix
|
||||
.join(&filenode)
|
||||
@ -121,7 +149,59 @@ fn get_file(
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates a new datapack in the given directory, and populates it with the file
|
||||
/// Fetch the history of an individual file from the API server by Key.
|
||||
fn get_history(
|
||||
client: &Arc<HyperClient>,
|
||||
url_prefix: &Url,
|
||||
key: Key,
|
||||
max_depth: Option<u32>,
|
||||
) -> impl Stream<Item = PackHistoryEntry, Error = Error> {
|
||||
log::debug!("Fetching history for key: {:#?}", &key);
|
||||
let filenode = key.node().to_hex();
|
||||
let filename = url_encode(&key.name());
|
||||
url_prefix
|
||||
.join(&format!("{}/", &filenode))
|
||||
.into_future()
|
||||
.and_then(move |url| url.join(&filename))
|
||||
.map(move |mut url| {
|
||||
if let Some(depth) = max_depth {
|
||||
url.query_pairs_mut()
|
||||
.append_pair("depth", &depth.to_string());
|
||||
}
|
||||
url
|
||||
})
|
||||
.from_err()
|
||||
.and_then({
|
||||
let client = Arc::clone(client);
|
||||
move |url| client.get(url.to_uri()).from_err()
|
||||
})
|
||||
.and_then(|res| {
|
||||
let status = res.status();
|
||||
res.into_body()
|
||||
.concat2()
|
||||
.from_err()
|
||||
.map(|body: Chunk| body.into_bytes())
|
||||
.and_then(move |body: Bytes| {
|
||||
// If we got an error, intepret the body as an error
|
||||
// message and fail the Future.
|
||||
ensure!(
|
||||
status.is_success(),
|
||||
"Request failed (status code: {:?}): {:?}",
|
||||
&status,
|
||||
String::from_utf8_lossy(&body).into_owned(),
|
||||
);
|
||||
Ok(body)
|
||||
})
|
||||
})
|
||||
.map(move |body: Bytes| {
|
||||
let entries = Deserializer::from_reader(body.into_buf().reader()).into_iter();
|
||||
stream::iter_result(entries).from_err()
|
||||
})
|
||||
.flatten_stream()
|
||||
.map(move |entry| PackHistoryEntry::from_loose(entry, key.name().to_vec()))
|
||||
}
|
||||
|
||||
/// Create a new datapack in the given directory, and populate it with the file
|
||||
/// contents provided by the given iterator. Each Delta written to the datapack is
|
||||
/// assumed to contain the full text of the corresponding file, and as a result the
|
||||
/// base revision for each file is always specified as None.
|
||||
@ -129,7 +209,7 @@ fn write_datapack(
|
||||
pack_dir: impl AsRef<Path>,
|
||||
files: impl IntoIterator<Item = (Key, Bytes)>,
|
||||
) -> Fallible<PathBuf> {
|
||||
let mut datapack = MutableDataPack::new(pack_dir.as_ref(), DataPackVersion::One)?;
|
||||
let mut datapack = MutableDataPack::new(pack_dir, DataPackVersion::One)?;
|
||||
for (key, data) in files {
|
||||
let metadata = Metadata {
|
||||
size: Some(data.len() as u64),
|
||||
@ -144,3 +224,20 @@ fn write_datapack(
|
||||
}
|
||||
datapack.close()
|
||||
}
|
||||
|
||||
/// Create a new historypack in the given directory, and populate it
|
||||
/// with the given history entries.
|
||||
fn write_historypack(
|
||||
pack_dir: impl AsRef<Path>,
|
||||
entries: impl IntoIterator<Item = PackHistoryEntry>,
|
||||
) -> Fallible<PathBuf> {
|
||||
let mut historypack = MutableHistoryPack::new(pack_dir, HistoryPackVersion::One)?;
|
||||
for entry in entries {
|
||||
historypack.add_entry(&entry)?;
|
||||
}
|
||||
historypack.close()
|
||||
}
|
||||
|
||||
fn url_encode(bytes: &[u8]) -> String {
|
||||
percent_encode(bytes, DEFAULT_ENCODE_SET).to_string()
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ pub(crate) type HyperClient = Client<HttpsConnector<HttpConnector>, Body>;
|
||||
/// An HTTP client for the Eden API.
|
||||
///
|
||||
/// # Example
|
||||
/// ```
|
||||
/// ```rust,ignore
|
||||
/// use failure::Fallible;
|
||||
/// use edenapi::{EdenApi, EdenApiHttpClient};
|
||||
///
|
||||
|
Loading…
Reference in New Issue
Block a user