diff --git a/eden/mononoke/mononoke_hg_sync_job/Cargo.toml b/eden/mononoke/mononoke_hg_sync_job/Cargo.toml
index 523fb26873..46786d2485 100644
--- a/eden/mononoke/mononoke_hg_sync_job/Cargo.toml
+++ b/eden/mononoke/mononoke_hg_sync_job/Cargo.toml
@@ -21,6 +21,7 @@ bookmarks = { path = "../bookmarks" }
 cmdlib = { path = "../cmdlib" }
 context = { path = "../server/context" }
 dbbookmarks = { path = "../bookmarks/dbbookmarks" }
+filestore = { path = "../filestore" }
 getbundle_response = { path = "../repo_client/getbundle_response" }
 hgserver_config = { path = "../../../configerator/structs/scm/mononoke/hgserverconf" }
 lfs_protocol = { path = "../lfs_protocol" }
diff --git a/eden/mononoke/mononoke_hg_sync_job/src/bundle_generator.rs b/eden/mononoke/mononoke_hg_sync_job/src/bundle_generator.rs
index c176572e5a..96c4054076 100644
--- a/eden/mononoke/mononoke_hg_sync_job/src/bundle_generator.rs
+++ b/eden/mononoke/mononoke_hg_sync_job/src/bundle_generator.rs
@@ -166,6 +166,7 @@ pub enum FilenodeVerifier {
 impl FilenodeVerifier {
     fn verify_entries(
         &self,
+        ctx: CoreContext,
         filenode_entries: &HashMap<HgChangesetId, Vec<PreparedFilenodeEntry>>,
     ) -> impl Future<Item = (), Error = Error> + 'static {
         match self {
@@ -182,7 +183,9 @@ impl FilenodeVerifier {
                 })
                 .collect();

-                lfs_verifier.verify_lfs_presence(&lfs_blobs).right_future()
+                lfs_verifier
+                    .ensure_lfs_presence(ctx, &lfs_blobs)
+                    .right_future()
             }
         }
     }
@@ -261,7 +264,8 @@ fn create_bundle_impl(
     };

     // Check that the filenodes pass the verifier prior to serializing them.
-    let verify_ok = filenode_verifier.verify_entries(&prepared_filenode_entries);
+    let verify_ok =
+        filenode_verifier.verify_entries(ctx.clone(), &prepared_filenode_entries);
     let filenode_entries =
         create_filenodes(ctx.clone(), repo.clone(), prepared_filenode_entries).boxify();

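Threading `ctx: CoreContext` through `verify_entries` is what lets the LFS arm log progress and read blobs back out of the blobstore. The `NoopVerifier` arm and the `lfs_blobs` collection sit outside the hunks above, so the following is an illustrative sketch of the full match rather than verbatim source:

```rust
match self {
    // Nothing to verify; complete immediately.
    FilenodeVerifier::NoopVerifier => future::ok(()).left_future(),
    // Ensure every (Sha256, size) pair is present upstream, uploading
    // whatever the batch endpoint reports as missing.
    FilenodeVerifier::LfsVerifier(lfs_verifier) => lfs_verifier
        .ensure_lfs_presence(ctx, &lfs_blobs)
        .right_future(),
}
```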
diff --git a/eden/mononoke/mononoke_hg_sync_job/src/lfs_verifier.rs b/eden/mononoke/mononoke_hg_sync_job/src/lfs_verifier.rs
index 1b0f9cf49b..3314904cbd 100644
--- a/eden/mononoke/mononoke_hg_sync_job/src/lfs_verifier.rs
+++ b/eden/mononoke/mononoke_hg_sync_job/src/lfs_verifier.rs
@@ -7,20 +7,26 @@
 use std::sync::Arc;

-use anyhow::{Context, Error};
+use anyhow::{anyhow, Context, Error};
+use blobstore::Blobstore;
 use bytes_old::Bytes as BytesOld;
+use cloned::cloned;
+use context::CoreContext;
 use failure_ext::FutureFailureExt;
-use futures_ext::{try_boxfuture, FutureExt};
-use futures_old::{Future, IntoFuture, Stream};
+use filestore::{fetch_stream, FetchKey};
+use futures::{compat::Future01CompatExt, FutureExt, TryFutureExt};
+use futures_ext::{try_boxfuture, FutureExt as OldFutureExt};
+use futures_old::{future, stream, Future, IntoFuture, Stream};
 use http::{status::StatusCode, uri::Uri};
-use hyper::Request;
 use hyper::{client::HttpConnector, Client};
+use hyper::{Body, Request};
 use hyper_openssl::HttpsConnector;
+use slog::{info, warn};
 use thiserror::Error;

 use lfs_protocol::{
-    ObjectStatus, Operation, RequestBatch, RequestObject, ResponseBatch, Sha256 as LfsSha256,
-    Transfer,
+    ObjectAction, ObjectStatus, Operation, RequestBatch, RequestObject, ResponseBatch,
+    ResponseObject, Sha256 as LfsSha256, Transfer,
 };
 use mononoke_types::hash::Sha256;
@@ -40,13 +46,14 @@ pub enum ErrorKind {
     BatchRequestFailed(StatusCode),
     #[error("Reading the response for a batch request failed")]
     BatchRequestReadFailed,
-    #[error("LFS objects are missing: {0:?}")]
-    LfsObjectsMissing(Vec<RequestObject>),
+    #[error("An error occurred receiving a response from upstream ({0}): {1}")]
+    UpstreamError(StatusCode, String),
 }

 struct LfsVerifierInner {
     client: HttpsHyperClient,
     batch_uri: Uri,
+    blobstore: Arc<dyn Blobstore>,
 }

 #[derive(Clone)]
@@ -55,22 +62,27 @@ pub struct LfsVerifier {
 }

 impl LfsVerifier {
-    pub fn new(batch_uri: Uri) -> Result<Self, Error> {
+    pub fn new(batch_uri: Uri, blobstore: Arc<dyn Blobstore>) -> Result<Self, Error> {
         let connector = HttpsConnector::new(4)?;
         let client = Client::builder().build(connector);

-        let inner = LfsVerifierInner { batch_uri, client };
+        let inner = LfsVerifierInner {
+            batch_uri,
+            client,
+            blobstore,
+        };

         Ok(Self {
             inner: Arc::new(inner),
         })
     }

-    pub fn verify_lfs_presence(
+    pub fn ensure_lfs_presence(
         &self,
+        ctx: CoreContext,
         blobs: &[(Sha256, u64)],
     ) -> impl Future<Item = (), Error = Error> {
-        let batch = build_download_request_batch(blobs);
+        let batch = build_upload_request_batch(blobs);
         let body: BytesOld =
             try_boxfuture!(serde_json::to_vec(&batch).context(ErrorKind::SerializationFailed))
                 .into();
@@ -82,6 +94,9 @@ impl LfsVerifier {
                 .context(ErrorKind::RequestCreationFailed)
         );

+        let blobstore = self.inner.blobstore.clone();
+
+        let client = self.inner.client.clone();
         self.inner
             .client
             .request(req)
@@ -106,20 +121,33 @@ impl LfsVerifier {
                     .context(ErrorKind::DeserializationFailed)
                     .map_err(Error::from)
             })
-            .and_then(|batch| {
+            .and_then(move |batch| {
                 let missing_objects = find_missing_objects(batch);

                 if missing_objects.is_empty() {
-                    return Ok(());
+                    return future::ok(()).boxify();
                 }

-                Err(ErrorKind::LfsObjectsMissing(missing_objects).into())
+                for object in &missing_objects {
+                    warn!(ctx.logger(), "missing {:?} object, uploading", object);
+                }
+
+                stream::iter_ok(missing_objects)
+                    .map(move |object| {
+                        cloned!(ctx, client, object, blobstore);
+                        async move { upload(ctx, client, object, blobstore).await }
+                            .boxed()
+                            .compat()
+                    })
+                    .buffer_unordered(100)
+                    .for_each(|_| Ok(()))
+                    .boxify()
             })
             .boxify()
     }
 }

-fn build_download_request_batch(blobs: &[(Sha256, u64)]) -> RequestBatch {
+fn build_upload_request_batch(blobs: &[(Sha256, u64)]) -> RequestBatch {
     let objects = blobs
         .iter()
         .map(|(oid, size)| RequestObject {
@@ -129,22 +157,76 @@ fn build_download_request_batch(blobs: &[(Sha256, u64)]) -> RequestBatch {
         .collect();

     RequestBatch {
-        operation: Operation::Download,
+        operation: Operation::Upload,
         r#ref: None,
         transfers: vec![Transfer::Basic],
         objects,
     }
 }

-fn find_missing_objects(batch: ResponseBatch) -> Vec<RequestObject> {
+fn find_missing_objects(batch: ResponseBatch) -> Vec<ResponseObject> {
     batch
         .objects
         .into_iter()
         .filter_map(|object| match object.status {
-            ObjectStatus::Ok { ref actions, .. } if actions.contains_key(&Operation::Download) => {
+            ObjectStatus::Ok { ref actions, .. } if !actions.contains_key(&Operation::Upload) => {
                 None
             }
-            _ => Some(object.object),
+            _ => Some(object),
         })
         .collect()
 }
+
+async fn upload(
+    ctx: CoreContext,
+    client: HttpsHyperClient,
+    resp_object: ResponseObject,
+    blobstore: Arc<dyn Blobstore>,
+) -> Result<(), Error> {
+    match resp_object.status {
+        ObjectStatus::Ok { actions, .. } => match actions.get(&Operation::Upload) {
+            Some(action) => {
+                let ObjectAction { href, .. } = action;
+
+                let s = fetch_stream(
+                    &blobstore,
+                    ctx.clone(),
+                    FetchKey::from(Sha256::from_byte_array(resp_object.object.oid.0)),
+                );
+
+                let body = Body::wrap_stream(s.map(|s| s.to_vec()));
+                let req = Request::put(format!("{}", href))
+                    .header("Content-Length", &resp_object.object.size.to_string())
+                    .body(body)?;
+
+                let res = client.request(req).compat().await?;
+                let (head, body) = res.into_parts();
+
+                if !head.status.is_success() {
+                    let body = body.concat2().compat().await?;
+                    return Err(ErrorKind::UpstreamError(
+                        head.status,
+                        String::from_utf8_lossy(&body).to_string(),
+                    )
+                    .into());
+                } else {
+                    info!(
+                        ctx.logger(),
+                        "uploaded content for {:?}", resp_object.object
+                    );
+                }
+
+                Ok(())
+            }
+            None => Err(anyhow!(
+                "no upload action found for {:?}",
+                resp_object.object
+            )),
+        },
+        ObjectStatus::Err { error } => Err(anyhow!(
+            "batch failed for {:?} {:?}",
+            resp_object.object,
+            error
+        )),
+    }
+}
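The rewrite leans on git-lfs batch API semantics: a batch request with `operation: "upload"` comes back with an `upload` action only for objects the server does not already have, so "no upload action" means "already present", which is exactly what `find_missing_objects` filters on. Uploads of distinct objects then run concurrently, capped at 100 in flight by `buffer_unordered`. An illustrative exchange (field subset only; the href is made up):

```rust
// POST ${batch_uri}
//   {"operation": "upload", "transfers": ["basic"],
//    "objects": [{"oid": "aac24e...", "size": 40}]}
//
// HTTP 200
//   {"transfer": "basic",
//    "objects": [{"oid": "aac24e...", "size": 40,
//                 "actions": {"upload": {"href": "https://lfs.example/upload/aac24e..."}}}]}
//
// upload() then PUTs the blob to that href, streaming it out of the local
// blobstore via fetch_stream/Body::wrap_stream instead of buffering it.
```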
diff --git a/eden/mononoke/mononoke_hg_sync_job/src/main.rs b/eden/mononoke/mononoke_hg_sync_job/src/main.rs
index 9b91221d9b..df70c070b3 100644
--- a/eden/mononoke/mononoke_hg_sync_job/src/main.rs
+++ b/eden/mononoke/mononoke_hg_sync_job/src/main.rs
@@ -679,14 +679,9 @@ fn run(ctx: CoreContext, matches: ArgMatches<'static>) -> BoxFuture<(), Error> {

     let lfs_params = repo_config.lfs.clone();

-    let filenode_verifier = match matches.value_of("verify-lfs-blob-presence") {
-        Some(uri) => {
-            let uri = try_boxfuture!(uri.parse::<Uri>());
-            let verifier = try_boxfuture!(LfsVerifier::new(uri));
-            FilenodeVerifier::LfsVerifier(verifier)
-        }
-        None => FilenodeVerifier::NoopVerifier,
-    };
+    let verify_lfs_blob_presence = matches
+        .value_of("verify-lfs-blob-presence")
+        .map(|s| s.to_string());

     let hgsql_use_sqlite = matches.is_present(HGSQL_GLOBALREVS_USE_SQLITE);
     let hgsql_db_addr = matches
@@ -699,6 +694,16 @@ fn run(ctx: CoreContext, matches: ArgMatches<'static>) -> BoxFuture<(), Error> {
     let maybe_skiplist_blobstore_key = repo_config.skiplist_index_blobstore_key.clone();
     let hgsql_globalrevs_name = repo_config.hgsql_globalrevs_name.clone();
     move |repo| {
+        let filenode_verifier = match verify_lfs_blob_presence {
+            Some(uri) => {
+                let uri = try_boxfuture!(uri.parse::<Uri>());
+                let verifier =
+                    try_boxfuture!(LfsVerifier::new(uri, Arc::new(repo.get_blobstore())));
+                FilenodeVerifier::LfsVerifier(verifier)
+            }
+            None => FilenodeVerifier::NoopVerifier,
+        };
+
         let overlay = list_hg_server_bookmarks(hg_repo_path.clone())
             .and_then({
                 cloned!(ctx, repo);
@@ -759,7 +764,10 @@ fn run(ctx: CoreContext, matches: ArgMatches<'static>) -> BoxFuture<(), Error> {
             .boxed()
             .compat();

-        preparer.map(Arc::new).join3(overlay, globalrev_syncer)
+        preparer
+            .map(Arc::new)
+            .join3(overlay, globalrev_syncer)
+            .boxify()
     }
 });
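One subtlety above: `try_boxfuture!` early-returns a boxed erroring future, so once the verifier construction moves inside the `move |repo|` closure (it now needs the opened repo's blobstore), every exit of that closure must produce the same `BoxFuture` type; that is why the `join3` result now needs the trailing `.boxify()`. A behavioral sketch of the macro, assumed from its use here (the real definition lives in `futures_ext`):

```rust
// Behavioral sketch only -- not the futures_ext definition.
macro_rules! try_boxfuture {
    ($e:expr) => {
        match $e {
            Ok(value) => value,
            // Early-return a *boxed* err future; this fixes the enclosing
            // closure's return type to BoxFuture, forcing .boxify() on the
            // success path too.
            Err(err) => return ::futures_old::future::err(err.into()).boxify(),
        }
    };
}
```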
diff --git a/eden/mononoke/tests/integration/test-mononoke-hg-sync-job-generate-bundles-lfs-verification.t b/eden/mononoke/tests/integration/test-mononoke-hg-sync-job-generate-bundles-lfs-verification.t
index d1c61d2100..165f38517b 100644
--- a/eden/mononoke/tests/integration/test-mononoke-hg-sync-job-generate-bundles-lfs-verification.t
+++ b/eden/mononoke/tests/integration/test-mononoke-hg-sync-job-generate-bundles-lfs-verification.t
@@ -85,18 +85,14 @@ Make client repo

   $ cd "$TESTTMP"

-Two missing blobs: it fails
-  $ mononoke_hg_sync repo-hg 1 --generate-bundles --verify-lfs-blob-presence "${lfs_uri_other}/objects/batch" 2>&1 | grep 'objects are missing'
-  * LFS objects are missing: * (glob)
+Two missing blobs: they get uploaded
+  $ mononoke_hg_sync repo-hg 1 --generate-bundles --verify-lfs-blob-presence "${lfs_uri_other}/objects/batch" 2>&1 | grep missing
+  * missing * object, uploading (glob)
+  * missing * object, uploading (glob)

-One missing blob: it still fails
-  $ hg debuglfssend "$lfs_uri_other" < client-push/long
-  c12949887b7d8c46e9fcc5d9cd4bd884de33c1d00e24d7ac56ed9200e07f31a1 40
-  $ mononoke_hg_sync repo-hg 1 --generate-bundles --verify-lfs-blob-presence "${lfs_uri_other}/objects/batch" 2>&1 | grep 'objects are missing'
-  * LFS objects are missing: [RequestObject { oid: Sha256(aac24ec70120b177274d359073212777a40780e2874b120a0f210096e55cfa5f), size: 40 }] (glob)
+Check that they were uploaded
+  $ hg debuglfsreceive c12949887b7d8c46e9fcc5d9cd4bd884de33c1d00e24d7ac56ed9200e07f31a1 0 "${lfs_uri_other}" > "$TESTTMP/long"
+  $ cmp "$TESTTMP/long" client-push/long

-Zero missing blobs: it succeeds
-  $ hg debuglfssend "$lfs_uri_other" < client-push/long2
-  aac24ec70120b177274d359073212777a40780e2874b120a0f210096e55cfa5f 40
-  $ mononoke_hg_sync repo-hg 1 --generate-bundles --verify-lfs-blob-presence "${lfs_uri_other}/objects/batch" 2>&1 | grep 'successful sync'
-  * successful sync of entries [2] (glob)
+  $ hg debuglfsreceive aac24ec70120b177274d359073212777a40780e2874b120a0f210096e55cfa5f 0 "${lfs_uri_other}" > "$TESTTMP/long2"
+  $ cmp "$TESTTMP/long2" client-push/long2