Mirror of https://github.com/zed-industries/zed.git
WIP: Enhance tracing in Peer
- Add a bunch of events to Peer's async connection handling logic
- Use an EnvFilter to allow more control over the verbosity level of tracing on a per-module basis
- Wire up logging to emit trace events (we actually probably want to do this the other way around)

Co-Authored-By: Antonio Scandurra <me@as-cii.com>
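The last point refers to bridging the existing `log`-based call sites into `tracing`. A minimal sketch of that bridge, using `tracing-log`'s `LogTracer` (an illustrative setup, not the collab code itself):

    use tracing_log::LogTracer;

    // Once LogTracer is installed as the global `log` logger, existing
    // log::info!/log::debug! call sites are re-emitted as tracing events and
    // flow through whatever subscriber/filter stack is installed.
    fn bridge_log_to_tracing() -> anyhow::Result<()> {
        LogTracer::init()?; // errors if another global logger was already set
        log::info!("this now arrives as a tracing event");
        Ok(())
    }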
This commit is contained in:
parent 85d9ac5b95
commit 9f6e82720d
Cargo.lock (generated)
@@ -876,6 +876,7 @@ dependencies = [
  "tonic",
  "tower",
  "tracing",
+ "tracing-log",
  "tracing-opentelemetry",
  "tracing-subscriber",
  "util",
@@ -2588,6 +2589,15 @@ version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"

+[[package]]
+name = "matchers"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
+dependencies = [
+ "regex-automata",
+]
+
 [[package]]
 name = "matches"
 version = "0.1.8"
@@ -3734,6 +3744,15 @@ dependencies = [
  "regex-syntax",
 ]

+[[package]]
+name = "regex-automata"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
+dependencies = [
+ "regex-syntax",
+]
+
 [[package]]
 name = "regex-syntax"
 version = "0.6.25"
@@ -3856,6 +3875,7 @@ dependencies = [
  "smol",
  "smol-timeout",
  "tempdir",
+ "tracing",
  "util",
  "zstd",
 ]
@@ -5244,9 +5264,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"

 [[package]]
 name = "tracing"
-version = "0.1.26"
+version = "0.1.34"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d"
+checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09"
 dependencies = [
  "cfg-if 1.0.0",
  "log",
@@ -5257,9 +5277,9 @@ dependencies = [

 [[package]]
 name = "tracing-attributes"
-version = "0.1.15"
+version = "0.1.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c42e6fa53307c8a17e4ccd4dc81cf5ec38db9209f59b222210375b54ee40d1e2"
+checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -5317,9 +5337,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596"
 dependencies = [
  "ansi_term 0.12.1",
+ "lazy_static",
+ "matchers",
+ "regex",
  "sharded-slab",
  "smallvec",
  "thread_local",
+ "tracing",
  "tracing-core",
  "tracing-log",
 ]
@@ -44,9 +44,10 @@ tokio-tungstenite = "0.17"
 tonic = "0.6"
 tower = "0.4"
 toml = "0.5.8"
-tracing = "0.1"
+tracing = "0.1.34"
+tracing-log = "0.1.3"
 tracing-opentelemetry = "0.17"
-tracing-subscriber = "0.3"
+tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }

 [dependencies.sqlx]
 version = "0.5.2"
@@ -1,3 +1,2 @@
 ZED_ENVIRONMENT=production
-RUST_LOG=info
-TRACE_LEVEL=debug
+RUST_LOG=info,rpc=debug
@@ -1,3 +1,2 @@
 ZED_ENVIRONMENT=staging
-RUST_LOG=info
-TRACE_LEVEL=debug
+RUST_LOG=info,rpc=debug
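Both environments now carry the filter directives in RUST_LOG rather than a separate TRACE_LEVEL. A hedged sketch of how a directive string like the one above behaves when handed to tracing-subscriber's EnvFilter (the helper function is made up for illustration):

    use tracing_subscriber::filter::EnvFilter;

    // "info,rpc=debug": INFO and above everywhere by default, but events whose
    // target falls under the `rpc` crate/module are kept down to DEBUG.
    fn make_filter(directives: &str) -> EnvFilter {
        EnvFilter::try_new(directives).unwrap_or_else(|_| EnvFilter::new("info"))
    }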
@@ -83,8 +83,6 @@ spec:
               key: token
         - name: RUST_LOG
           value: ${RUST_LOG}
-        - name: TRACE_LEVEL
-          value: ${TRACE_LEVEL}
         - name: HONEYCOMB_DATASET
           value: "collab"
         - name: HONEYCOMB_API_KEY
@@ -11,7 +11,9 @@ use std::{
     net::{SocketAddr, TcpListener},
     sync::Arc,
 };
-use tracing::metadata::LevelFilter;
+use tracing_log::LogTracer;
+use tracing_subscriber::filter::EnvFilter;
 use util::ResultExt;

 #[derive(Default, Deserialize)]
 pub struct Config {
@@ -20,7 +22,7 @@ pub struct Config {
     pub api_token: String,
     pub honeycomb_api_key: Option<String>,
     pub honeycomb_dataset: Option<String>,
-    pub trace_level: Option<String>,
+    pub rust_log: Option<String>,
 }

 pub struct AppState {
@@ -41,8 +43,6 @@ impl AppState {

 #[tokio::main]
 async fn main() -> Result<()> {
-    env_logger::init();
-
     if let Err(error) = env::load_dotenv() {
         log::error!(
             "error loading .env.toml (this is expected in production): {}",
@@ -119,42 +119,44 @@ pub fn init_tracing(config: &Config) -> Option<()> {
     use std::str::FromStr;
     use tracing_opentelemetry::OpenTelemetryLayer;
     use tracing_subscriber::layer::SubscriberExt;
+    let rust_trace = config.rust_log.clone()?;

-    let (honeycomb_api_key, honeycomb_dataset) = config
+    LogTracer::init().log_err()?;
+
+    let open_telemetry_layer = config
         .honeycomb_api_key
         .clone()
-        .zip(config.honeycomb_dataset.clone())?;
+        .zip(config.honeycomb_dataset.clone())
+        .map(|(honeycomb_api_key, honeycomb_dataset)| {
+            let mut metadata = tonic::metadata::MetadataMap::new();
+            metadata.insert("x-honeycomb-team", honeycomb_api_key.parse().unwrap());
+            let tracer = opentelemetry_otlp::new_pipeline()
+                .tracing()
+                .with_exporter(
+                    opentelemetry_otlp::new_exporter()
+                        .tonic()
+                        .with_endpoint("https://api.honeycomb.io")
+                        .with_metadata(metadata),
+                )
+                .with_trace_config(opentelemetry::sdk::trace::config().with_resource(
+                    opentelemetry::sdk::Resource::new(vec![KeyValue::new(
+                        "service.name",
+                        honeycomb_dataset,
+                    )]),
+                ))
+                .install_batch(opentelemetry::runtime::Tokio)
+                .expect("failed to initialize tracing");

-    let mut metadata = tonic::metadata::MetadataMap::new();
-    metadata.insert("x-honeycomb-team", honeycomb_api_key.parse().unwrap());
-    let tracer = opentelemetry_otlp::new_pipeline()
-        .tracing()
-        .with_exporter(
-            opentelemetry_otlp::new_exporter()
-                .tonic()
-                .with_endpoint("https://api.honeycomb.io")
-                .with_metadata(metadata),
-        )
-        .with_trace_config(opentelemetry::sdk::trace::config().with_resource(
-            opentelemetry::sdk::Resource::new(vec![KeyValue::new(
-                "service.name",
-                honeycomb_dataset,
-            )]),
-        ))
-        .install_batch(opentelemetry::runtime::Tokio)
-        .expect("failed to initialize tracing");
+            OpenTelemetryLayer::new(tracer)
+        });

     let subscriber = tracing_subscriber::Registry::default()
-        .with(OpenTelemetryLayer::new(tracer))
-        .with(tracing_subscriber::fmt::layer())
+        .with(open_telemetry_layer)
         .with(
-            config
-                .trace_level
-                .as_ref()
-                .map_or(LevelFilter::INFO, |level| {
-                    LevelFilter::from_str(level).unwrap()
-                }),
-        );
+            tracing_subscriber::fmt::layer()
+                .event_format(tracing_subscriber::fmt::format().pretty()),
+        )
+        .with(EnvFilter::from_str(rust_trace.as_str()).log_err()?);

     tracing::subscriber::set_global_default(subscriber).unwrap();
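One detail worth noting in the new init_tracing: the OpenTelemetry exporter is now an Option that is handed straight to .with(...). A minimal sketch of why that composes, to the best of my understanding of tracing-subscriber (Option<L> acts as a Layer whenever L does); the function and names here are illustrative:

    use tracing_subscriber::{layer::SubscriberExt, Layer, Registry};

    // When the Honeycomb credentials are missing, `otel_layer` is None and the
    // `.with(...)` call is effectively a no-op; otherwise the layer is active.
    fn build_subscriber<L>(otel_layer: Option<L>) -> impl tracing::Subscriber
    where
        L: Layer<Registry>,
    {
        Registry::default()
            .with(otel_layer)
            .with(tracing_subscriber::fmt::layer())
    }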
@@ -28,6 +28,7 @@ rand = "0.8"
 rsa = "0.4"
 serde = { version = "1", features = ["derive"] }
 smol-timeout = "0.6"
+tracing = "0.1.34"
 zstd = "0.9"

 [build-dependencies]
@@ -9,6 +9,7 @@ use futures::{
     stream::BoxStream,
     FutureExt, SinkExt, StreamExt,
 };
+use log::as_debug;
 use parking_lot::{Mutex, RwLock};
 use smol_timeout::TimeoutExt;
 use std::sync::atomic::Ordering::SeqCst;
@@ -22,6 +23,7 @@ use std::{
     },
     time::Duration,
 };
+use tracing::instrument;

 #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
 pub struct ConnectionId(pub u32);
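The new `use log::as_debug;` import supports the structured form used throughout the hunks below, where key = value pairs come before the message and a `;` separates them from it. A hedged sketch of that form (it relies on the log crate's key-value support, which was an opt-in/unstable cargo feature at the time and is not shown in this diff):

    use log::as_debug;

    #[derive(Debug)]
    struct Envelope {
        id: u32,
    }

    // The `key = value, key = value; "message"` form records structured fields
    // on the log record; `as_debug!` captures a value through its Debug impl.
    fn log_incoming(connection_id: u32, incoming: &Envelope) {
        log::debug!(connection_id = connection_id, incoming = as_debug!(incoming); "incoming message");
    }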
@@ -108,6 +110,7 @@ impl Peer {
         })
     }

+    #[instrument(skip_all)]
     pub async fn add_connection<F, Fut, Out>(
         self: &Arc<Self>,
         connection: Connection,
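#[instrument(skip_all)] gives each call to add_connection its own tracing span while skipping the function's arguments as span fields. A tiny, hypothetical example of the same attribute (not Peer::add_connection itself):

    use tracing::instrument;

    // Every call gets its own span named `handle_request`; `skip_all` keeps the
    // arguments out of the span, and fields can still be attached to events.
    #[instrument(skip_all)]
    async fn handle_request(request_id: u32, payload: Vec<u8>) {
        tracing::debug!(request_id, len = payload.len(), "handling request");
    }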
@@ -145,9 +148,12 @@ impl Peer {
         let this = self.clone();
         let response_channels = connection_state.response_channels.clone();
         let handle_io = async move {
+            log::debug!(connection_id = connection_id.0; "handle io future: start");
+
             let _end_connection = util::defer(|| {
                 response_channels.lock().take();
                 this.connections.write().remove(&connection_id);
+                log::debug!(connection_id = connection_id.0; "handle io future: end");
             });

             // Send messages on this frequency so the connection isn't closed.
@@ -159,49 +165,68 @@ impl Peer {
             futures::pin_mut!(receive_timeout);

             loop {
+                log::debug!(connection_id = connection_id.0; "outer loop iteration start");
                 let read_message = reader.read().fuse();
                 futures::pin_mut!(read_message);

                 loop {
+                    log::debug!(connection_id = connection_id.0; "inner loop iteration start");
                     futures::select_biased! {
                         outgoing = outgoing_rx.next().fuse() => match outgoing {
                             Some(outgoing) => {
+                                log::debug!(connection_id = connection_id.0; "outgoing rpc message: writing");
                                 if let Some(result) = writer.write(outgoing).timeout(WRITE_TIMEOUT).await {
+                                    log::debug!(connection_id = connection_id.0; "outgoing rpc message: done writing");
                                     result.context("failed to write RPC message")?;
+                                    log::debug!(connection_id = connection_id.0; "keepalive interval: resetting after sending message");
                                     keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
                                 } else {
+                                    log::debug!(connection_id = connection_id.0; "outgoing rpc message: writing timed out");
                                     Err(anyhow!("timed out writing message"))?;
                                 }
                             }
                             None => {
-                                log::info!("outgoing channel closed");
+                                log::debug!(connection_id = connection_id.0; "outgoing rpc message: channel closed");
                                 return Ok(())
                             },
                         },
                         incoming = read_message => {
+                            log::debug!(connection_id = connection_id.0; "incoming rpc message: received");
                             let incoming = incoming.context("received invalid RPC message")?;
+                            log::debug!(connection_id = connection_id.0; "receive timeout: resetting");
                             receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
                             if let proto::Message::Envelope(incoming) = incoming {
+                                log::debug!(connection_id = connection_id.0; "incoming rpc message: processing");
                                 match incoming_tx.send(incoming).timeout(RECEIVE_TIMEOUT).await {
-                                    Some(Ok(_)) => {},
+                                    Some(Ok(_)) => {
+                                        log::debug!(connection_id = connection_id.0; "incoming rpc message: processed");
+                                    },
                                     Some(Err(_)) => {
-                                        log::info!("incoming channel closed");
+                                        log::debug!(connection_id = connection_id.0; "incoming rpc message: channel closed");
                                         return Ok(())
                                     },
-                                    None => Err(anyhow!("timed out processing incoming message"))?,
+                                    None => {
+                                        log::debug!(connection_id = connection_id.0; "incoming rpc message: processing timed out");
+                                        Err(anyhow!("timed out processing incoming message"))?
+                                    },
                                 }
                             }
                             break;
                         },
                         _ = keepalive_timer => {
+                            log::debug!(connection_id = connection_id.0; "keepalive interval: pinging");
                             if let Some(result) = writer.write(proto::Message::Ping).timeout(WRITE_TIMEOUT).await {
+                                log::debug!(connection_id = connection_id.0; "keepalive interval: done pinging");
                                 result.context("failed to send keepalive")?;
+                                log::debug!(connection_id = connection_id.0; "keepalive interval: resetting after pinging");
                                 keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
                             } else {
+                                log::debug!(connection_id = connection_id.0; "keepalive interval: pinging timed out");
                                 Err(anyhow!("timed out sending keepalive"))?;
                             }
                         }
                         _ = receive_timeout => {
+                            log::debug!(connection_id = connection_id.0; "receive timeout: delay between messages too long");
                             Err(anyhow!("delay between messages too long"))?
                         }
                     }
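To keep the added events in context, here is a stripped-down sketch of the I/O loop structure they instrument: a biased select races the next outgoing message against a keepalive timer and a receive deadline, and the timers are re-armed with Pin::set. The timer type, channel payload, and constants below are stand-ins (the real code has its own create_timer helper and write timeouts), so treat this as illustrative only:

    use anyhow::anyhow;
    use futures::{channel::mpsc, FutureExt, StreamExt};
    use smol::Timer;
    use std::time::Duration;

    const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
    const RECEIVE_TIMEOUT: Duration = Duration::from_secs(10);

    async fn io_loop(mut outgoing_rx: mpsc::UnboundedReceiver<String>) -> anyhow::Result<()> {
        let keepalive_timer = Timer::after(KEEPALIVE_INTERVAL).fuse();
        // Re-armed whenever a message is read in the real loop (reader side omitted here).
        let receive_timeout = Timer::after(RECEIVE_TIMEOUT).fuse();
        futures::pin_mut!(keepalive_timer, receive_timeout);

        loop {
            futures::select_biased! {
                outgoing = outgoing_rx.next().fuse() => match outgoing {
                    Some(_message) => {
                        // ... write the message, then push the next keepalive out
                        keepalive_timer.set(Timer::after(KEEPALIVE_INTERVAL).fuse());
                    }
                    None => return Ok(()), // sender side closed; shut down cleanly
                },
                _ = keepalive_timer => {
                    // ... send a ping so the peer knows the connection is alive
                    keepalive_timer.set(Timer::after(KEEPALIVE_INTERVAL).fuse());
                }
                _ = receive_timeout => {
                    return Err(anyhow!("delay between messages too long"));
                }
            }
        }
    }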
@@ -217,25 +242,71 @@ impl Peer {
         let incoming_rx = incoming_rx.filter_map(move |incoming| {
             let response_channels = response_channels.clone();
             async move {
+                let message_id = incoming.id;
+                log::debug!(incoming = as_debug!(&incoming); "incoming message future: start");
+                let _end = util::defer(move || {
+                    log::debug!(
+                        connection_id = connection_id.0,
+                        message_id = message_id;
+                        "incoming message future: end"
+                    );
+                });
+
                 if let Some(responding_to) = incoming.responding_to {
+                    log::debug!(
+                        connection_id = connection_id.0,
+                        message_id = message_id,
+                        responding_to = responding_to;
+                        "incoming response: received"
+                    );
                     let channel = response_channels.lock().as_mut()?.remove(&responding_to);
                     if let Some(tx) = channel {
                         let requester_resumed = oneshot::channel();
                         if let Err(error) = tx.send((incoming, requester_resumed.0)) {
                             log::debug!(
-                                "received RPC but request future was dropped {:?}",
-                                error.0
+                                connection_id = connection_id.0,
+                                message_id = message_id,
+                                responding_to = responding_to,
+                                error = as_debug!(error);
+                                "incoming response: request future dropped",
                             );
                         }
+
+                        log::debug!(
+                            connection_id = connection_id.0,
+                            message_id = message_id,
+                            responding_to = responding_to;
+                            "incoming response: waiting to resume requester"
+                        );
                         let _ = requester_resumed.1.await;
+                        log::debug!(
+                            connection_id = connection_id.0,
+                            message_id = message_id,
+                            responding_to = responding_to;
+                            "incoming response: requester resumed"
+                        );
                     } else {
-                        log::warn!("received RPC response to unknown request {}", responding_to);
+                        log::warn!(
+                            connection_id = connection_id.0,
+                            message_id = message_id,
+                            responding_to = responding_to;
+                            "incoming response: unknown request"
+                        );
                     }

                     None
                 } else {
+                    log::debug!(
+                        connection_id = connection_id.0,
+                        message_id = message_id;
+                        "incoming message: received"
+                    );
                     proto::build_typed_envelope(connection_id, incoming).or_else(|| {
-                        log::error!("unable to construct a typed envelope");
+                        log::error!(
+                            connection_id = connection_id.0,
+                            message_id = message_id;
+                            "unable to construct a typed envelope"
+                        );
                         None
                     })
                 }
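For orientation, a simplified sketch of the response bookkeeping this hunk adds events around (the types and names are stand-ins, not the actual Peer fields): each in-flight request parks a oneshot sender keyed by message id, and when the matching response arrives it is handed back together with a second "resumed" oneshot so the reader can wait until the requester has actually picked the response up.

    use futures::channel::oneshot;
    use parking_lot::Mutex;
    use std::collections::HashMap;

    // Stand-ins for the real envelope type and channel map.
    type Response = String;
    type ResponseChannels =
        Mutex<HashMap<u32, oneshot::Sender<(Response, oneshot::Sender<()>)>>>;

    // Hand the response to the waiting request future along with a "resumed"
    // oneshot, then park this message future until the requester has resumed --
    // mirroring the requester_resumed handshake instrumented above.
    async fn deliver_response(channels: &ResponseChannels, responding_to: u32, response: Response) {
        let tx = channels.lock().remove(&responding_to);
        if let Some(tx) = tx {
            let (resumed_tx, resumed_rx) = oneshot::channel::<()>();
            if tx.send((response, resumed_tx)).is_err() {
                log::debug!("received RPC but request future was dropped");
            }
            let _ = resumed_rx.await; // resolves (or cancels) once the requester resumes
        } else {
            log::warn!("received RPC response to unknown request {}", responding_to);
        }
    }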