repo_listener: replace usages of tokio_core with tokio::runtime

Summary: As a bonus there is no need for spawning threads for separate tokio cores since tokio::runtime has built-in thread pool

Reviewed By: farnz

Differential Revision: D8863343

fbshipit-source-id: 8adf696640aec78e767574e8bf2925699a580ca0
This commit is contained in:
Lukas Piatkowski 2018-07-17 04:37:09 -07:00 committed by Facebook Github Bot
parent 81e872828a
commit a91b43bb47
5 changed files with 167 additions and 210 deletions

View File

@ -14,9 +14,9 @@ use futures::sync::mpsc;
use futures_ext::{BoxFuture, FutureExt, StreamExt};
use openssl::ssl::SslAcceptor;
use slog::Logger;
use tokio;
use tokio::net::{TcpListener, TcpStream};
use tokio_codec::{FramedRead, FramedWrite};
use tokio_core::reactor::{Core, Remote};
use tokio_io::{AsyncRead, AsyncWrite, IoStream};
use tokio_openssl::SslAcceptorExt;
@ -31,10 +31,8 @@ pub fn connection_acceptor(
root_log: Logger,
repo_senders: HashMap<String, mpsc::Sender<(Stdio, SocketAddr)>>,
tls_acceptor: SslAcceptor,
) -> ! {
let mut core = Core::new().expect("failed to create tokio core");
let remote = core.remote();
let connection_acceptor = listener(sockname)
) -> impl Future<Item = (), Error = Error> {
listener(sockname)
.expect("failed to create listener")
.map_err(Error::from)
.and_then({
@ -50,10 +48,9 @@ pub fn connection_acceptor(
tls_acceptor
.accept_async(sock)
.then({
let remote = remote.clone();
let root_log = root_log.clone();
move |sock| match sock {
Ok(sock) => ssh_server_mux(sock, remote.clone())
Ok(sock) => ssh_server_mux(sock)
.map(move |stdio| Some((stdio, addr)))
.or_else({
let root_log = root_log.clone();
@ -98,13 +95,7 @@ pub fn connection_acceptor(
Ok(()).into_future().boxify()
}
}
});
core.run(connection_acceptor)
.expect("failure while running listener on tokio core");
// The server is an infinite stream of connections
unreachable!();
})
}
fn listener<P>(sockname: P) -> io::Result<IoStream<TcpStream>>
@ -135,7 +126,7 @@ where
// As a server, given a stream to a client, return an Io pair with stdin/stdout, and an
// auxiliary sink for stderr.
fn ssh_server_mux<S>(s: S, remote: Remote) -> BoxFuture<Stdio, Error>
fn ssh_server_mux<S>(s: S) -> BoxFuture<Stdio, Error>
where
S: AsyncRead + AsyncWrite + Send + 'static,
{
@ -180,7 +171,7 @@ where
.forward(wr);
// spawn a task for forwarding stdout/err into stream
remote.spawn(|_handle| fwd.discard());
tokio::spawn(fwd.discard());
(otx, etx)
};

View File

@ -8,7 +8,6 @@ pub use failure::{Error, Result, ResultExt};
#[derive(Debug, Fail)]
pub enum ErrorKind {
#[fail(display = "failed to initialize server: {}", _0)] Initialization(&'static str),
#[fail(display = "connection does not start with preamble")] NoConnectionPreamble,
#[fail(display = "connection error while reading preamble")] ConnectionError,
#[fail(display = "incorrect reponame: {}", _0)] IncorrectRepoName(String),

View File

@ -23,7 +23,6 @@ extern crate slog_kvfilter;
extern crate slog_term;
extern crate tokio;
extern crate tokio_codec;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_openssl;
extern crate tracing;
@ -43,10 +42,9 @@ mod errors;
mod repo_listen;
use std::collections::HashMap;
use std::thread::{self, JoinHandle};
use failure::SlogKVError;
use futures::sync::mpsc;
use futures::{future, Future, sync::mpsc};
use futures_ext::{BoxFuture, FutureExt};
use openssl::ssl::SslAcceptor;
use slog::Logger;
@ -61,7 +59,7 @@ pub fn start_repo_listeners<I>(
root_log: &Logger,
sockname: &str,
tls_acceptor: SslAcceptor,
) -> Result<(Vec<JoinHandle<!>>, ready_state::ReadyState)>
) -> (BoxFuture<(), Error>, ready_state::ReadyState)
where
I: IntoIterator<Item = (String, RepoConfig)>,
{
@ -74,7 +72,7 @@ where
let mut repo_senders = HashMap::new();
let mut ready = ready_state::ReadyStateBuilder::new();
let mut handles: Vec<_> = repos
let handles: Vec<_> = repos
.into_iter()
.filter(|(reponame, config)| {
if !config.enabled {
@ -90,38 +88,19 @@ where
// the sender. However each clone creates one more entry in the channel.
let (sender, receiver) = mpsc::channel(1);
repo_senders.insert(reponame.clone(), sender);
// start a thread for each repo to own the reactor and start listening for
// connections and detach it
thread::Builder::new()
.name(format!("listener_{:?}", config.repotype))
.spawn({
let root_log = root_log.clone();
move || repo_listen(reponame, config, root_log, ready_handle, receiver)
})
.map_err(Error::from)
// create a future for each repo and start listening for connections
repo_listen(reponame, config, root_log.clone(), ready_handle, receiver)
})
.collect();
let conn_acceptor_handle = thread::Builder::new()
.name(format!("connection_acceptor"))
.spawn({
let root_log = root_log.clone();
move || connection_acceptor(&sockname, root_log, repo_senders, tls_acceptor)
})
.map_err(Error::from);
let conn_acceptor_handle =
connection_acceptor(&sockname, root_log.clone(), repo_senders, tls_acceptor);
handles.push(conn_acceptor_handle);
if handles.iter().any(Result::is_err) {
for err in handles.into_iter().filter_map(Result::err) {
crit!(root_log, "Failed to spawn listener thread"; SlogKVError(err));
}
bail_err!(ErrorKind::Initialization(
"at least one of the listener threads failed to be spawned",
));
}
Ok((
handles.into_iter().filter_map(Result::ok).collect(),
(
future::join_all(handles)
.join(conn_acceptor_handle)
.map(|_: (Vec<()>, ())| ())
.boxify(),
ready.freeze(),
))
)
}

View File

@ -11,16 +11,16 @@ use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use dns_lookup::getnameinfo;
use failure::SlogKVError;
use failure::{SlogKVError, prelude::*};
use futures::{Future, IntoFuture, Sink, Stream};
use futures::sync::mpsc;
use futures_ext::{asynchronize, FutureExt};
use futures_ext::FutureExt;
use futures_stats::Timed;
use slog::{self, Drain, Level, Logger};
use slog_kvfilter::KVFilter;
use slog_term;
use tokio;
use tokio::util::FutureExt as TokioFutureExt;
use tokio_core::reactor::Core;
use tracing::TraceContext;
use uuid::Uuid;
@ -35,16 +35,14 @@ use sshrelay::{SenderBytesWrite, Stdio};
use errors::*;
/// Listener thread for a specific repo
/// Listener for a specific repo
pub fn repo_listen(
reponame: String,
config: RepoConfig,
root_log: Logger,
ready_handle: ReadyHandle,
input_stream: mpsc::Receiver<(Stdio, SocketAddr)>,
) -> ! {
let mut core = Core::new().expect("failed to create tokio core");
) -> impl Future<Item = (), Error = Error> {
let scuba_logger = ScubaSampleBuilder::with_opt_table(config.scuba_table.clone());
let repo = MononokeRepo::new(
@ -55,169 +53,158 @@ pub fn repo_listen(
let listen_log = root_log.new(o!("repo" => repo.path().clone()));
let handle = core.handle();
let repo = Arc::new(repo);
let initial_warmup = cache_warmup(repo.blobrepo(), config.cache_warmup, listen_log.clone())
.map_err({
let listen_log = listen_log.clone();
move |err| {
error!(listen_log, "failed to warmup cache: {}", err);
()
}
});
.context(format!("while warming up cache for repo: {}", reponame))
.from_err();
let initial_warmup = ready_handle.wait_for(initial_warmup);
let server = input_stream.for_each(move |(stdio, addr)| {
// Have a connection. Extract std{in,out,err} streams for socket
let Stdio {
stdin,
stdout,
stderr,
mut preamble,
} = stdio;
let server = input_stream
.for_each(move |(stdio, addr)| {
// Have a connection. Extract std{in,out,err} streams for socket
let Stdio {
stdin,
stdout,
stderr,
mut preamble,
} = stdio;
let session_uuid = match preamble
.misc
.get("session_uuid")
.and_then(|v| Uuid::parse_str(v).ok())
{
Some(session_uuid) => session_uuid,
None => {
let session_uuid = Uuid::new_v4();
preamble
.misc
.insert("session_uuid".to_owned(), format!("{}", session_uuid));
session_uuid
}
};
let wireproto_calls = Arc::new(Mutex::new(Vec::new()));
let trace = TraceContext::new(session_uuid, Instant::now());
let conn_log = {
let stderr_write = SenderBytesWrite {
chan: stderr.clone().wait(),
};
let client_drain = slog_term::PlainSyncDecorator::new(stderr_write);
let client_drain = slog_term::FullFormat::new(client_drain).build();
let client_drain = KVFilter::new(client_drain, Level::Critical)
.only_pass_any_on_all_keys(hashmap! {
"remote".into() => hashset!["true".into(), "remote_only".into()],
});
let server_drain = KVFilter::new(listen_log.clone(), Level::Critical)
.always_suppress_any(hashmap! {
"remote".into() => hashset!["remote_only".into()],
});
let drain = slog::Duplicate::new(client_drain, server_drain).ignore_res();
Logger::root(drain, o!("session_uuid" => format!("{}", session_uuid)))
};
let mut scuba_logger = {
let client_hostname = match getnameinfo(&addr, 0) {
Ok((hostname, _)) => hostname,
Err(err) => {
warn!(
conn_log,
"failed to lookup hostname for address {}, reason: {:?}", addr, err
);
"".to_owned()
let session_uuid = match preamble
.misc
.get("session_uuid")
.and_then(|v| Uuid::parse_str(v).ok())
{
Some(session_uuid) => session_uuid,
None => {
let session_uuid = Uuid::new_v4();
preamble
.misc
.insert("session_uuid".to_owned(), format!("{}", session_uuid));
session_uuid
}
};
let mut scuba_logger = scuba_logger.clone();
scuba_logger
.add_preamble(&preamble)
.add("client_hostname", client_hostname);
scuba_logger
};
scuba_logger.log_with_msg("Connection established", None);
let wireproto_calls = Arc::new(Mutex::new(Vec::new()));
let trace = TraceContext::new(session_uuid, Instant::now());
// Construct a hg protocol handler
let proto_handler = HgProtoHandler::new(
stdin,
RepoClient::new(repo.clone(), conn_log.clone(), scuba_logger.clone(), trace),
sshproto::HgSshCommandDecode,
sshproto::HgSshCommandEncode,
&conn_log,
wireproto_calls.clone(),
);
let conn_log = {
let stderr_write = SenderBytesWrite {
chan: stderr.clone().wait(),
};
let client_drain = slog_term::PlainSyncDecorator::new(stderr_write);
let client_drain = slog_term::FullFormat::new(client_drain).build();
let client_drain = KVFilter::new(client_drain, Level::Critical)
.only_pass_any_on_all_keys(hashmap! {
"remote".into() => hashset!["true".into(), "remote_only".into()],
});
// send responses back
let endres = if preamble.reponame == reponame {
proto_handler
.map_err(Error::from)
.forward(stdout)
.map(|_| ())
.boxify()
} else {
Err(ErrorKind::IncorrectRepoName(preamble.reponame).into())
.into_future()
.boxify()
};
let server_drain = KVFilter::new(listen_log.clone(), Level::Critical)
.always_suppress_any(hashmap! {
"remote".into() => hashset!["remote_only".into()],
});
// If we got an error at this point, then catch it, print a message and return
// Ok (if we allow the Error to propagate further it will shutdown the listener
// rather than just the connection). Unfortunately there's no way to print what the
// actual failing command was.
// TODO: (stash) T30523706 seems to leave the client hanging?
let drain = slog::Duplicate::new(client_drain, server_drain).ignore_res();
Logger::root(drain, o!("session_uuid" => format!("{}", session_uuid)))
};
// Don't wait for more than 15 mins for a request
let endres = endres
.deadline(Instant::now() + Duration::from_secs(900))
.timed(move |stats, result| {
let mut wireproto_calls = wireproto_calls.lock().expect("lock poisoned");
let wireproto_calls = mem::replace(wireproto_calls.deref_mut(), Vec::new());
scuba_logger
.add_stats(&stats)
.add("wireproto_commands", wireproto_calls);
match result {
Ok(_) => scuba_logger.log_with_msg("Request finished - Success", None),
Err(err) => if err.is_inner() {
scuba_logger
.log_with_msg("Request finished - Failure", format!("{:#?}", err));
} else if err.is_elapsed() {
scuba_logger.log_with_msg("Request finished - Timeout", None);
} else {
scuba_logger.log_with_msg(
"Request finished - Unexpected timer error",
format!("{:#?}", err),
let mut scuba_logger = {
let client_hostname = match getnameinfo(&addr, 0) {
Ok((hostname, _)) => hostname,
Err(err) => {
warn!(
conn_log,
"failed to lookup hostname for address {}, reason: {:?}", addr, err
);
},
}
Ok(())
})
.map_err(move |err| {
if err.is_inner() {
error!(conn_log, "Command failed";
"".to_owned()
}
};
let mut scuba_logger = scuba_logger.clone();
scuba_logger
.add_preamble(&preamble)
.add("client_hostname", client_hostname);
scuba_logger
};
scuba_logger.log_with_msg("Connection established", None);
// Construct a hg protocol handler
let proto_handler = HgProtoHandler::new(
stdin,
RepoClient::new(repo.clone(), conn_log.clone(), scuba_logger.clone(), trace),
sshproto::HgSshCommandDecode,
sshproto::HgSshCommandEncode,
&conn_log,
wireproto_calls.clone(),
);
// send responses back
let endres = if preamble.reponame == reponame {
proto_handler
.map_err(Error::from)
.forward(stdout)
.map(|_| ())
.boxify()
} else {
Err(ErrorKind::IncorrectRepoName(preamble.reponame).into())
.into_future()
.boxify()
};
// If we got an error at this point, then catch it, print a message and return
// Ok (if we allow the Error to propagate further it will shutdown the listener
// rather than just the connection). Unfortunately there's no way to print what the
// actual failing command was.
// TODO: (stash) T30523706 seems to leave the client hanging?
// Don't wait for more than 15 mins for a request
let endres = endres
.deadline(Instant::now() + Duration::from_secs(900))
.timed(move |stats, result| {
let mut wireproto_calls = wireproto_calls.lock().expect("lock poisoned");
let wireproto_calls = mem::replace(wireproto_calls.deref_mut(), Vec::new());
scuba_logger
.add_stats(&stats)
.add("wireproto_commands", wireproto_calls);
match result {
Ok(_) => scuba_logger.log_with_msg("Request finished - Success", None),
Err(err) => if err.is_inner() {
scuba_logger
.log_with_msg("Request finished - Failure", format!("{:#?}", err));
} else if err.is_elapsed() {
scuba_logger.log_with_msg("Request finished - Timeout", None);
} else {
scuba_logger.log_with_msg(
"Request finished - Unexpected timer error",
format!("{:#?}", err),
);
},
}
Ok(())
})
.map_err(move |err| {
if err.is_inner() {
error!(conn_log, "Command failed";
SlogKVError(err.into_inner().unwrap()),
"remote" => "true");
} else if err.is_elapsed() {
error!(conn_log, "Timeout while handling request";
} else if err.is_elapsed() {
error!(conn_log, "Timeout while handling request";
"remote" => "true");
} else {
crit!(conn_log, "Unexpected error";
} else {
crit!(conn_log, "Unexpected error";
SlogKVError(err.into_timer().unwrap().into()),
"remote" => "true");
}
format_err!("This is a dummy error, not supposed to be catched")
});
}
});
// Make this double async.
// TODO(stash, luk) is this really necessary?
handle.spawn(asynchronize(move || endres).then(|_| Ok(())));
// Execute the request without blocking the listener
tokio::spawn(endres);
Ok(())
})
// ignoring the "()" error
.then(|_| Ok(()));
Ok(())
});
let server = server.join(initial_warmup);
core.run(server)
.expect("failure while running listener on tokio core");
// The server is an infinite stream of connections
unreachable!();
server.join(initial_warmup).map(|((), ())| ())
}

View File

@ -175,7 +175,7 @@ fn main() {
.value_of("listening-host-port")
.expect("listening path must be specified"),
secure_utils::build_tls_acceptor(ssl).expect("failed to build tls acceptor"),
)?;
);
tracing_fb303::register();
@ -184,13 +184,14 @@ fn main() {
Some(handle) => Some(handle?),
};
tokio::run(stats_aggregation.map_err(|err| panic!("Unexpected error: {:#?}", err)));
tokio::run(
repo_listeners
.join(stats_aggregation.from_err())
.map(|((), ())| ())
.map_err(|err| panic!("Unexpected error: {:#?}", err)),
);
for handle in vec![]
.into_iter()
.chain(maybe_thrift.into_iter())
.chain(repo_listeners.into_iter())
{
if let Some(handle) = maybe_thrift {
let thread_name = handle.thread().name().unwrap_or("unknown").to_owned();
match handle.join() {
Ok(_) => panic!("unexpected success"),