Syntactic sugar to log memory stats for a method

Reviewed By: mitrandir77

Differential Revision: D56886668

fbshipit-source-id: 60306e9845c604b3584f665827f4d1e3fed16b70
This commit is contained in:
Andrea Campi 2024-05-07 03:25:14 -07:00 committed by Facebook GitHub Bot
parent b36ba1179b
commit d876996c51
2 changed files with 126 additions and 92 deletions

View File

@ -6,6 +6,7 @@
*/
use std::collections::hash_map::Entry;
use std::future::Future;
use std::io::Error as IoError;
use std::num::NonZeroU64;
use std::path::Path;
@ -230,6 +231,34 @@ impl MononokeScubaSampleBuilder {
self.log_with_msg(log_tag, msg)
}
pub async fn maybe_log_memory_stats<T, E>(
&self,
log_memory_usage: bool,
fut: impl Future<Output = Result<T, E>>,
) -> Result<T, E> {
if log_memory_usage {
let stats = memory::get_stats();
if stats.is_ok() {
let mut scuba = self.clone();
scuba.add_memory_stats(&stats.unwrap());
scuba.log_with_msg("Memory usage before call", None);
}
}
let ret = fut.await;
if log_memory_usage {
let stats = memory::get_stats();
if stats.is_ok() {
let mut scuba = self.clone();
scuba.add_memory_stats(&stats.unwrap());
scuba.log_with_msg("Memory usage after call", None);
}
}
ret
}
pub fn add_stream_stats(&mut self, stats: &StreamStats) -> &mut Self {
self.inner
.add("poll_count", stats.poll_count)

View File

@ -744,108 +744,113 @@ impl SourceControlServiceImpl {
commit: thrift::CommitSpecifier,
params: thrift::CommitFindFilesParams,
) -> Result<thrift::CommitFindFilesResponse, errors::ServiceError> {
let rss_min_free_bytes =
justknobs::get_as::<usize>("scm/mononoke:scs_rss_min_free_bytes", None).unwrap_or(0);
let rss_min_free_pct =
justknobs::get_as::<i32>("scm/mononoke:scs_rss_min_free_pct", None).unwrap_or(0);
ctx.clone()
.scuba()
.maybe_log_memory_stats(should_log_memory_usage(), async move {
let rss_min_free_bytes =
justknobs::get_as::<usize>("scm/mononoke:scs_rss_min_free_bytes", None).unwrap_or(0);
let rss_min_free_pct =
justknobs::get_as::<i32>("scm/mononoke:scs_rss_min_free_pct", None).unwrap_or(0);
debug!(
ctx.logger(),
"commit_find_files: {} {} {}",
rss_min_free_bytes,
rss_min_free_pct,
should_log_memory_usage(),
);
debug!(
ctx.logger(),
"commit_find_files: {} {} {}",
rss_min_free_bytes,
rss_min_free_pct,
should_log_memory_usage(),
);
if rss_min_free_bytes > 0 || rss_min_free_pct > 0 || should_log_memory_usage() {
match memory::get_stats() {
Ok(stats) => {
debug!(ctx.logger(), "commit_find_files: loaded stats {:?}", stats);
if should_log_memory_usage() {
let mut scuba = ctx.clone().scuba().clone();
scuba.add_memory_stats(&stats);
scuba.log_with_msg("Memory usage before call", None);
}
if stats.rss_free_bytes < rss_min_free_bytes {
debug!(
ctx.logger(),
"not enough memory free, need at least {} bytes free, only {} free right now",
rss_min_free_bytes,
stats.rss_free_bytes,
);
if rss_min_free_bytes > 0 || rss_min_free_pct > 0 || should_log_memory_usage() {
match memory::get_stats() {
Ok(stats) => {
debug!(ctx.logger(), "commit_find_files: loaded stats {:?}", stats);
if should_log_memory_usage() {
let mut scuba = ctx.clone().scuba().clone();
scuba.add_memory_stats(&stats);
scuba.log_with_msg("Memory usage before call", None);
}
if stats.rss_free_bytes < rss_min_free_bytes {
debug!(
ctx.logger(),
"not enough memory free, need at least {} bytes free, only {} free right now",
rss_min_free_bytes,
stats.rss_free_bytes,
);
return Err(errors::overloaded("Not enough memory free".to_string()).into());
}
if stats.rss_free_pct < rss_min_free_pct as f32 {
debug!(
ctx.logger(),
"not enough memory free, need at least {}% free, only {}% free right now",
rss_min_free_pct,
stats.rss_free_pct,
);
return Err(errors::overloaded("Not enough memory free".to_string()).into());
}
if stats.rss_free_pct < rss_min_free_pct as f32 {
debug!(
ctx.logger(),
"not enough memory free, need at least {}% free, only {}% free right now",
rss_min_free_pct,
stats.rss_free_pct,
);
return Err(errors::overloaded("Not enough memory free".to_string()).into());
return Err(errors::overloaded("Not enough memory free".to_string()).into());
}
}
Err(_) => {}
}
}
Err(_) => {}
}
}
let (_repo, changeset) = self.repo_changeset(ctx.clone(), &commit).await?;
let limit: usize = check_range_and_convert(
"limit",
params.limit,
0..=source_control::COMMIT_FIND_FILES_MAX_LIMIT,
)?;
let prefixes: Option<Vec<_>> = match params.prefixes {
Some(prefixes) => Some(
prefixes
.into_iter()
.map(|prefix| {
MPath::try_from(&prefix).map_err(|e| {
errors::invalid_request(format!("invalid prefix '{}': {}", prefix, e))
})
})
.collect::<Result<Vec<_>, _>>()?,
),
None => None,
};
let ordering = match &params.after {
Some(after) => {
let after = Some(MPath::try_from(after).map_err(|e| {
errors::invalid_request(format!("invalid continuation path '{}': {}", after, e))
})?);
ChangesetFileOrdering::Ordered { after }
}
None => ChangesetFileOrdering::Unordered,
};
let (_repo, changeset) = self.repo_changeset(ctx.clone(), &commit).await?;
let limit: usize = check_range_and_convert(
"limit",
params.limit,
0..=source_control::COMMIT_FIND_FILES_MAX_LIMIT,
)?;
let prefixes: Option<Vec<_>> = match params.prefixes {
Some(prefixes) => Some(
prefixes
.into_iter()
.map(|prefix| {
MPath::try_from(&prefix).map_err(|e| {
errors::invalid_request(format!("invalid prefix '{}': {}", prefix, e))
})
})
.collect::<Result<Vec<_>, _>>()?,
),
None => None,
};
let ordering = match &params.after {
Some(after) => {
let after = Some(MPath::try_from(after).map_err(|e| {
errors::invalid_request(format!("invalid continuation path '{}': {}", after, e))
})?);
ChangesetFileOrdering::Ordered { after }
}
None => ChangesetFileOrdering::Unordered,
};
let files = changeset
.find_files(
prefixes,
params.basenames,
params.basename_suffixes,
ordering,
)
.await?
.take(limit)
.map_ok(|path| path.to_string())
.try_collect()
.await?;
let files = changeset
.find_files(
prefixes,
params.basenames,
params.basename_suffixes,
ordering,
)
.await?
.take(limit)
.map_ok(|path| path.to_string())
.try_collect()
.await?;
if should_log_memory_usage() {
let stats = memory::get_stats();
if stats.is_ok() {
let mut scuba = ctx.scuba().clone();
scuba.add_memory_stats(&stats.unwrap());
scuba.log_with_msg("Memory usage after call", None);
}
}
if should_log_memory_usage() {
let stats = memory::get_stats();
if stats.is_ok() {
let mut scuba = ctx.scuba().clone();
scuba.add_memory_stats(&stats.unwrap());
scuba.log_with_msg("Memory usage after call", None);
}
}
Ok(thrift::CommitFindFilesResponse {
files,
..Default::default()
})
Ok(thrift::CommitFindFilesResponse {
files,
..Default::default()
})
})
.await
}
/// Returns the history of a commit