mononoke: upload missing lfs objects
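
Summary: Previously the hg sync job's --verify-lfs-blob-presence check only
confirmed that LFS blobs were present upstream and failed the sync when any
were missing. This diff renames verify_lfs_presence to ensure_lfs_presence and
makes it repair the situation instead: the batch request is now sent as an
Upload operation, and every object upstream reports as missing is streamed out
of the repo blobstore and PUT to the href returned in the batch response, with
up to 100 uploads in flight at a time.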

Reviewed By: krallin

Differential Revision: D24595980

fbshipit-source-id: 397930e00a75b0343ed13839783501fe3d535ccd
Stanislau Hlebik 2020-10-28 14:07:52 -07:00 committed by Facebook GitHub Bot
parent 0cfb846cb0
commit d40f15876d
5 changed files with 135 additions and 44 deletions

View File

@@ -21,6 +21,7 @@ bookmarks = { path = "../bookmarks" }
 cmdlib = { path = "../cmdlib" }
 context = { path = "../server/context" }
 dbbookmarks = { path = "../bookmarks/dbbookmarks" }
+filestore = { path = "../filestore" }
 getbundle_response = { path = "../repo_client/getbundle_response" }
 hgserver_config = { path = "../../../configerator/structs/scm/mononoke/hgserverconf" }
 lfs_protocol = { path = "../lfs_protocol" }

View File

@@ -166,6 +166,7 @@ pub enum FilenodeVerifier {
 impl FilenodeVerifier {
     fn verify_entries(
         &self,
+        ctx: CoreContext,
         filenode_entries: &HashMap<MPath, Vec<PreparedFilenodeEntry>>,
     ) -> impl Future<Item = (), Error = Error> + 'static {
         match self {
@@ -182,7 +183,9 @@ impl FilenodeVerifier {
                     })
                     .collect();

-                lfs_verifier.verify_lfs_presence(&lfs_blobs).right_future()
+                lfs_verifier
+                    .ensure_lfs_presence(ctx, &lfs_blobs)
+                    .right_future()
             }
         }
     }
@@ -261,7 +264,8 @@ fn create_bundle_impl(
     };

     // Check that the filenodes pass the verifier prior to serializing them.
-    let verify_ok = filenode_verifier.verify_entries(&prepared_filenode_entries);
+    let verify_ok =
+        filenode_verifier.verify_entries(ctx.clone(), &prepared_filenode_entries);

     let filenode_entries =
         create_filenodes(ctx.clone(), repo.clone(), prepared_filenode_entries).boxify();
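
Note that verify_entries now threads the CoreContext through to the verifier:
ensure_lfs_presence (in the next file) needs it to log missing objects and to
read their contents out of the blobstore.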

View File

@@ -7,20 +7,26 @@

 use std::sync::Arc;

-use anyhow::{Context, Error};
+use anyhow::{anyhow, Context, Error};
+use blobstore::Blobstore;
 use bytes_old::Bytes as BytesOld;
+use cloned::cloned;
 use context::CoreContext;
 use failure_ext::FutureFailureExt;
-use futures_ext::{try_boxfuture, FutureExt};
-use futures_old::{Future, IntoFuture, Stream};
+use filestore::{fetch_stream, FetchKey};
+use futures::{compat::Future01CompatExt, FutureExt, TryFutureExt};
+use futures_ext::{try_boxfuture, FutureExt as OldFutureExt};
+use futures_old::{future, stream, Future, IntoFuture, Stream};
 use http::{status::StatusCode, uri::Uri};
-use hyper::Request;
 use hyper::{client::HttpConnector, Client};
+use hyper::{Body, Request};
 use hyper_openssl::HttpsConnector;
+use slog::{info, warn};
 use thiserror::Error;

 use lfs_protocol::{
-    ObjectStatus, Operation, RequestBatch, RequestObject, ResponseBatch, Sha256 as LfsSha256,
-    Transfer,
+    ObjectAction, ObjectStatus, Operation, RequestBatch, RequestObject, ResponseBatch,
+    ResponseObject, Sha256 as LfsSha256, Transfer,
 };
 use mononoke_types::hash::Sha256;
@@ -40,13 +46,14 @@ pub enum ErrorKind {
     BatchRequestFailed(StatusCode),
     #[error("Reading the response for a batch request failed")]
     BatchRequestReadFailed,
-    #[error("LFS objects are missing: {0:?}")]
-    LfsObjectsMissing(Vec<RequestObject>),
+    #[error("An error occurred receiving a response from upstream ({0}): {1}")]
+    UpstreamError(StatusCode, String),
 }

 struct LfsVerifierInner {
     client: HttpsHyperClient,
     batch_uri: Uri,
+    blobstore: Arc<dyn Blobstore>,
 }

 #[derive(Clone)]
@@ -55,22 +62,27 @@ pub struct LfsVerifier {
 }

 impl LfsVerifier {
-    pub fn new(batch_uri: Uri) -> Result<Self, Error> {
+    pub fn new(batch_uri: Uri, blobstore: Arc<dyn Blobstore>) -> Result<Self, Error> {
         let connector = HttpsConnector::new(4)?;
         let client = Client::builder().build(connector);

-        let inner = LfsVerifierInner { batch_uri, client };
+        let inner = LfsVerifierInner {
+            batch_uri,
+            client,
+            blobstore,
+        };

         Ok(Self {
             inner: Arc::new(inner),
         })
     }

-    pub fn verify_lfs_presence(
+    pub fn ensure_lfs_presence(
         &self,
+        ctx: CoreContext,
         blobs: &[(Sha256, u64)],
     ) -> impl Future<Item = (), Error = Error> {
-        let batch = build_download_request_batch(blobs);
+        let batch = build_upload_request_batch(blobs);
         let body: BytesOld =
             try_boxfuture!(serde_json::to_vec(&batch).context(ErrorKind::SerializationFailed))
                 .into();
@@ -82,6 +94,9 @@ impl LfsVerifier {
                 .context(ErrorKind::RequestCreationFailed)
         );

+        let blobstore = self.inner.blobstore.clone();
+        let client = self.inner.client.clone();
+
         self.inner
             .client
             .request(req)
@@ -106,20 +121,33 @@ impl LfsVerifier {
                    .context(ErrorKind::DeserializationFailed)
                    .map_err(Error::from)
            })
-            .and_then(|batch| {
+            .and_then(move |batch| {
                let missing_objects = find_missing_objects(batch);

                if missing_objects.is_empty() {
-                    return Ok(());
+                    return future::ok(()).boxify();
                }

-                Err(ErrorKind::LfsObjectsMissing(missing_objects).into())
+                for object in &missing_objects {
+                    warn!(ctx.logger(), "missing {:?} object, uploading", object);
+                }
+
+                stream::iter_ok(missing_objects)
+                    .map(move |object| {
+                        cloned!(ctx, client, object, blobstore);
+                        async move { upload(ctx, client, object, blobstore).await }
+                            .boxed()
+                            .compat()
+                    })
+                    .buffer_unordered(100)
+                    .for_each(|_| Ok(()))
+                    .boxify()
            })
            .boxify()
    }
 }

-fn build_download_request_batch(blobs: &[(Sha256, u64)]) -> RequestBatch {
+fn build_upload_request_batch(blobs: &[(Sha256, u64)]) -> RequestBatch {
     let objects = blobs
         .iter()
         .map(|(oid, size)| RequestObject {
@@ -129,22 +157,76 @@ fn build_download_request_batch(blobs: &[(Sha256, u64)]) -> RequestBatch {
         .collect();

     RequestBatch {
-        operation: Operation::Download,
+        operation: Operation::Upload,
         r#ref: None,
         transfers: vec![Transfer::Basic],
         objects,
     }
 }

-fn find_missing_objects(batch: ResponseBatch) -> Vec<RequestObject> {
+fn find_missing_objects(batch: ResponseBatch) -> Vec<ResponseObject> {
     batch
         .objects
         .into_iter()
         .filter_map(|object| match object.status {
-            ObjectStatus::Ok { ref actions, .. } if actions.contains_key(&Operation::Download) => {
+            ObjectStatus::Ok { ref actions, .. } if !actions.contains_key(&Operation::Upload) => {
                 None
             }
-            _ => Some(object.object),
+            _ => Some(object),
         })
         .collect()
 }
+
+async fn upload(
+    ctx: CoreContext,
+    client: HttpsHyperClient,
+    resp_object: ResponseObject,
+    blobstore: Arc<dyn Blobstore>,
+) -> Result<(), Error> {
+    match resp_object.status {
+        ObjectStatus::Ok { actions, .. } => match actions.get(&Operation::Upload) {
+            Some(action) => {
+                let ObjectAction { href, .. } = action;
+
+                let s = fetch_stream(
+                    &blobstore,
+                    ctx.clone(),
+                    FetchKey::from(Sha256::from_byte_array(resp_object.object.oid.0)),
+                );
+
+                let body = Body::wrap_stream(s.map(|s| s.to_vec()));
+                let req = Request::put(format!("{}", href))
+                    .header("Content-Length", &resp_object.object.size.to_string())
+                    .body(body)?;
+
+                let res = client.request(req).compat().await?;
+                let (head, body) = res.into_parts();
+
+                if !head.status.is_success() {
+                    let body = body.concat2().compat().await?;
+                    return Err(ErrorKind::UpstreamError(
+                        head.status,
+                        String::from_utf8_lossy(&body).to_string(),
+                    )
+                    .into());
+                } else {
+                    info!(
+                        ctx.logger(),
+                        "uploaded content for {:?}", resp_object.object
+                    );
+                }
+
+                Ok(())
+            }
+            None => Err(anyhow!(
+                "no upload action found for {:?}",
+                resp_object.object
+            )),
+        },
+        ObjectStatus::Err { error } => Err(anyhow!(
+            "batch failed for {:?} {:?}",
+            resp_object.object,
+            error
+        )),
+    }
+}
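
A minimal sketch of how a caller might drive the renamed API, assuming an open
repo, a CoreContext ctx, and a known (sha256, size) pair; the URI and bindings
below are illustrative, not taken from this diff:

    // Hypothetical wiring; mirrors what the sync job now does in main.rs.
    let uri = "https://lfs.example.com/objects/batch".parse::<Uri>()?;
    let verifier = LfsVerifier::new(uri, Arc::new(repo.get_blobstore()))?;

    // Sends an Upload batch request; each object upstream reports as missing
    // is streamed out of the blobstore and PUT to the returned action href.
    verifier
        .ensure_lfs_presence(ctx, &[(sha256, size)])
        .compat() // futures 0.1 -> std::future, via Future01CompatExt
        .await?;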

View File

@@ -679,14 +679,9 @@ fn run(ctx: CoreContext, matches: ArgMatches<'static>) -> BoxFuture<(), Error> {
     let lfs_params = repo_config.lfs.clone();

-    let filenode_verifier = match matches.value_of("verify-lfs-blob-presence") {
-        Some(uri) => {
-            let uri = try_boxfuture!(uri.parse::<Uri>());
-            let verifier = try_boxfuture!(LfsVerifier::new(uri));
-            FilenodeVerifier::LfsVerifier(verifier)
-        }
-        None => FilenodeVerifier::NoopVerifier,
-    };
+    let verify_lfs_blob_presence = matches
+        .value_of("verify-lfs-blob-presence")
+        .map(|s| s.to_string());

     let hgsql_use_sqlite = matches.is_present(HGSQL_GLOBALREVS_USE_SQLITE);
     let hgsql_db_addr = matches
@@ -699,6 +694,16 @@ fn run(ctx: CoreContext, matches: ArgMatches<'static>) -> BoxFuture<(), Error> {
     let maybe_skiplist_blobstore_key = repo_config.skiplist_index_blobstore_key.clone();
     let hgsql_globalrevs_name = repo_config.hgsql_globalrevs_name.clone();
     move |repo| {
+        let filenode_verifier = match verify_lfs_blob_presence {
+            Some(uri) => {
+                let uri = try_boxfuture!(uri.parse::<Uri>());
+                let verifier =
+                    try_boxfuture!(LfsVerifier::new(uri, Arc::new(repo.get_blobstore())));
+                FilenodeVerifier::LfsVerifier(verifier)
+            }
+            None => FilenodeVerifier::NoopVerifier,
+        };
+
         let overlay = list_hg_server_bookmarks(hg_repo_path.clone())
             .and_then({
                 cloned!(ctx, repo);
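
Constructing the FilenodeVerifier moves inside the move |repo| closure because
LfsVerifier::new now needs the repo's blobstore as its upload source, and the
blobstore only becomes available once the repo has been opened.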
@@ -759,7 +764,10 @@ fn run(ctx: CoreContext, matches: ArgMatches<'static>) -> BoxFuture<(), Error> {
             .boxed()
             .compat();

-        preparer.map(Arc::new).join3(overlay, globalrev_syncer)
+        preparer
+            .map(Arc::new)
+            .join3(overlay, globalrev_syncer)
+            .boxify()
     }
 });

View File

@@ -85,18 +85,14 @@ Make client repo
   $ cd "$TESTTMP"

-Two missing blobs: it fails
-  $ mononoke_hg_sync repo-hg 1 --generate-bundles --verify-lfs-blob-presence "${lfs_uri_other}/objects/batch" 2>&1 | grep 'objects are missing'
-  * LFS objects are missing: * (glob)
+Two missing blobs that were uploaded
+  $ mononoke_hg_sync repo-hg 1 --generate-bundles --verify-lfs-blob-presence "${lfs_uri_other}/objects/batch" 2>&1 | grep missing
+  * missing * object, uploading (glob)
+  * missing * object, uploading (glob)

-One missing blob: it still fails
-  $ hg debuglfssend "$lfs_uri_other" < client-push/long
-  c12949887b7d8c46e9fcc5d9cd4bd884de33c1d00e24d7ac56ed9200e07f31a1 40
-  $ mononoke_hg_sync repo-hg 1 --generate-bundles --verify-lfs-blob-presence "${lfs_uri_other}/objects/batch" 2>&1 | grep 'objects are missing'
-  * LFS objects are missing: [RequestObject { oid: Sha256(aac24ec70120b177274d359073212777a40780e2874b120a0f210096e55cfa5f), size: 40 }] (glob)
+Check that they were uploaded
+  $ hg debuglfsreceive c12949887b7d8c46e9fcc5d9cd4bd884de33c1d00e24d7ac56ed9200e07f31a1 0 "${lfs_uri_other}" > "$TESTTMP/long"
+  $ cmp "$TESTTMP/long" client-push/long

-Zero missing blobs: it succeeds
-  $ hg debuglfssend "$lfs_uri_other" < client-push/long2
-  aac24ec70120b177274d359073212777a40780e2874b120a0f210096e55cfa5f 40
-  $ mononoke_hg_sync repo-hg 1 --generate-bundles --verify-lfs-blob-presence "${lfs_uri_other}/objects/batch" 2>&1 | grep 'successful sync'
-  * successful sync of entries [2] (glob)
+  $ hg debuglfsreceive aac24ec70120b177274d359073212777a40780e2874b120a0f210096e55cfa5f 0 "${lfs_uri_other}" > "$TESTTMP/long2"
+  $ cmp "$TESTTMP/long2" client-push/long2