Add repl events (#15259)

Release Notes:

- N/A

---------

Co-authored-by: Kyle Kelley <rgbkrk@gmail.com>
This commit is contained in:
Joseph T. Lyons 2024-07-26 03:31:41 -04:00 committed by GitHub
parent 745d2e4d3b
commit 856d9632e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 190 additions and 37 deletions

1
Cargo.lock generated
View File

@ -8649,6 +8649,7 @@ dependencies = [
"anyhow",
"async-dispatcher",
"base64 0.13.1",
"client",
"collections",
"command_palette_hooks",
"editor",

View File

@ -18,7 +18,7 @@ use sysinfo::{CpuRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};
use telemetry_events::{
ActionEvent, AppEvent, AssistantEvent, AssistantKind, CallEvent, CpuEvent, EditEvent,
EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent,
MemoryEvent, SettingEvent,
MemoryEvent, ReplEvent, SettingEvent,
};
use tempfile::NamedTempFile;
#[cfg(not(debug_assertions))]
@ -531,6 +531,21 @@ impl Telemetry {
}
}
pub fn report_repl_event(
self: &Arc<Self>,
kernel_language: String,
kernel_status: String,
repl_session_id: String,
) {
let event = Event::Repl(ReplEvent {
kernel_language,
kernel_status,
repl_session_id,
});
self.report_event(event)
}
fn report_event(self: &Arc<Self>, event: Event) {
let mut state = self.state.lock();

View File

@ -16,7 +16,7 @@ use sha2::{Digest, Sha256};
use std::sync::{Arc, OnceLock};
use telemetry_events::{
ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent,
EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, ReplEvent,
SettingEvent,
};
use uuid::Uuid;
@ -518,6 +518,13 @@ pub async fn post_events(
checksum_matched,
))
}
Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event(
event.clone(),
&wrapper,
&request_body,
first_event_at,
checksum_matched,
)),
}
}
@ -542,6 +549,7 @@ struct ToUpload {
extension_events: Vec<ExtensionEventRow>,
edit_events: Vec<EditEventRow>,
action_events: Vec<ActionEventRow>,
repl_events: Vec<ReplEventRow>,
}
impl ToUpload {
@ -617,6 +625,11 @@ impl ToUpload {
.await
.with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
const REPL_EVENTS_TABLE: &str = "repl_events";
Self::upload_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
.await
.with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
Ok(())
}
@ -625,22 +638,24 @@ impl ToUpload {
rows: &[T],
clickhouse_client: &clickhouse::Client,
) -> anyhow::Result<()> {
if !rows.is_empty() {
let mut insert = clickhouse_client.insert(table)?;
for event in rows {
insert.write(event).await?;
}
insert.end().await?;
let event_count = rows.len();
log::info!(
"wrote {event_count} {event_specifier} to '{table}'",
event_specifier = if event_count == 1 { "event" } else { "events" }
);
if rows.is_empty() {
return Ok(());
}
let mut insert = clickhouse_client.insert(table)?;
for event in rows {
insert.write(event).await?;
}
insert.end().await?;
let event_count = rows.len();
log::info!(
"wrote {event_count} {event_specifier} to '{table}'",
event_specifier = if event_count == 1 { "event" } else { "events" }
);
Ok(())
}
}
@ -1189,6 +1204,62 @@ impl ExtensionEventRow {
}
}
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ReplEventRow {
// AppInfoBase
app_version: String,
major: Option<i32>,
minor: Option<i32>,
patch: Option<i32>,
checksum_matched: bool,
release_channel: String,
os_name: String,
os_version: String,
// ClientEventBase
installation_id: Option<String>,
session_id: Option<String>,
is_staff: Option<bool>,
time: i64,
// ReplEventRow
kernel_language: String,
kernel_status: String,
repl_session_id: String,
}
impl ReplEventRow {
fn from_event(
event: ReplEvent,
wrapper: &EventWrapper,
body: &EventRequestBody,
first_event_at: chrono::DateTime<chrono::Utc>,
checksum_matched: bool,
) -> Self {
let semver = body.semver();
let time =
first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
Self {
app_version: body.app_version.clone(),
major: semver.map(|v| v.major() as i32),
minor: semver.map(|v| v.minor() as i32),
patch: semver.map(|v| v.patch() as i32),
checksum_matched,
release_channel: body.release_channel.clone().unwrap_or_default(),
os_name: body.os_name.clone(),
os_version: body.os_version.clone().unwrap_or_default(),
installation_id: body.installation_id.clone(),
session_id: body.session_id.clone(),
is_staff: body.is_staff,
time: time.timestamp_millis(),
kernel_language: event.kernel_language,
kernel_status: event.kernel_status,
repl_session_id: event.repl_session_id,
}
}
}
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct EditEventRow {
// AppInfoBase

View File

@ -17,6 +17,7 @@ alacritty_terminal.workspace = true
anyhow.workspace = true
async-dispatcher.workspace = true
base64.workspace = true
client.workspace = true
collections.workspace = true
command_palette_hooks.workspace = true
editor.workspace = true

View File

@ -24,13 +24,14 @@ pub use crate::repl_sessions_ui::{
};
use crate::repl_store::ReplStore;
pub use crate::session::Session;
use client::telemetry::Telemetry;
pub fn init(fs: Arc<dyn Fs>, cx: &mut AppContext) {
pub fn init(fs: Arc<dyn Fs>, telemetry: Arc<Telemetry>, cx: &mut AppContext) {
set_dispatcher(zed_dispatcher(cx));
JupyterSettings::register(cx);
::editor::init_settings(cx);
repl_sessions_ui::init(cx);
ReplStore::init(fs, cx);
ReplStore::init(fs, telemetry, cx);
}
fn zed_dispatcher(cx: &mut AppContext) -> impl Dispatcher {

View File

@ -42,12 +42,15 @@ pub fn run(editor: WeakView<Editor>, cx: &mut WindowContext) -> Result<()> {
})?;
let fs = store.read(cx).fs().clone();
let telemetry = store.read(cx).telemetry().clone();
let session = if let Some(session) = store.read(cx).get_session(editor.entity_id()).cloned()
{
session
} else {
let weak_editor = editor.downgrade();
let session = cx.new_view(|cx| Session::new(weak_editor, fs, kernel_specification, cx));
let session = cx
.new_view(|cx| Session::new(weak_editor, fs, telemetry, kernel_specification, cx));
editor.update(cx, |_editor, cx| {
cx.notify();

View File

@ -1,6 +1,7 @@
use std::sync::Arc;
use anyhow::Result;
use client::telemetry::Telemetry;
use collections::HashMap;
use command_palette_hooks::CommandPaletteFilter;
use gpui::{
@ -22,14 +23,15 @@ pub struct ReplStore {
enabled: bool,
sessions: HashMap<EntityId, View<Session>>,
kernel_specifications: Vec<KernelSpecification>,
telemetry: Arc<Telemetry>,
_subscriptions: Vec<Subscription>,
}
impl ReplStore {
const NAMESPACE: &'static str = "repl";
pub(crate) fn init(fs: Arc<dyn Fs>, cx: &mut AppContext) {
let store = cx.new_model(move |cx| Self::new(fs, cx));
pub(crate) fn init(fs: Arc<dyn Fs>, telemetry: Arc<Telemetry>, cx: &mut AppContext) {
let store = cx.new_model(move |cx| Self::new(fs, telemetry, cx));
store
.update(cx, |store, cx| store.refresh_kernelspecs(cx))
@ -42,13 +44,14 @@ impl ReplStore {
cx.global::<GlobalReplStore>().0.clone()
}
pub fn new(fs: Arc<dyn Fs>, cx: &mut ModelContext<Self>) -> Self {
pub fn new(fs: Arc<dyn Fs>, telemetry: Arc<Telemetry>, cx: &mut ModelContext<Self>) -> Self {
let subscriptions = vec![cx.observe_global::<SettingsStore>(move |this, cx| {
this.set_enabled(JupyterSettings::enabled(cx), cx);
})];
let this = Self {
fs,
telemetry,
enabled: JupyterSettings::enabled(cx),
sessions: HashMap::default(),
kernel_specifications: Vec::new(),
@ -62,6 +65,10 @@ impl ReplStore {
&self.fs
}
pub fn telemetry(&self) -> &Arc<Telemetry> {
&self.telemetry
}
pub fn is_enabled(&self) -> bool {
self.enabled
}

View File

@ -1,8 +1,10 @@
use crate::components::KernelListItem;
use crate::KernelStatus;
use crate::{
kernels::{Kernel, KernelSpecification, RunningKernel},
outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
};
use client::telemetry::Telemetry;
use collections::{HashMap, HashSet};
use editor::{
display_map::{
@ -34,6 +36,7 @@ pub struct Session {
blocks: HashMap<String, EditorBlock>,
messaging_task: Task<()>,
pub kernel_specification: KernelSpecification,
telemetry: Arc<Telemetry>,
_buffer_subscription: Subscription,
}
@ -205,9 +208,18 @@ impl Session {
pub fn new(
editor: WeakView<Editor>,
fs: Arc<dyn Fs>,
telemetry: Arc<Telemetry>,
kernel_specification: KernelSpecification,
cx: &mut ViewContext<Self>,
) -> Self {
let kernel_language = kernel_specification.kernelspec.language.clone();
telemetry.report_repl_event(
kernel_language.clone(),
KernelStatus::Starting.to_string(),
cx.entity_id().to_string(),
);
let entity_id = editor.entity_id();
let working_directory = editor
.upgrade()
@ -227,11 +239,11 @@ impl Session {
match kernel {
Ok((mut kernel, mut messages_rx)) => {
this.update(&mut cx, |this, cx| {
this.update(&mut cx, |session, cx| {
// At this point we can create a new kind of kernel that has the process and our long running background tasks
let status = kernel.process.status();
this.kernel = Kernel::RunningKernel(kernel);
session.kernel(Kernel::RunningKernel(kernel), cx);
cx.spawn(|session, mut cx| async move {
let error_message = match status.await {
@ -252,8 +264,10 @@ impl Session {
session
.update(&mut cx, |session, cx| {
session.kernel =
Kernel::ErroredLaunch(error_message.clone());
session.kernel(
Kernel::ErroredLaunch(error_message.clone()),
cx,
);
session.blocks.values().for_each(|block| {
block.execution_view.update(
@ -282,7 +296,7 @@ impl Session {
})
.detach();
this.messaging_task = cx.spawn(|session, mut cx| async move {
session.messaging_task = cx.spawn(|session, mut cx| async move {
while let Some(message) = messages_rx.next().await {
session
.update(&mut cx, |session, cx| {
@ -295,8 +309,8 @@ impl Session {
.ok();
}
Err(err) => {
this.update(&mut cx, |this, _cx| {
this.kernel = Kernel::ErroredLaunch(err.to_string());
this.update(&mut cx, |session, cx| {
session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
})
.ok();
}
@ -319,6 +333,7 @@ impl Session {
blocks: HashMap::default(),
kernel_specification,
_buffer_subscription: subscription,
telemetry,
};
}
@ -474,8 +489,8 @@ impl Session {
cx.spawn(|this, mut cx| async move {
task.await;
this.update(&mut cx, |this, cx| {
this.send(message, cx).ok();
this.update(&mut cx, |session, cx| {
session.send(message, cx).ok();
})
.ok();
})
@ -501,6 +516,13 @@ impl Session {
match &message.content {
JupyterMessageContent::Status(status) => {
self.kernel.set_execution_state(&status.execution_state);
self.telemetry.report_repl_event(
self.kernel_specification.kernelspec.language.clone(),
KernelStatus::from(&self.kernel).to_string(),
cx.entity_id().to_string(),
);
cx.notify();
}
JupyterMessageContent::KernelInfoReply(reply) => {
@ -528,6 +550,23 @@ impl Session {
}
}
pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
if let Kernel::Shutdown = kernel {
cx.emit(SessionEvent::Shutdown(self.editor.clone()));
}
let kernel_status = KernelStatus::from(&kernel).to_string();
let kernel_language = self.kernel_specification.kernelspec.language.clone();
self.telemetry.report_repl_event(
kernel_language,
kernel_status,
cx.entity_id().to_string(),
);
self.kernel = kernel;
}
pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
@ -544,10 +583,9 @@ impl Session {
kernel.process.kill().ok();
this.update(&mut cx, |this, cx| {
cx.emit(SessionEvent::Shutdown(this.editor.clone()));
this.clear_outputs(cx);
this.kernel = Kernel::Shutdown;
this.update(&mut cx, |session, cx| {
session.clear_outputs(cx);
session.kernel(Kernel::Shutdown, cx);
cx.notify();
})
.ok();

View File

@ -65,6 +65,7 @@ pub enum Event {
Extension(ExtensionEvent),
Edit(EditEvent),
Action(ActionEvent),
Repl(ReplEvent),
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@ -148,6 +149,13 @@ pub struct AppEvent {
pub operation: String,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplEvent {
pub kernel_language: String,
pub kernel_status: String,
pub repl_session_id: String,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct BacktraceFrame {
pub ip: usize,

View File

@ -169,7 +169,11 @@ fn init_common(app_state: Arc<AppState>, cx: &mut AppContext) {
supermaven::init(app_state.client.clone(), cx);
inline_completion_registry::init(app_state.client.telemetry().clone(), cx);
assistant::init(app_state.fs.clone(), app_state.client.clone(), cx);
repl::init(app_state.fs.clone(), cx);
repl::init(
app_state.fs.clone(),
app_state.client.telemetry().clone(),
cx,
);
extension::init(
app_state.fs.clone(),
app_state.client.clone(),

View File

@ -3460,7 +3460,11 @@ mod tests {
terminal_view::init(cx);
language_model::init(app_state.client.clone(), cx);
assistant::init(app_state.fs.clone(), app_state.client.clone(), cx);
repl::init(app_state.fs.clone(), cx);
repl::init(
app_state.fs.clone(),
app_state.client.telemetry().clone(),
cx,
);
tasks_ui::init(cx);
initialize_workspace(app_state.clone(), cx);
app_state