lfs: add client support for receiving compressed responses

Summary: As the title says, this adds support for receiving compressed
responses in the revisionstore LFS client. This is controlled by a config
flag, which I'll roll out through dynamicconfig. The hope is that this should
greatly improve our throughput to corp, where our bandwidth is fairly scarce.

Reviewed By: StanislavGlebik

Differential Revision: D23652306

fbshipit-source-id: 53bf86d194657564bc3bd532e1a62208d39666df
commit d7081f6aba
parent acc0d428cc
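
The flag in question surfaces as lfs.accept-zstd in the lfs config section
(read with a default of true in the revisionstore change below). As a rough
sketch of how a client would opt out, assuming standard hgrc syntax:

    [lfs]
    # Do not advertise zstd support to the LFS server (default: true).
    accept-zstd = false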
@@ -0,0 +1,83 @@
+# Copyright (c) Facebook, Inc. and its affiliates.
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License found in the LICENSE file in the root
+# directory of this source tree.
+
+  $ . "${TEST_FIXTURES}/library.sh"
+
+Setup a Mononoke repo.
+
+  $ LFS_THRESHOLD="10" setup_common_config "blob_files"
+  $ cd "$TESTTMP"
+
+Start Mononoke & LFS.
+
+  $ mononoke
+  $ wait_for_mononoke
+  $ lfs_url="$(lfs_server --scuba-log-file "$TESTTMP/scuba.json")/repo"
+
+Create a repo. Add a large file. Make it actually large to make sure we surface
+any block size boundaries or such.
+
+  $ hgmn_init repo
+  $ cd repo
+  $ yes 2>/dev/null | head -c 2MiB > large
+  $ hg add large
+  $ hg ci -ma
+  $ hgmn push -q --to master --create
+  $ cd "$TESTTMP"
+
+Clone the repo. Take a unique cache path to go to the server, and turn off compression.
+
+  $ cd "$TESTTMP"
+  $ hgmn_clone ssh://user@dummy/repo repo2 --noupdate --config extensions.remotenames=
+  $ cd repo2
+  $ setup_hg_modern_lfs "$lfs_url" 10B
+  $ setconfig "remotefilelog.cachepath=$TESTTMP/cachepath2"
+  $ setconfig "lfs.accept-zstd=False"
+
+Update. Check for compression. It shouldn't be used.
+
+  $ hgmn up master -q
+  $ sha256sum large
+  76903e148255cbd5ba91d3f47fe04759afcffdf64104977fc83f688892ac0dfd  large
+
+  $ wait_for_json_record_count "$TESTTMP/scuba.json" 2
+  $ jq .int.response_content_length < "$TESTTMP/scuba.json"
+  280
+  2097152
+  $ jq .int.response_bytes_sent < "$TESTTMP/scuba.json"
+  280
+  2097152
+  $ jq .normal.response_content_encoding < "$TESTTMP/scuba.json"
+  null
+  null
+  $ truncate -s 0 "$TESTTMP/scuba.json"
+
+Clone again. This time, enable compression.
+
+  $ cd "$TESTTMP"
+  $ hgmn_clone ssh://user@dummy/repo repo3 --noupdate --config extensions.remotenames=
+  $ cd repo3
+  $ setup_hg_modern_lfs "$lfs_url" 10B
+  $ setconfig "remotefilelog.cachepath=$TESTTMP/cachepath3"
+  $ setconfig "lfs.accept-zstd=True"
+
+Update again. This time, we should have compression.
+
+  $ hgmn up master -q
+  $ sha256sum large
+  76903e148255cbd5ba91d3f47fe04759afcffdf64104977fc83f688892ac0dfd  large
+
+  $ wait_for_json_record_count "$TESTTMP/scuba.json" 2
+  $ jq .int.response_content_length < "$TESTTMP/scuba.json"
+  280
+  null
+  $ jq .int.response_bytes_sent < "$TESTTMP/scuba.json"
+  280
+  202
+  $ jq .normal.response_content_encoding < "$TESTTMP/scuba.json"
+  null
+  "zstd"
+  $ truncate -s 0 "$TESTTMP/scuba.json"
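
A note on the numbers above: each run logs two scuba records, the LFS batch
response (280 bytes of JSON, never compressed here) and the blob download.
With compression off, the 2 MiB file shows up as 2097152 for both
response_content_length and response_bytes_sent. With zstd on, the download
reports no content length (null, presumably because the compressed body is
streamed) and only 202 bytes go over the wire, since the test blob is just
"y\n" repeated and compresses almost completely. A standalone sketch of that
ratio using the zstd crate the patch adds; the printed figures are
illustrative:

    // Illustrative only: reproduce the test's compression ratio.
    use std::io::Cursor;

    fn main() -> std::io::Result<()> {
        // The test's "large" file: yes 2>/dev/null | head -c 2MiB, i.e. "y\n" repeated.
        let blob = b"y\n".repeat(1024 * 1024); // 2 MiB
        let compressed = zstd::stream::encode_all(Cursor::new(&blob), 0)?; // 0 = default level
        let roundtrip = zstd::stream::decode_all(Cursor::new(&compressed))?;
        assert_eq!(roundtrip, blob);
        // Expect output on the order of "2097152 -> a few hundred bytes".
        println!("{} -> {} bytes", blob.len(), compressed.len());
        Ok(())
    }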
@@ -48,6 +48,7 @@ tracing = "0.1"
 types = { path = "../types" }
 url = "2.1.0"
 util = { path = "../util" }
+zstd = { version = "0.5" }
 
 [dev-dependencies]
 maplit = "1.0"
@@ -10,7 +10,7 @@ use std::{
     collections::{HashMap, HashSet},
     convert::TryInto,
     fs::File,
-    io::{ErrorKind, Read, Write},
+    io::{Cursor, ErrorKind, Read, Write},
     iter, mem,
     ops::Range,
     path::{Path, PathBuf},
@@ -97,6 +97,7 @@ struct HttpLfsRemote {
     backoff_times: Vec<f32>,
     request_timeout: Duration,
     client: HttpClient,
+    accept_zstd: bool,
 }
 
 enum LfsRemoteInner {
@@ -906,6 +907,7 @@ impl LfsRemoteInner {
         backoff_times: Vec<f32>,
         request_timeout: Duration,
         add_extra: impl Fn(Request) -> Request,
+        accept_zstd: bool,
     ) -> Result<Option<Bytes>> {
         let mut backoff = backoff_times.into_iter();
         let mut rng = thread_rng();
@@ -916,6 +918,10 @@
                 .header("Content-Type", "application/vnd.git-lfs+json")
                 .header("User-Agent", &user_agent);
 
+            if accept_zstd {
+                req = req.header("Accept-Encoding", "zstd");
+            }
+
             if let Some(auth) = &auth {
                 if let (Some(cert), Some(key)) = (&auth.cert, &auth.key) {
                     req = req.creds(cert, key)?;
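
This is plain HTTP content negotiation: with the flag on, every request the
client sends (batch and download alike) advertises zstd, and the server is
free to reply compressed or not. An illustrative exchange for a blob download;
the URL shape is assumed, not taken from the patch:

    > GET /repo/download/<oid> HTTP/1.1
    > Accept-Encoding: zstd

    < HTTP/1.1 200 OK
    < Content-Encoding: zstd
    <
    < ...zstd-compressed LFS blob...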
@@ -942,6 +948,8 @@
             };
 
             let status = reply.status;
+            let headers = reply.headers;
+
             if status.is_success() {
                 let mut body = reply.body;
                 let mut chunks: Vec<Vec<u8>> = vec![];
@@ -953,7 +961,23 @@
                 for chunk in chunks.into_iter() {
                     result.extend_from_slice(&chunk);
                 }
-                return Ok(Some(result.freeze()));
+                let result = result.freeze();
+
+                let content_encoding = headers.get("Content-Encoding");
+
+                let result = match content_encoding
+                    .map(|c| std::str::from_utf8(c.as_bytes()))
+                    .transpose()
+                    .with_context(|| format!("Invalid Content-Encoding: {:?}", content_encoding))?
+                {
+                    Some("identity") | None => result,
+                    Some("zstd") => Bytes::from(zstd::stream::decode_all(Cursor::new(&result))?),
+                    Some(other) => {
+                        return Err(format_err!("Unsupported Content-Encoding: {}", other));
+                    }
+                };
+
+                return Ok(Some(result));
             }
 
             if status.is_server_error() {
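
The dispatch above could live in a small free function for unit testing. A
minimal sketch under that assumption; decode_body is a hypothetical name, not
part of this patch:

    use std::io::Cursor;

    use anyhow::{format_err, Result};
    use bytes::Bytes;

    /// Hypothetical helper mirroring the match above: pass the body through
    /// untouched unless the server declared zstd content encoding.
    fn decode_body(content_encoding: Option<&str>, body: Bytes) -> Result<Bytes> {
        match content_encoding {
            None | Some("identity") => Ok(body),
            Some("zstd") => Ok(Bytes::from(zstd::stream::decode_all(Cursor::new(&body))?)),
            Some(other) => Err(format_err!("Unsupported Content-Encoding: {}", other)),
        }
    }

Note that zstd::stream::decode_all buffers the fully decoded body in memory,
which matches the surrounding code: the chunks are already collected into a
single buffer before decoding.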
@@ -1012,6 +1036,7 @@
                 http.backoff_times.clone(),
                 http.request_timeout,
                 move |builder| builder.body(batch_json.clone()),
+                http.accept_zstd,
             )
             .await
         };
@@ -1036,6 +1061,7 @@
         oid: Sha256,
         read_from_store: impl Fn(Sha256) -> Result<Option<Bytes>> + Send + 'static,
         write_to_store: impl Fn(Sha256, Bytes) -> Result<()> + Send + 'static,
+        accept_zstd: bool,
     ) -> Result<()> {
         let body = if op == Operation::Upload {
             spawn_blocking(move || read_from_store(oid)).await??
@@ -1070,6 +1096,7 @@
                     builder.header("Content-Length", 0)
                 }
             },
+            accept_zstd,
         )
         .await?;
 
@@ -1132,6 +1159,7 @@
                 oid,
                 read_from_store,
                 write_to_store,
+                http.accept_zstd,
             )
         };
 
@@ -1226,6 +1254,8 @@ impl LfsRemote {
         let request_timeout =
             Duration::from_millis(config.get_or("lfs", "requesttimeout", || 10_000)?);
 
+        let accept_zstd = config.get_or("lfs", "accept-zstd", || true)?;
+
         let client = http_client("lfs");
 
         Ok(Self {
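
Since the default is on, the dynamicconfig rollout mentioned in the summary
only needs to flip the flag off where compression is not wanted. Assuming
stock hg config handling (not part of this diff), a one-off override would be:

    $ hg up master --config lfs.accept-zstd=false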
@@ -1240,6 +1270,7 @@ impl LfsRemote {
                 backoff_times,
                 request_timeout,
                 client,
+                accept_zstd,
             }),
         })
     }