mononoke: start passing CoreContext around.

Summary: Introduce CoreContext to bundle the separate logger/scuba/trace values into a single type that gets passed around.

Reviewed By: StanislavGlebik

Differential Revision: D9322150

fbshipit-source-id: fe65c8e0c601ba0fe00ef7e6bfa226f0d9c78dd0
Jeremy Fitzhardinge 2018-08-15 11:04:34 -07:00 committed by Facebook Github Bot
parent 368318b006
commit 3a28aeef53
6 changed files with 77 additions and 62 deletions
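
For orientation, here is a minimal sketch of what the new CoreContext type bundles together. It is inferred from how the context is constructed in request_handler and from the accessors RepoClient calls on it in this diff; the actual definition lives in the new `context` crate, which is not shown in this commit.

// Sketch only: field set and accessors inferred from this diff; the
// real definition lives in the (unshown) `context` crate.
pub struct CoreContext<SessionId> {
    pub session: SessionId,         // per-connection id, a Uuid in this commit
    pub logger: Logger,             // replaces the separate `logger` field
    pub scuba: ScubaSampleBuilder,  // replaces the separate `scuba_logger` field
    pub trace: TraceContext,        // replaces the separate `trace` field
}

impl<SessionId> CoreContext<SessionId> {
    pub fn logger(&self) -> &Logger {
        &self.logger
    }
    pub fn scuba(&self) -> &ScubaSampleBuilder {
        &self.scuba
    }
    pub fn trace(&self) -> &TraceContext {
        &self.trace
    }
}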

View File

@@ -22,9 +22,11 @@ use itertools::Itertools;
 use slog::Logger;
 use stats::Histogram;
 use time_ext::DurationExt;
+use uuid::Uuid;

 use blobrepo::HgBlobChangeset;
 use bundle2_resolver;
+use context::CoreContext;
 use mercurial::{self, RevlogChangeset};
 use mercurial_bundles::{create_bundle_stream, parts, Bundle2EncodeBuilder, Bundle2Item};
 use mercurial_types::{percent_encode, Changeset, Entry, HgBlobNode, HgChangesetId, HgManifestId,
@@ -149,34 +151,25 @@ fn bundle2caps() -> String {
 #[derive(Clone)]
 pub struct RepoClient {
-    repo: Arc<MononokeRepo>,
-    logger: Logger,
-    scuba_logger: ScubaSampleBuilder,
-    trace: TraceContext,
+    repo: MononokeRepo,
+    ctxt: CoreContext<Uuid>,
 }

 impl RepoClient {
-    pub fn new(
-        repo: Arc<MononokeRepo>,
-        logger: Logger,
-        scuba_logger: ScubaSampleBuilder,
-        trace: TraceContext,
-    ) -> Self {
-        RepoClient {
-            repo,
-            logger,
-            scuba_logger,
-            trace,
-        }
+    pub fn new(repo: MononokeRepo, ctxt: CoreContext<Uuid>) -> Self {
+        RepoClient { repo, ctxt }
     }

-    #[allow(dead_code)]
-    pub fn get_logger(&self) -> &Logger {
-        &self.logger
+    fn logger(&self) -> &Logger {
+        self.ctxt.logger()
     }
+
+    fn trace(&self) -> &TraceContext {
+        self.ctxt.trace()
+    }

     fn scuba_logger(&self, op: &str, args: Option<String>) -> ScubaSampleBuilder {
-        let mut scuba_logger = self.scuba_logger.clone();
+        let mut scuba_logger = self.ctxt.scuba().clone();
         scuba_logger.add("command", op);
@@ -206,9 +199,9 @@ impl RepoClient {
             .cloned()
             .collect();

-        info!(self.logger, "{} heads requested", heads.len());
+        info!(self.logger(), "{} heads requested", heads.len());
         for head in heads.iter() {
-            debug!(self.logger, "{}", head);
+            debug!(self.logger(), "{}", head);
         }

         let excludes: Vec<_> = args.common
@@ -281,7 +274,7 @@ impl RepoClient {
     }

     fn gettreepack_untimed(&self, params: GettreepackArgs) -> BoxStream<Bytes, Error> {
-        debug!(self.logger, "gettreepack");
+        debug!(self.logger(), "gettreepack");

         if !params.directories.is_empty() {
             // This param is not used by core hg, don't worry about implementing it now
@@ -310,7 +303,7 @@ impl RepoClient {
                         &basemfnode,
                         rootpath.clone(),
                         Some(and_pruner_combinator(&file_pruner, visited_pruner.clone())),
-                        self.trace.clone(),
+                        self.trace().clone(),
                     );
                     cur_stream.select(new_stream).boxify()
                 })
@@ -322,7 +315,7 @@ impl RepoClient {
                     &basemfnode,
                     rootpath.clone(),
                     Some(&file_pruner),
-                    self.trace.clone(),
+                    self.trace().clone(),
                 ),
                 None => empty().boxify(),
             }
@@ -335,7 +328,7 @@ impl RepoClient {
             })
             .map({
                 let blobrepo = self.repo.blobrepo();
-                let trace = self.trace.clone();
+                let trace = self.trace().clone();
                 move |(entry, basepath)| {
                     fetch_treepack_part_input(blobrepo.clone(), entry, basepath, trace.clone())
                 }
@@ -356,17 +349,17 @@ impl RepoClient {
 impl HgCommands for RepoClient {
     // @wireprotocommand('between', 'pairs')
     fn between(&self, pairs: Vec<(HgNodeHash, HgNodeHash)>) -> HgCommandRes<Vec<Vec<HgNodeHash>>> {
-        info!(self.logger, "between pairs {:?}", pairs);
+        info!(self.logger(), "between pairs {:?}", pairs);

         struct ParentStream<CS> {
-            repo: Arc<MononokeRepo>,
+            repo: MononokeRepo,
             n: HgNodeHash,
             bottom: HgNodeHash,
             wait_cs: Option<CS>,
         };

         impl<CS> ParentStream<CS> {
-            fn new(repo: &Arc<MononokeRepo>, top: HgNodeHash, bottom: HgNodeHash) -> Self {
+            fn new(repo: &MononokeRepo, top: HgNodeHash, bottom: HgNodeHash) -> Self {
                 ParentStream {
                     repo: repo.clone(),
                     n: top,
@@ -424,7 +417,7 @@ impl HgCommands for RepoClient {
                     .collect()
             })
             .collect()
-            .traced(&self.trace, ops::BETWEEN, trace_args!())
+            .traced(self.trace(), ops::BETWEEN, trace_args!())
            .timed(move |stats, _| {
                 scuba_logger
                     .add_stats(&stats)
@@ -446,7 +439,7 @@ impl HgCommands for RepoClient {
             .collect()
             .map(|v| v.into_iter().collect())
             .from_err()
-            .traced(&self.trace, ops::HEADS, trace_args!())
+            .traced(self.trace(), ops::HEADS, trace_args!())
             .timed(move |stats, _| {
                 scuba_logger
                     .add_stats(&stats)
@@ -458,7 +451,7 @@ impl HgCommands for RepoClient {
     // @wireprotocommand('lookup', 'key')
     fn lookup(&self, key: String) -> HgCommandRes<Bytes> {
-        info!(self.logger, "lookup: {:?}", key);
+        info!(self.logger(), "lookup: {:?}", key);

         // TODO(stash): T25928839 lookup should support bookmarks and prefixes too
         let repo = self.repo.blobrepo();
         let mut scuba_logger = self.scuba_logger(ops::LOOKUP, None);
@@ -488,7 +481,7 @@ impl HgCommands for RepoClient {
                     Ok(buf.freeze())
                 }
             })
-            .traced(&self.trace, ops::LOOKUP, trace_args!())
+            .traced(self.trace(), ops::LOOKUP, trace_args!())
             .timed(move |stats, _| {
                 scuba_logger
                     .add_stats(&stats)
@@ -501,9 +494,9 @@ impl HgCommands for RepoClient {
     // @wireprotocommand('known', 'nodes *'), but the '*' is ignored
     fn known(&self, nodes: Vec<HgNodeHash>) -> HgCommandRes<Vec<bool>> {
         if nodes.len() > MAX_NODES_TO_LOG {
-            info!(self.logger, "known: {:?}...", &nodes[..MAX_NODES_TO_LOG]);
+            info!(self.logger(), "known: {:?}...", &nodes[..MAX_NODES_TO_LOG]);
         } else {
-            info!(self.logger, "known: {:?}", nodes);
+            info!(self.logger(), "known: {:?}", nodes);
         }

         let blobrepo = self.repo.blobrepo();
@@ -513,7 +506,7 @@ impl HgCommands for RepoClient {
             nodes
                 .into_iter()
                 .map(move |node| blobrepo.changeset_exists(&HgChangesetId::new(node))),
-        ).traced(&self.trace, ops::KNOWN, trace_args!())
+        ).traced(self.trace(), ops::KNOWN, trace_args!())
             .timed(move |stats, _| {
                 scuba_logger
                     .add_stats(&stats)
@@ -525,14 +518,14 @@ impl HgCommands for RepoClient {
     // @wireprotocommand('getbundle', '*')
     fn getbundle(&self, args: GetbundleArgs) -> HgCommandRes<Bytes> {
-        info!(self.logger, "Getbundle: {:?}", args);
+        info!(self.logger(), "Getbundle: {:?}", args);

         let mut scuba_logger = self.scuba_logger(ops::GETBUNDLE, None);

         match self.create_bundle(args) {
             Ok(res) => res,
             Err(err) => Err(err).into_future().boxify(),
-        }.traced(&self.trace, ops::GETBUNDLE, trace_args!())
+        }.traced(self.trace(), ops::GETBUNDLE, trace_args!())
             .timed(move |stats, _| {
                 STATS::getbundle_ms.add_value(stats.completion_time.as_millis_unchecked() as i64);
                 scuba_logger
@@ -545,7 +538,7 @@ impl HgCommands for RepoClient {
     // @wireprotocommand('hello')
     fn hello(&self) -> HgCommandRes<HashMap<String, Vec<String>>> {
-        info!(self.logger, "Hello -> capabilities");
+        info!(self.logger(), "Hello -> capabilities");

         let mut res = HashMap::new();
         let mut caps = wireprotocaps();
@@ -555,7 +548,7 @@ impl HgCommands for RepoClient {
         let mut scuba_logger = self.scuba_logger(ops::HELLO, None);

         future::ok(res)
-            .traced(&self.trace, ops::HELLO, trace_args!())
+            .traced(self.trace(), ops::HELLO, trace_args!())
             .timed(move |stats, _| {
                 scuba_logger
                     .add_stats(&stats)
@@ -584,7 +577,7 @@ impl HgCommands for RepoClient {
                     .map(|(name, value)| (Vec::from(name.to_string()), value));
                 HashMap::from_iter(bookiter)
             })
-            .traced(&self.trace, ops::LISTKEYS, trace_args!())
+            .traced(self.trace(), ops::LISTKEYS, trace_args!())
             .timed(move |stats, _| {
                 scuba_logger
                     .add_stats(&stats)
@@ -594,7 +587,7 @@ impl HgCommands for RepoClient {
                 .boxify()
         } else {
             info!(
-                self.get_logger(),
+                self.logger(),
                 "unsupported listkeys namespace: {}",
                 namespace
             );
@@ -612,13 +605,13 @@ impl HgCommands for RepoClient {
         let res = bundle2_resolver::resolve(
             self.repo.blobrepo(),
-            self.logger.new(o!("command" => "unbundle")),
+            self.logger().new(o!("command" => "unbundle")),
             scuba_logger.clone(),
             heads,
             stream,
         );

-        res.traced(&self.trace, ops::UNBUNDLE, trace_args!())
+        res.traced(self.trace(), ops::UNBUNDLE, trace_args!())
             .timed(move |stats, _| {
                 scuba_logger
                     .add_stats(&stats)
@@ -641,7 +634,7 @@ impl HgCommands for RepoClient {
         let mut scuba_logger = self.scuba_logger(ops::GETTREEPACK, Some(args));

         self.gettreepack_untimed(params)
-            .traced(&self.trace, ops::GETTREEPACK, trace_args!())
+            .traced(self.trace(), ops::GETTREEPACK, trace_args!())
             .timed(move |stats, _| {
                 STATS::gettreepack_ms.add_value(stats.completion_time.as_millis_unchecked() as i64);
                 scuba_logger
@@ -654,8 +647,8 @@ impl HgCommands for RepoClient {
     // @wireprotocommand('getfiles', 'files*')
     fn getfiles(&self, params: BoxStream<(HgNodeHash, MPath), Error>) -> BoxStream<Bytes, Error> {
-        let logger = self.logger.clone();
-        let trace = self.trace.clone();
+        let logger = self.logger().clone();
+        let trace = self.trace().clone();
         info!(logger, "getfiles");

         let this = self.clone();
@@ -668,7 +661,7 @@ impl HgCommands for RepoClient {
                 let repo = this.repo.clone();
                 create_remotefilelog_blob(repo.blobrepo(), node, path.clone(), trace.clone())
                     .traced(
-                        &this.trace,
+                        this.trace(),
                         ops::GETFILES,
                         trace_args!("node" => node.to_string(), "path" => path.to_string()),
                     )

View File

@@ -30,9 +30,11 @@ extern crate stats;
 extern crate time_ext;
 #[macro_use]
 extern crate tracing;
+extern crate uuid;

 extern crate blobrepo;
 extern crate bundle2_resolver;
+extern crate context;
 extern crate filenodes;
 extern crate hgproto;
 extern crate mercurial;

View File

@@ -24,6 +24,7 @@ struct LogNormalGenerator {
     distribution: LogNormal,
 }

+#[derive(Clone)]
 pub struct MononokeRepo {
     log_name: String,
     blobrepo: Arc<BlobRepo>,

View File

@@ -10,6 +10,7 @@
 #[macro_use]
 extern crate cloned;
+extern crate context;
 extern crate dns_lookup;
 #[macro_use]
 extern crate failure_ext as failure;

View File

@@ -5,7 +5,6 @@
 // GNU General Public License version 2 or any later version.

 use std::collections::HashMap;
-use std::sync::Arc;

 use failure::prelude::*;
 use futures::{future, Future};
@@ -19,7 +18,12 @@ use ready_state::ReadyStateBuilder;
 use repo_client::MononokeRepo;
 use scuba_ext::{ScubaSampleBuilder, ScubaSampleBuilderExt};

-pub type RepoHandler = (Logger, ScubaSampleBuilder, Arc<MononokeRepo>);
+#[derive(Clone, Debug)]
+pub struct RepoHandler {
+    pub logger: Logger,
+    pub scuba: ScubaSampleBuilder,
+    pub repo: MononokeRepo,
+}

 pub fn repo_handlers(
     repos: impl IntoIterator<Item = (String, RepoConfig)>,
@@ -50,15 +54,20 @@
             let mut scuba_logger = ScubaSampleBuilder::with_opt_table(config.scuba_table.clone());
             scuba_logger.add_common_server_data();

-            let repo = Arc::new(repo);
             let initial_warmup =
                 cache_warmup(repo.blobrepo(), config.cache_warmup, listen_log.clone())
                     .context(format!("while warming up cache for repo: {}", reponame))
                     .from_err();

-            ready_handle
-                .wait_for(initial_warmup)
-                .map(move |()| (reponame, (listen_log, scuba_logger, repo)))
+            ready_handle.wait_for(initial_warmup).map(move |()| {
+                (
+                    reponame,
+                    RepoHandler {
+                        logger: listen_log,
+                        scuba: scuba_logger,
+                        repo: repo,
+                    },
+                )
+            })
         })
         .collect();

View File

@@ -30,6 +30,8 @@ use sshrelay::{SenderBytesWrite, Stdio};
 use repo_handlers::RepoHandler;

+use context::CoreContext;
+
 define_stats! {
     prefix = "mononoke.request_handler";
     wireproto_ms:
@@ -37,10 +39,15 @@
 }

 pub fn request_handler(
-    (logger, mut scuba_logger, repo): RepoHandler,
+    RepoHandler {
+        logger,
+        scuba,
+        repo,
+    }: RepoHandler,
     stdio: Stdio,
     addr: SocketAddr,
 ) -> impl Future<Item = (), Error = ()> {
+    let mut scuba_logger = scuba;
     let Stdio {
         stdin,
         stdout,
@@ -107,15 +114,17 @@ pub fn request_handler(
     scuba_logger.log_with_msg("Connection established", None);

+    let ctxt = CoreContext {
+        session: session_uuid,
+        logger: conn_log.clone(),
+        scuba: scuba_logger.clone(),
+        trace: trace.clone(),
+    };
+
     // Construct a hg protocol handler
     let proto_handler = HgProtoHandler::new(
         stdin,
-        RepoClient::new(
-            repo.clone(),
-            conn_log.clone(),
-            scuba_logger.clone(),
-            trace.clone(),
-        ),
+        RepoClient::new(repo.clone(), ctxt),
         sshproto::HgSshCommandDecode,
         sshproto::HgSshCommandEncode,
         &conn_log,