mirror of
https://github.com/facebook/sapling.git
synced 2024-10-06 23:07:18 +03:00
make SCSC SM aware
Summary: I'm changing scsc to use shardmanager to talk to SCS. It will use `shardmanager:mononoke.scs` tier. For xrepo commands, we require to implicitly have all repos everywhere for nor as SM doesn't support shard colocation and the method requries repos to be colocated on a single host. Reviewed By: RajivTS Differential Revision: D38781575 fbshipit-source-id: 0dde8e3d0d96cd59c3eadf34e57b1ece8fdd9df5
This commit is contained in:
parent
6b0134c35f
commit
4c2a52afbf
@ -375,8 +375,9 @@ impl Render for BlameOut {
|
||||
|
||||
pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let repo = args.repo_args.clone().into_repo_specifier();
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let id = resolve_commit_id(&app.connection, &repo, &commit_id).await?;
|
||||
let id = resolve_commit_id(&conn, &repo, &commit_id).await?;
|
||||
|
||||
let mut commit = thrift::CommitSpecifier {
|
||||
repo,
|
||||
@ -395,7 +396,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
identity_schemes: btreeset! { thrift::CommitIdentityScheme::BONSAI },
|
||||
..Default::default()
|
||||
};
|
||||
let response = app.connection.commit_info(&commit, ¶ms).await?;
|
||||
let response = conn.commit_info(&commit, ¶ms).await?;
|
||||
commit.id.clone_from(
|
||||
response
|
||||
.parents
|
||||
@ -434,10 +435,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
follow_mutable_file_history,
|
||||
..Default::default()
|
||||
};
|
||||
let response = app
|
||||
.connection
|
||||
.commit_path_blame(&commit_and_path, ¶ms)
|
||||
.await?;
|
||||
let response = conn.commit_path_blame(&commit_and_path, ¶ms).await?;
|
||||
app.target
|
||||
.render_one(
|
||||
&args,
|
||||
|
@ -72,8 +72,9 @@ impl Render for CatOutput {
|
||||
|
||||
pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let repo = args.repo_args.clone().into_repo_specifier();
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let id = resolve_commit_id(&app.connection, &repo, &commit_id).await?;
|
||||
let id = resolve_commit_id(&conn, &repo, &commit_id).await?;
|
||||
let commit = thrift::CommitSpecifier {
|
||||
repo,
|
||||
id,
|
||||
@ -92,7 +93,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
size: CHUNK_SIZE,
|
||||
..Default::default()
|
||||
};
|
||||
let response = app.connection.file_content_chunk(&file, ¶ms).await?;
|
||||
let response = conn.file_content_chunk(&file, ¶ms).await?;
|
||||
let output = CatOutput {
|
||||
offset: response.offset as u64,
|
||||
data: response.data,
|
||||
@ -107,8 +108,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
size: CHUNK_SIZE,
|
||||
..Default::default()
|
||||
};
|
||||
app.connection
|
||||
.file_content_chunk(&file, ¶ms)
|
||||
conn.file_content_chunk(&file, ¶ms)
|
||||
.map_err(anyhow::Error::from)
|
||||
})
|
||||
.buffered(CONCURRENT_FETCHES)
|
||||
|
@ -69,7 +69,8 @@ impl Render for CommonBaseOutput {
|
||||
pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let repo = args.repo_args.clone().into_repo_specifier();
|
||||
let commit_ids = args.commit_ids_args.clone().into_commit_ids();
|
||||
let ids = resolve_commit_ids(&app.connection, &repo, &commit_ids).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let ids = resolve_commit_ids(&conn, &repo, &commit_ids).await?;
|
||||
let ids = match ids.as_slice() {
|
||||
[id0, id1] => (id0.clone(), id1.clone()),
|
||||
_ => bail!("expected 2 commit_ids (got {})", commit_ids.len()),
|
||||
@ -84,10 +85,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
identity_schemes: args.scheme_args.clone().into_request_schemes(),
|
||||
..Default::default()
|
||||
};
|
||||
let response = app
|
||||
.connection
|
||||
.commit_common_base_with(&commit, ¶ms)
|
||||
.await?;
|
||||
let response = conn.commit_common_base_with(&commit, ¶ms).await?;
|
||||
let ids = match &response.ids {
|
||||
Some(ids) => map_commit_ids(ids.values()),
|
||||
None => BTreeMap::new(),
|
||||
|
@ -34,7 +34,8 @@ pub(super) struct CommandArgs {
|
||||
pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let repo = args.repo_args.clone().into_repo_specifier();
|
||||
let commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let id = resolve_commit_id(&app.connection, &repo, &commit_id).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let id = resolve_commit_id(&conn, &repo, &commit_id).await?;
|
||||
let bookmark = args.name.clone();
|
||||
let service_identity = args.service_id_args.service_id.clone();
|
||||
let pushvars = args.pushvar_args.into_pushvars();
|
||||
@ -46,6 +47,6 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
pushvars,
|
||||
..Default::default()
|
||||
};
|
||||
app.connection.repo_create_bookmark(&repo, ¶ms).await?;
|
||||
conn.repo_create_bookmark(&repo, ¶ms).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -37,8 +37,9 @@ pub(super) struct CommandArgs {
|
||||
pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let repo = args.repo_args.into_repo_specifier();
|
||||
let commit_id = args.commit_id_args.into_commit_id();
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let old_target = match commit_id {
|
||||
Some(commit_id) => Some(resolve_commit_id(&app.connection, &repo, &commit_id).await?),
|
||||
Some(commit_id) => Some(resolve_commit_id(&conn, &repo, &commit_id).await?),
|
||||
None => None,
|
||||
};
|
||||
let bookmark = args.name;
|
||||
@ -52,6 +53,6 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
pushvars,
|
||||
..Default::default()
|
||||
};
|
||||
app.connection.repo_delete_bookmark(&repo, ¶ms).await?;
|
||||
conn.repo_delete_bookmark(&repo, ¶ms).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -110,7 +110,8 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
bail!("expected 1 or 2 commit_ids (got {})", commit_ids.len())
|
||||
}
|
||||
let paths = args.path.clone();
|
||||
let commit_ids = resolve_commit_ids(&app.connection, &repo, &commit_ids).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let commit_ids = resolve_commit_ids(&conn, &repo, &commit_ids).await?;
|
||||
let mut identity_schemes = BTreeSet::new();
|
||||
identity_schemes.insert(thrift::CommitIdentityScheme::BONSAI);
|
||||
|
||||
@ -152,7 +153,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
ordered_params,
|
||||
..Default::default()
|
||||
};
|
||||
let response = app.connection.commit_compare(&base_commit, ¶ms).await?;
|
||||
let response = conn.commit_compare(&base_commit, ¶ms).await?;
|
||||
|
||||
if args.paths_only {
|
||||
return app
|
||||
@ -201,7 +202,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
.render(
|
||||
&(),
|
||||
diff_files(
|
||||
&app.connection,
|
||||
&conn,
|
||||
base_commit.clone(),
|
||||
other_commit_id,
|
||||
paths_sizes,
|
||||
|
@ -553,7 +553,8 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
|
||||
let repo = args.repo_args.clone().into_repo_specifier();
|
||||
let commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let id = resolve_commit_id(&app.connection, &repo, &commit_id).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let id = resolve_commit_id(&conn, &repo, &commit_id).await?;
|
||||
let commit = thrift::CommitSpecifier {
|
||||
repo: repo.clone(),
|
||||
id,
|
||||
@ -570,17 +571,11 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
..Default::default()
|
||||
};
|
||||
let response = {
|
||||
let mut response = app
|
||||
.connection
|
||||
.commit_path_info(&commit_path, ¶ms)
|
||||
.await?;
|
||||
let mut response = conn.commit_path_info(&commit_path, ¶ms).await?;
|
||||
if !response.exists && casefold == Casefold::Insensitive {
|
||||
if let Some(case_path) = case_insensitive_path(&app.connection, &commit, &path).await? {
|
||||
if let Some(case_path) = case_insensitive_path(&conn, &commit, &path).await? {
|
||||
commit_path.path = case_path;
|
||||
response = app
|
||||
.connection
|
||||
.commit_path_info(&commit_path, ¶ms)
|
||||
.await?;
|
||||
response = conn.commit_path_info(&commit_path, ¶ms).await?;
|
||||
}
|
||||
}
|
||||
response
|
||||
@ -632,7 +627,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let bytes_written = bytes_written.clone();
|
||||
move |item| {
|
||||
export_item(
|
||||
app.connection.clone(),
|
||||
conn.clone(),
|
||||
repo.clone(),
|
||||
item,
|
||||
casefold,
|
||||
|
@ -64,7 +64,8 @@ impl Render for FileListOutput {
|
||||
pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let repo = args.repo_args.clone().into_repo_specifier();
|
||||
let commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let id = resolve_commit_id(&app.connection, &repo, &commit_id).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let id = resolve_commit_id(&conn, &repo, &commit_id).await?;
|
||||
let prefixes = args.prefix.clone();
|
||||
let basenames = args.filename.clone();
|
||||
let basename_suffixes = args.suffix.clone();
|
||||
@ -84,10 +85,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
prefixes,
|
||||
..Default::default()
|
||||
};
|
||||
let response = app
|
||||
.connection
|
||||
.commit_find_files(&commit_specifier, ¶ms)
|
||||
.await?;
|
||||
let response = conn.commit_find_files(&commit_specifier, ¶ms).await?;
|
||||
app.target
|
||||
.render_one(&args, FileListOutput(response.files))
|
||||
.await
|
||||
|
@ -161,7 +161,8 @@ impl Render for FileInfoOutput {
|
||||
|
||||
async fn commit_info(app: ScscApp, args: CommandArgs, repo: thrift::RepoSpecifier) -> Result<()> {
|
||||
let commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let id = resolve_commit_id(&app.connection, &repo, &commit_id).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let id = resolve_commit_id(&conn, &repo, &commit_id).await?;
|
||||
let commit = thrift::CommitSpecifier {
|
||||
repo,
|
||||
id,
|
||||
@ -171,7 +172,7 @@ async fn commit_info(app: ScscApp, args: CommandArgs, repo: thrift::RepoSpecifie
|
||||
identity_schemes: args.scheme_args.clone().into_request_schemes(),
|
||||
..Default::default()
|
||||
};
|
||||
let response = app.connection.commit_info(&commit, ¶ms).await?;
|
||||
let response = conn.commit_info(&commit, ¶ms).await?;
|
||||
|
||||
let commit_info = CommitInfo::try_from(&response)?;
|
||||
let output = CommitInfoOutput {
|
||||
@ -183,6 +184,7 @@ async fn commit_info(app: ScscApp, args: CommandArgs, repo: thrift::RepoSpecifie
|
||||
}
|
||||
|
||||
async fn bookmark_info(app: ScscApp, args: CommandArgs, repo: thrift::RepoSpecifier) -> Result<()> {
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let bookmark_name = args
|
||||
.commit_id_args
|
||||
.clone()
|
||||
@ -193,7 +195,7 @@ async fn bookmark_info(app: ScscApp, args: CommandArgs, repo: thrift::RepoSpecif
|
||||
identity_schemes: args.scheme_args.clone().into_request_schemes(),
|
||||
..Default::default()
|
||||
};
|
||||
let response = app.connection.repo_bookmark_info(&repo, ¶ms).await?;
|
||||
let response = conn.repo_bookmark_info(&repo, ¶ms).await?;
|
||||
let info = response
|
||||
.info
|
||||
.ok_or_else(|| anyhow!("Bookmark doesn't exit"))?;
|
||||
@ -214,7 +216,8 @@ async fn path_info(
|
||||
path: String,
|
||||
) -> Result<()> {
|
||||
let commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let id = resolve_commit_id(&app.connection, &repo, &commit_id).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let id = resolve_commit_id(&conn, &repo, &commit_id).await?;
|
||||
let commit = thrift::CommitSpecifier {
|
||||
repo,
|
||||
id,
|
||||
@ -228,10 +231,7 @@ async fn path_info(
|
||||
let params = thrift::CommitPathInfoParams {
|
||||
..Default::default()
|
||||
};
|
||||
let response = app
|
||||
.connection
|
||||
.commit_path_info(&commit_path, ¶ms)
|
||||
.await?;
|
||||
let response = conn.commit_path_info(&commit_path, ¶ms).await?;
|
||||
if response.exists {
|
||||
match (response.r#type, response.info) {
|
||||
(Some(entry_type), Some(thrift::EntryInfo::tree(info))) => {
|
||||
@ -260,7 +260,8 @@ async fn multiple_path_info(
|
||||
paths: Vec<String>,
|
||||
) -> Result<()> {
|
||||
let commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let id = resolve_commit_id(&app.connection, &repo, &commit_id).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let id = resolve_commit_id(&conn, &repo, &commit_id).await?;
|
||||
let commit = thrift::CommitSpecifier {
|
||||
repo,
|
||||
id,
|
||||
@ -270,10 +271,7 @@ async fn multiple_path_info(
|
||||
paths,
|
||||
..Default::default()
|
||||
};
|
||||
let response = app
|
||||
.connection
|
||||
.commit_multiple_path_info(&commit, ¶ms)
|
||||
.await?;
|
||||
let response = conn.commit_multiple_path_info(&commit, ¶ms).await?;
|
||||
|
||||
let output = stream::iter(response.path_info).map(move |(path, commit_info)| {
|
||||
match (commit_info.r#type, commit_info.info) {
|
||||
|
@ -53,7 +53,8 @@ impl Render for IsAncestorOutput {
|
||||
pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let repo = args.repo_args.clone().into_repo_specifier();
|
||||
let commit_ids = args.commit_ids_args.clone().into_commit_ids();
|
||||
let ids = resolve_commit_ids(&app.connection, &repo, &commit_ids).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let ids = resolve_commit_ids(&conn, &repo, &commit_ids).await?;
|
||||
if ids.len() != 2 {
|
||||
bail!("expected 2 commit_ids (got {})", commit_ids.len())
|
||||
}
|
||||
@ -66,10 +67,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
descendant_commit_id: ids[1].clone(),
|
||||
..Default::default()
|
||||
};
|
||||
let response = app
|
||||
.connection
|
||||
.commit_is_ancestor_of(&commit, ¶ms)
|
||||
.await?;
|
||||
let response = conn.commit_is_ancestor_of(&commit, ¶ms).await?;
|
||||
let output = IsAncestorOutput { result: response };
|
||||
app.target.render_one(&args, output).await
|
||||
}
|
||||
|
@ -104,7 +104,8 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
if commit_ids.len() != 2 {
|
||||
bail!("expected 2 commit_ids (got {})", commit_ids.len())
|
||||
}
|
||||
let ids = resolve_commit_ids(&app.connection, &repo, &commit_ids).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let ids = resolve_commit_ids(&conn, &repo, &commit_ids).await?;
|
||||
let bookmark = args.name;
|
||||
let service_identity = args.service_id_args.service_id;
|
||||
let pushvars = args.pushvar_args.into_pushvars();
|
||||
@ -124,8 +125,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
pushvars,
|
||||
..Default::default()
|
||||
};
|
||||
let outcome = app
|
||||
.connection
|
||||
let outcome = conn
|
||||
.repo_land_stack(&repo, ¶ms)
|
||||
.await?
|
||||
.pushrebase_outcome;
|
||||
|
@ -197,16 +197,17 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let prefix = args.prefix.clone();
|
||||
let include_scratch = args.include_scratch;
|
||||
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let bookmarks = match args.commit_ids_args.clone().into_commit_ids().as_slice() {
|
||||
[ref commit_id] => {
|
||||
let id = resolve_commit_id(&app.connection, &repo, commit_id).await?;
|
||||
let id = resolve_commit_id(&conn, &repo, commit_id).await?;
|
||||
let commit = thrift::CommitSpecifier {
|
||||
repo,
|
||||
id,
|
||||
..Default::default()
|
||||
};
|
||||
commit_list_descendant_bookmarks(
|
||||
app.connection.clone(),
|
||||
conn.clone(),
|
||||
commit,
|
||||
Some(limit),
|
||||
after.map(String::from),
|
||||
@ -217,7 +218,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
.left_stream()
|
||||
}
|
||||
[] => repo_list_bookmarks(
|
||||
app.connection.clone(),
|
||||
conn.clone(),
|
||||
repo,
|
||||
Some(limit),
|
||||
after.map(String::from),
|
||||
|
@ -119,7 +119,8 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
if commit_ids.len() > 2 || commit_ids.is_empty() {
|
||||
anyhow::bail!("expected 1 or 2 commit_ids (got {})", commit_ids.len())
|
||||
}
|
||||
let ids = resolve_commit_ids(&app.connection, &repo, &commit_ids).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let ids = resolve_commit_ids(&conn, &repo, &commit_ids).await?;
|
||||
let id = ids[0].clone();
|
||||
let descendants_of = ids.get(1).cloned();
|
||||
let commit = thrift::CommitSpecifier {
|
||||
@ -158,8 +159,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
follow_mutable_file_history,
|
||||
..Default::default()
|
||||
};
|
||||
app.connection
|
||||
.commit_path_history(&commit_and_path, ¶ms)
|
||||
conn.commit_path_history(&commit_and_path, ¶ms)
|
||||
.await?
|
||||
.history
|
||||
}
|
||||
@ -175,10 +175,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
exclude_changeset_and_ancestors: None,
|
||||
..Default::default()
|
||||
};
|
||||
app.connection
|
||||
.commit_history(&commit, ¶ms)
|
||||
.await?
|
||||
.history
|
||||
conn.commit_history(&commit, ¶ms).await?.history
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -65,7 +65,8 @@ impl Render for LookupOutput {
|
||||
pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let repo = args.repo_args.clone().into_repo_specifier();
|
||||
let commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let id = resolve_commit_id(&app.connection, &repo, &commit_id).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let id = resolve_commit_id(&conn, &repo, &commit_id).await?;
|
||||
let commit = thrift::CommitSpecifier {
|
||||
repo,
|
||||
id,
|
||||
@ -75,7 +76,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
identity_schemes: args.scheme_args.clone().into_request_schemes(),
|
||||
..Default::default()
|
||||
};
|
||||
let response = app.connection.commit_lookup(&commit, ¶ms).await?;
|
||||
let response = conn.commit_lookup(&commit, ¶ms).await?;
|
||||
let ids = match &response.ids {
|
||||
Some(ids) => map_commit_ids(ids.values()),
|
||||
None => BTreeMap::new(),
|
||||
|
@ -242,7 +242,8 @@ fn list_output(
|
||||
pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let repo = args.repo_args.clone().into_repo_specifier();
|
||||
let commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let id = resolve_commit_id(&app.connection, &repo, &commit_id).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let id = resolve_commit_id(&conn, &repo, &commit_id).await?;
|
||||
let commit = thrift::CommitSpecifier {
|
||||
repo: repo.clone(),
|
||||
id,
|
||||
@ -261,13 +262,13 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
limit: CHUNK_SIZE,
|
||||
..Default::default()
|
||||
};
|
||||
let response = app.connection.tree_list(&tree, ¶ms).await?;
|
||||
let response = conn.tree_list(&tree, ¶ms).await?;
|
||||
let count = response.count;
|
||||
let long = args.long;
|
||||
let output = list_output(app.connection.clone(), repo.clone(), response, long).chain(
|
||||
let output = list_output(conn.clone(), repo.clone(), response, long).chain(
|
||||
stream::iter((CHUNK_SIZE..count).step_by(CHUNK_SIZE as usize))
|
||||
.map({
|
||||
let connection = app.connection.clone();
|
||||
let connection = conn.clone();
|
||||
move |offset| {
|
||||
// Request subsequent chunks of the directory listing.
|
||||
let params = thrift::TreeListParams {
|
||||
@ -281,7 +282,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
.buffered(CONCURRENT_FETCHES)
|
||||
.then(move |response| {
|
||||
let repo = repo.clone();
|
||||
let connection = app.connection.clone();
|
||||
let connection = conn.clone();
|
||||
async move {
|
||||
response.map(move |response| {
|
||||
list_output(connection.clone(), repo.clone(), response, long)
|
||||
|
@ -45,7 +45,8 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
if commit_ids.len() != 1 && commit_ids.len() != 2 {
|
||||
bail!("expected 1 or 2 commit_ids (got {})", commit_ids.len())
|
||||
}
|
||||
let ids = resolve_commit_ids(&app.connection, &repo, &commit_ids).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let ids = resolve_commit_ids(&conn, &repo, &commit_ids).await?;
|
||||
let bookmark = args.name;
|
||||
let service_identity = args.service_id_args.service_id;
|
||||
|
||||
@ -66,6 +67,6 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
pushvars,
|
||||
..Default::default()
|
||||
};
|
||||
app.connection.repo_move_bookmark(&repo, ¶ms).await?;
|
||||
conn.repo_move_bookmark(&repo, ¶ms).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -27,16 +27,13 @@ pub(super) struct CommandArgs {
|
||||
|
||||
pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let repo = args.repo.clone().into_repo_specifier();
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let params = thrift::RepoPrepareCommitsParams {
|
||||
commits: resolve_commit_ids(
|
||||
&app.connection,
|
||||
&repo,
|
||||
&args.commit_ids.clone().into_commit_ids(),
|
||||
)
|
||||
.await?,
|
||||
commits: resolve_commit_ids(&conn, &repo, &args.commit_ids.clone().into_commit_ids())
|
||||
.await?,
|
||||
derived_data_type: args.derived_data_type.clone().into_derived_data_type(),
|
||||
..Default::default()
|
||||
};
|
||||
app.connection.repo_prepare_commits(&repo, ¶ms).await?;
|
||||
conn.repo_prepare_commits(&repo, ¶ms).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -96,14 +96,14 @@ impl Render for PushrebaseLookupOutput {
|
||||
pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let repo = args.repo_args.clone().into_repo_specifier();
|
||||
let commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let id = resolve_commit_id(&app.connection, &repo, &commit_id).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let id = resolve_commit_id(&conn, &repo, &commit_id).await?;
|
||||
let commit = thrift::CommitSpecifier {
|
||||
repo,
|
||||
id,
|
||||
..Default::default()
|
||||
};
|
||||
let pushrebase_history = app
|
||||
.connection
|
||||
let pushrebase_history = conn
|
||||
.commit_lookup_pushrebase_history(
|
||||
&commit,
|
||||
&thrift::CommitLookupPushrebaseHistoryParams {
|
||||
@ -116,7 +116,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
..Default::default()
|
||||
};
|
||||
let commit_lookups: Vec<_> = stream::iter(pushrebase_history.history.clone())
|
||||
.map(|commit| app.connection.commit_lookup(&commit, &lookup_params))
|
||||
.map(|commit| conn.commit_lookup(&commit, &lookup_params))
|
||||
.buffered(10)
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
@ -45,7 +45,8 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let params = thrift::ListReposParams {
|
||||
..Default::default()
|
||||
};
|
||||
let repos = app.connection.list_repos(¶ms).await?;
|
||||
let conn = app.get_connection(None)?;
|
||||
let repos = conn.list_repos(¶ms).await?;
|
||||
app.target
|
||||
.render_one(
|
||||
&args,
|
||||
|
@ -79,7 +79,8 @@ impl Render for RunHooksOutput {
|
||||
pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let repo = args.repo_args.clone().into_repo_specifier();
|
||||
let original_commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let commit_id = resolve_commit_id(&app.connection, &repo, &original_commit_id).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let commit_id = resolve_commit_id(&conn, &repo, &original_commit_id).await?;
|
||||
let commit_specifier = thrift::CommitSpecifier {
|
||||
id: commit_id,
|
||||
repo,
|
||||
@ -93,10 +94,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
pushvars,
|
||||
..Default::default()
|
||||
};
|
||||
let response = app
|
||||
.connection
|
||||
.commit_run_hooks(&commit_specifier, ¶ms)
|
||||
.await?;
|
||||
let response = conn.commit_run_hooks(&commit_specifier, ¶ms).await?;
|
||||
let outcomes = response
|
||||
.outcomes
|
||||
.into_iter()
|
||||
|
@ -89,7 +89,8 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
bail!("expected 2 commit_ids (got {})", commit_ids.len())
|
||||
}
|
||||
|
||||
let commit_ids = resolve_commit_ids(&app.connection, &repo, &commit_ids).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let commit_ids = resolve_commit_ids(&conn, &repo, &commit_ids).await?;
|
||||
|
||||
let profiles = args.sparse_profiles_args.clone().into_sparse_profiles();
|
||||
|
||||
@ -105,10 +106,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let response = app
|
||||
.connection
|
||||
.commit_sparse_profile_delta(&commit, ¶ms)
|
||||
.await?;
|
||||
let response = conn.commit_sparse_profile_delta(&commit, ¶ms).await?;
|
||||
|
||||
let output = SparseProfileDeltaOutput {
|
||||
changed_sparse_profiles: response.changed_sparse_profiles,
|
||||
|
@ -61,7 +61,8 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let repo = args.repo_args.clone().into_repo_specifier();
|
||||
|
||||
let commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let commit_id = resolve_commit_id(&app.connection, &repo, &commit_id).await?;
|
||||
let conn = app.get_connection(Some(&repo.name))?;
|
||||
let commit_id = resolve_commit_id(&conn, &repo, &commit_id).await?;
|
||||
|
||||
let profiles = args.sparse_profiles_args.clone().into_sparse_profiles();
|
||||
|
||||
@ -76,10 +77,7 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let response = app
|
||||
.connection
|
||||
.commit_sparse_profile_size(&commit, ¶ms)
|
||||
.await?;
|
||||
let response = conn.commit_sparse_profile_size(&commit, ¶ms).await?;
|
||||
|
||||
let output = SparseProfileSizeOutput {
|
||||
profiles_size: response.profiles_size,
|
||||
|
@ -118,8 +118,10 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
let target_repo = get_repo_specifier(args.target_repo.clone());
|
||||
|
||||
let commit_id = args.commit_id_args.clone().into_commit_id();
|
||||
let id = resolve_commit_id(&app.connection, &source_repo, &commit_id).await?;
|
||||
let hint = build_hint(&args, &app.connection, &target_repo).await?;
|
||||
let src_repo_conn = app.get_connection(Some(&source_repo.name))?;
|
||||
let id = resolve_commit_id(&src_repo_conn, &source_repo, &commit_id).await?;
|
||||
let target_repo_conn = app.get_connection(Some(&target_repo.name))?;
|
||||
let hint = build_hint(&args, &target_repo_conn, &target_repo).await?;
|
||||
|
||||
let commit = thrift::CommitSpecifier {
|
||||
repo: source_repo,
|
||||
@ -132,7 +134,10 @@ pub(super) async fn run(app: ScscApp, args: CommandArgs) -> Result<()> {
|
||||
candidate_selection_hint: hint,
|
||||
..Default::default()
|
||||
};
|
||||
let response = app.connection.commit_lookup_xrepo(&commit, ¶ms).await?;
|
||||
// XXX Repos for xrepo methods need to be available on all servers,
|
||||
// no matter if they're sharded or not, because SM doesn't support
|
||||
// shard colocation.
|
||||
let response = src_repo_conn.commit_lookup_xrepo(&commit, ¶ms).await?;
|
||||
let ids = match &response.ids {
|
||||
Some(ids) => map_commit_ids(ids.values()),
|
||||
None => BTreeMap::new(),
|
||||
|
@ -13,11 +13,12 @@ use std::sync::Arc;
|
||||
use anyhow::anyhow;
|
||||
use anyhow::Error;
|
||||
use fbinit::FacebookInit;
|
||||
use sharding_lib_ext::encode_repo_name;
|
||||
use source_control::client::make_SourceControlService;
|
||||
use source_control::client::SourceControlService;
|
||||
use x2pclient::X2pClientBuilder;
|
||||
|
||||
const DEFAULT_TIER: &str = "mononoke-scs-server";
|
||||
const DEFAULT_TIER: &str = "shardmanager:mononoke.scs";
|
||||
|
||||
const CONN_TIMEOUT_MS: u32 = 1000;
|
||||
const RECV_TIMEOUT_MS: u32 = 30_000;
|
||||
@ -57,10 +58,12 @@ impl Connection {
|
||||
fb: FacebookInit,
|
||||
client_id: String,
|
||||
tier: impl AsRef<str>,
|
||||
shardmanager_domain: Option<&str>,
|
||||
) -> Result<Self, Error> {
|
||||
use maplit::hashmap;
|
||||
use rand::distributions::Alphanumeric;
|
||||
use rand::Rng;
|
||||
use srclient::ClientParams;
|
||||
use srclient::SRChannelBuilder;
|
||||
|
||||
let correlator: String = rand::thread_rng()
|
||||
@ -75,10 +78,19 @@ impl Connection {
|
||||
let conn_config = hashmap! {
|
||||
String::from("client_id") => client_id,
|
||||
};
|
||||
|
||||
let client = SRChannelBuilder::from_service_name(fb, tier.as_ref())?
|
||||
.with_conn_config(&conn_config)
|
||||
.with_persistent_headers(headers)
|
||||
.build_client(make_SourceControlService)?;
|
||||
.with_persistent_headers(headers);
|
||||
|
||||
let client = if let Some(shardmanager_domain) = shardmanager_domain {
|
||||
let client_params = ClientParams::new()
|
||||
.with_shard_manager_domain(encode_repo_name(shardmanager_domain));
|
||||
client.with_client_params(client_params)
|
||||
} else {
|
||||
client
|
||||
}
|
||||
.build_client(make_SourceControlService)?;
|
||||
Ok(Self { client })
|
||||
}
|
||||
|
||||
@ -88,6 +100,7 @@ impl Connection {
|
||||
_fb: FacebookInit,
|
||||
_client_id: String,
|
||||
_tier: impl AsRef<str>,
|
||||
_shardmanager_domain: Option<&str>,
|
||||
) -> Result<Self, Error> {
|
||||
Err(anyhow!(
|
||||
"Connection via ServiceRouter is not supported on this platform"
|
||||
@ -99,9 +112,16 @@ impl Connection {
|
||||
fb: FacebookInit,
|
||||
_client_id: String,
|
||||
tier: impl AsRef<str>,
|
||||
shardmanager_domain: Option<&str>,
|
||||
) -> Result<Self, Error> {
|
||||
let client = X2pClientBuilder::from_service_name(fb, tier.as_ref())
|
||||
.build_client(make_SourceControlService)?;
|
||||
let client = X2pClientBuilder::from_service_name(fb, tier.as_ref());
|
||||
let client = if let Some(sm_domain) = shardmanager_domain {
|
||||
client.with_shard_manager_domain(encode_repo_name(sm_domain))
|
||||
} else {
|
||||
client
|
||||
}
|
||||
.build_client(make_SourceControlService)?;
|
||||
|
||||
Ok(Self { client })
|
||||
}
|
||||
|
||||
@ -110,10 +130,15 @@ impl Connection {
|
||||
fb: FacebookInit,
|
||||
client_id: String,
|
||||
tier: impl AsRef<str>,
|
||||
shardmanager_domain: Option<&str>,
|
||||
) -> Result<Self, Error> {
|
||||
match x2pclient::get_env(fb) {
|
||||
x2pclient::Environment::Prod => Self::from_tier_name_via_sr(fb, client_id, tier),
|
||||
x2pclient::Environment::Corp => Self::from_tier_name_via_x2p(fb, client_id, tier),
|
||||
x2pclient::Environment::Prod => {
|
||||
Self::from_tier_name_via_sr(fb, client_id, tier, shardmanager_domain)
|
||||
}
|
||||
x2pclient::Environment::Corp => {
|
||||
Self::from_tier_name_via_x2p(fb, client_id, tier, shardmanager_domain)
|
||||
}
|
||||
other_env => Err(anyhow!("{} not supported", other_env)),
|
||||
}
|
||||
}
|
||||
@ -133,11 +158,15 @@ pub(super) struct ConnectionArgs {
|
||||
}
|
||||
|
||||
impl ConnectionArgs {
|
||||
pub fn get_connection(&self, fb: FacebookInit) -> Result<Connection, Error> {
|
||||
pub fn get_connection(
|
||||
&self,
|
||||
fb: FacebookInit,
|
||||
repo: Option<&str>,
|
||||
) -> Result<Connection, Error> {
|
||||
if let Some(host_port) = &self.host {
|
||||
Connection::from_host_port(fb, host_port)
|
||||
} else {
|
||||
Connection::from_tier_name(fb, self.client_id.clone(), &self.tier)
|
||||
Connection::from_tier_name(fb, self.client_id.clone(), &self.tier, repo)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -61,8 +61,15 @@ lazy_static::lazy_static! {
|
||||
|
||||
pub(crate) struct ScscApp {
|
||||
matches: ArgMatches,
|
||||
connection: Connection,
|
||||
connection_args: ConnectionArgs,
|
||||
target: OutputTarget,
|
||||
fb: FacebookInit,
|
||||
}
|
||||
|
||||
impl ScscApp {
|
||||
fn get_connection(&self, repo: Option<&str>) -> anyhow::Result<Connection> {
|
||||
self.connection_args.get_connection(self.fb, repo)
|
||||
}
|
||||
}
|
||||
|
||||
impl BaseApp for ScscApp {
|
||||
@ -97,7 +104,7 @@ async fn main_impl(fb: FacebookInit) -> anyhow::Result<()> {
|
||||
.arg_required_else_help(true);
|
||||
let matches = app.get_matches();
|
||||
let common_args = ScscArgs::from_arg_matches(&matches)?;
|
||||
let connection = common_args.connection_args.get_connection(fb)?;
|
||||
let connection_args = common_args.connection_args;
|
||||
let target = if common_args.json {
|
||||
OutputTarget::Json
|
||||
} else if atty::is(atty::Stream::Stdout) {
|
||||
@ -107,8 +114,9 @@ async fn main_impl(fb: FacebookInit) -> anyhow::Result<()> {
|
||||
};
|
||||
let app = ScscApp {
|
||||
matches,
|
||||
connection,
|
||||
connection_args,
|
||||
target,
|
||||
fb,
|
||||
};
|
||||
commands::dispatch(app).await
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user