Merge pull request #1212 from zed-industries/persist-project-activity

Add an API that returns the most active Zed users and the projects where they've been active
This commit is contained in:
Antonio Scandurra 2022-06-21 18:16:29 +02:00 committed by GitHub
commit 69bd6bf1f3
12 changed files with 870 additions and 374 deletions
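
For orientation, here is a minimal sketch of how the new activity summary might be queried through the Db trait added below. The one-day window and the limit of ten users are illustrative assumptions, and db stands for any Db implementation (the crate's Db, Result, and UserActivitySummary types are assumed to be in scope).

use std::time::Duration;
use time::OffsetDateTime;

async fn print_most_active_users(db: &dyn Db) -> Result<()> {
    let end = OffsetDateTime::now_utc();
    // Hypothetical window: the last 24 hours.
    let start = end - Duration::from_secs(24 * 60 * 60);
    // Top ten users by total recorded activity, each with per-project durations.
    let summary = db.summarize_project_activity(start..end, 10).await?;
    for user in summary {
        println!("{}: {:?}", user.github_login, user.project_activity);
    }
    Ok(())
}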

Cargo.lock (generated)
View File

@ -261,9 +261,9 @@ checksum = "2f23d769dbf1838d5df5156e7b1ad404f4c463d1ac2c6aeb6cd943630f8a8400"
dependencies = [
"futures-core",
"futures-io",
"rustls",
"webpki",
"webpki-roots",
"rustls 0.19.1",
"webpki 0.21.4",
"webpki-roots 0.21.1",
]
[[package]]
@ -293,9 +293,9 @@ dependencies = [
[[package]]
name = "atoi"
version = "0.4.0"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5"
checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e"
dependencies = [
"num-traits",
]
@ -446,12 +446,6 @@ dependencies = [
"rustc-demangle",
]
[[package]]
name = "base-x"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc19a4937b4fbd3fe3379793130e42060d10627a360f2127802b10b87e7baf74"
[[package]]
name = "base64"
version = "0.12.3"
@ -670,7 +664,7 @@ dependencies = [
"postage",
"settings",
"theme",
"time 0.3.9",
"time 0.3.10",
"util",
"workspace",
]
@ -803,7 +797,7 @@ dependencies = [
"smol",
"sum_tree",
"thiserror",
"time 0.3.9",
"time 0.3.10",
"tiny_http",
"url",
"util",
@ -893,7 +887,7 @@ dependencies = [
"sha-1 0.9.8",
"sqlx",
"theme",
"time 0.2.27",
"time 0.3.10",
"tokio",
"tokio-tungstenite",
"toml",
@ -946,12 +940,6 @@ dependencies = [
"cache-padded",
]
[[package]]
name = "const_fn"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbdcdcb6d86f71c5e97409ad45898af11cbc995b4ee8112d59095a28d376c935"
[[package]]
name = "contacts_panel"
version = "0.1.0"
@ -1056,18 +1044,18 @@ dependencies = [
[[package]]
name = "crc"
version = "2.1.0"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23"
checksum = "53757d12b596c16c78b83458d732a5d1a17ab3f53f2f7412f6fb57cc8a140ab3"
dependencies = [
"crc-catalog",
]
[[package]]
name = "crc-catalog"
version = "1.1.1"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403"
checksum = "2d0165d2900ae6778e36e80bbc4da3b5eefccee9ba939761f9c2882a5d9af3ff"
[[package]]
name = "crc32fast"
@ -1340,12 +1328,6 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "discard"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
[[package]]
name = "dotenv"
version = "0.15.0"
@ -1930,7 +1912,7 @@ dependencies = [
"smallvec",
"smol",
"sum_tree",
"time 0.3.9",
"time 0.3.10",
"tiny-skia",
"tree-sitter",
"usvg",
@ -1971,17 +1953,23 @@ name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]]
name = "hashbrown"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3"
dependencies = [
"ahash",
]
[[package]]
name = "hashlink"
version = "0.7.0"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf"
checksum = "d452c155cb93fecdfb02a73dd57b5d8e442c2063bd7aac72f1bc5e4263a43086"
dependencies = [
"hashbrown",
"hashbrown 0.12.1",
]
[[package]]
@ -2229,7 +2217,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6012d540c5baa3589337a98ce73408de9b5a25ec9fc2c6fd6be8f0d39e0ca5a"
dependencies = [
"autocfg 1.1.0",
"hashbrown",
"hashbrown 0.11.2",
]
[[package]]
@ -2271,7 +2259,7 @@ dependencies = [
"rand 0.7.3",
"serde",
"tempfile",
"uuid",
"uuid 0.8.2",
"winapi 0.3.9",
]
@ -3120,7 +3108,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39fe46acc5503595e5949c17b818714d26fdf9b4920eacf3b2947f0199f4a6ff"
dependencies = [
"rustc_version 0.3.3",
"rustc_version",
]
[[package]]
@ -3250,7 +3238,7 @@ dependencies = [
"indexmap",
"line-wrap",
"serde",
"time 0.3.9",
"time 0.3.10",
"xml-rs",
]
@ -3331,12 +3319,6 @@ dependencies = [
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro2"
version = "1.0.39"
@ -3927,22 +3909,13 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
dependencies = [
"semver 0.9.0",
]
[[package]]
name = "rustc_version"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee"
dependencies = [
"semver 0.11.0",
"semver",
]
[[package]]
@ -3954,8 +3927,29 @@ dependencies = [
"base64 0.13.0",
"log",
"ring",
"sct",
"webpki",
"sct 0.6.1",
"webpki 0.21.4",
]
[[package]]
name = "rustls"
version = "0.20.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033"
dependencies = [
"log",
"ring",
"sct 0.7.0",
"webpki 0.22.0",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7522c9de787ff061458fe9a829dc790a3f5b22dc571694fc5883f448b94d9a9"
dependencies = [
"base64 0.13.0",
]
[[package]]
@ -4083,6 +4077,16 @@ dependencies = [
"untrusted",
]
[[package]]
name = "sct"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "seahash"
version = "4.1.0"
@ -4135,30 +4139,15 @@ dependencies = [
"libc",
]
[[package]]
name = "semver"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
dependencies = [
"semver-parser 0.7.0",
]
[[package]]
name = "semver"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6"
dependencies = [
"semver-parser 0.10.2",
"semver-parser",
]
[[package]]
name = "semver-parser"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "semver-parser"
version = "0.10.2"
@ -4321,21 +4310,6 @@ dependencies = [
"digest 0.10.3",
]
[[package]]
name = "sha1"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1da05c97445caa12d05e848c4a4fcbbea29e748ac28f7e80e9b010392063770"
dependencies = [
"sha1_smol",
]
[[package]]
name = "sha1_smol"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012"
[[package]]
name = "sha2"
version = "0.9.9"
@ -4526,9 +4500,9 @@ dependencies = [
[[package]]
name = "sqlx"
version = "0.5.13"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "551873805652ba0d912fec5bbb0f8b4cdd96baf8e2ebf5970e5671092966019b"
checksum = "1f82cbe94f41641d6c410ded25bbf5097c240cefdf8e3b06d04198d0a96af6a4"
dependencies = [
"sqlx-core",
"sqlx-macros",
@ -4536,9 +4510,9 @@ dependencies = [
[[package]]
name = "sqlx-core"
version = "0.5.13"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e48c61941ccf5ddcada342cd59e3e5173b007c509e1e8e990dafc830294d9dc5"
checksum = "6b69bf218860335ddda60d6ce85ee39f6cf6e5630e300e19757d1de15886a093"
dependencies = [
"ahash",
"atoi",
@ -4569,7 +4543,8 @@ dependencies = [
"paste",
"percent-encoding",
"rand 0.8.5",
"rustls",
"rustls 0.20.6",
"rustls-pemfile",
"serde",
"serde_json",
"sha-1 0.10.0",
@ -4579,20 +4554,19 @@ dependencies = [
"sqlx-rt",
"stringprep",
"thiserror",
"time 0.2.27",
"time 0.3.10",
"tokio-stream",
"url",
"uuid",
"webpki",
"webpki-roots",
"uuid 1.1.2",
"webpki-roots 0.22.3",
"whoami",
]
[[package]]
name = "sqlx-macros"
version = "0.5.13"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc0fba2b0cae21fc00fe6046f8baa4c7fcb49e379f0f592b04696607f69ed2e1"
checksum = "f40c63177cf23d356b159b60acd27c54af7423f1736988502e36bae9a712118f"
dependencies = [
"dotenv",
"either",
@ -4609,79 +4583,21 @@ dependencies = [
[[package]]
name = "sqlx-rt"
version = "0.5.13"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4db708cd3e459078f85f39f96a00960bd841f66ee2a669e90bf36907f5a79aae"
checksum = "874e93a365a598dc3dadb197565952cb143ae4aa716f7bcc933a8d836f6bf89f"
dependencies = [
"once_cell",
"tokio",
"tokio-rustls",
]
[[package]]
name = "standback"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e113fb6f3de07a243d434a56ec6f186dfd51cb08448239fe7bcae73f87ff28ff"
dependencies = [
"version_check",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stdweb"
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d022496b16281348b52d0e30ae99e01a73d737b2f45d38fed4edf79f9325a1d5"
dependencies = [
"discard",
"rustc_version 0.2.3",
"stdweb-derive",
"stdweb-internal-macros",
"stdweb-internal-runtime",
"wasm-bindgen",
]
[[package]]
name = "stdweb-derive"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c87a60a40fccc84bef0652345bbbbbe20a605bf5d0ce81719fc476f5c03b50ef"
dependencies = [
"proc-macro2",
"quote",
"serde",
"serde_derive",
"syn",
]
[[package]]
name = "stdweb-internal-macros"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58fa5ff6ad0d98d1ffa8cb115892b6e69d67799f6763e162a1c9db421dc22e11"
dependencies = [
"base-x",
"proc-macro2",
"quote",
"serde",
"serde_derive",
"serde_json",
"sha1",
"syn",
]
[[package]]
name = "stdweb-internal-runtime"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
[[package]]
name = "stringprep"
version = "0.1.2"
@ -4945,52 +4861,22 @@ dependencies = [
[[package]]
name = "time"
version = "0.2.27"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4752a97f8eebd6854ff91f1c1824cd6160626ac4bd44287f7f4ea2035a02a242"
dependencies = [
"const_fn",
"libc",
"standback",
"stdweb",
"time-macros",
"version_check",
"winapi 0.3.9",
]
[[package]]
name = "time"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd"
checksum = "82501a4c1c0330d640a6e176a3d6a204f5ec5237aca029029d21864a902e27b0"
dependencies = [
"itoa",
"libc",
"num_threads",
"serde",
"time-macros",
]
[[package]]
name = "time-macros"
version = "0.1.1"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "957e9c6e26f12cb6d0dd7fc776bb67a706312e7299aed74c8dd5b17ebb27e2f1"
dependencies = [
"proc-macro-hack",
"time-macros-impl",
]
[[package]]
name = "time-macros-impl"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd3c141a1b43194f3f56a1411225df8646c55781d5f26db825b3d98507eb482f"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"standback",
"syn",
]
checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792"
[[package]]
name = "tiny-skia"
@ -5087,13 +4973,13 @@ dependencies = [
[[package]]
name = "tokio-rustls"
version = "0.22.0"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls",
"rustls 0.20.6",
"tokio",
"webpki",
"webpki 0.22.0",
]
[[package]]
@ -5651,6 +5537,12 @@ dependencies = [
"getrandom 0.2.6",
]
[[package]]
name = "uuid"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f"
[[package]]
name = "valuable"
version = "0.1.0"
@ -5839,13 +5731,32 @@ dependencies = [
"untrusted",
]
[[package]]
name = "webpki"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940"
dependencies = [
"webpki",
"webpki 0.21.4",
]
[[package]]
name = "webpki-roots"
version = "0.22.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d8de8415c823c8abd270ad483c6feeac771fad964890779f9a8cb24fbbc1bf"
dependencies = [
"webpki 0.22.0",
]
[[package]]

View File

@ -17,4 +17,4 @@ theme = { path = "../theme" }
util = { path = "../util" }
workspace = { path = "../workspace" }
postage = { version = "0.4.1", features = ["futures-traits"] }
time = "0.3"
time = { version = "0.3", features = ["serde", "serde-well-known"] }

View File

@ -29,7 +29,7 @@ postage = { version = "0.4.1", features = ["futures-traits"] }
rand = "0.8.3"
smol = "1.2.5"
thiserror = "1.0.29"
time = "0.3"
time = { version = "0.3", features = ["serde", "serde-well-known"] }
tiny_http = "0.8"
url = "2.2"

View File

@ -38,7 +38,7 @@ scrypt = "0.7"
serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = "1.0"
sha-1 = "0.9"
time = "0.2"
time = { version = "0.3", features = ["serde", "serde-well-known"] }
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.17"
tonic = "0.6"
@ -49,7 +49,7 @@ tracing-log = "0.1.3"
tracing-subscriber = { version = "0.3.11", features = ["env-filter", "json"] }
[dependencies.sqlx]
version = "0.5.2"
version = "0.6"
features = ["runtime-tokio-rustls", "postgres", "time", "uuid"]
[dev-dependencies]

View File

@ -0,0 +1,24 @@
CREATE TABLE IF NOT EXISTS "projects" (
"id" SERIAL PRIMARY KEY,
"host_user_id" INTEGER REFERENCES users (id) NOT NULL,
"unregistered" BOOLEAN NOT NULL DEFAULT false
);
CREATE TABLE IF NOT EXISTS "worktree_extensions" (
"id" SERIAL PRIMARY KEY,
"project_id" INTEGER REFERENCES projects (id) NOT NULL,
"worktree_id" INTEGER NOT NULL,
"extension" VARCHAR(255),
"count" INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS "project_activity_periods" (
"id" SERIAL PRIMARY KEY,
"duration_millis" INTEGER NOT NULL,
"ended_at" TIMESTAMP NOT NULL,
"user_id" INTEGER REFERENCES users (id) NOT NULL,
"project_id" INTEGER REFERENCES projects (id) NOT NULL
);
CREATE INDEX "index_project_activity_periods_on_ended_at" ON "project_activity_periods" ("ended_at");
CREATE UNIQUE INDEX "index_worktree_extensions_on_project_id_and_worktree_id_and_extension" ON "worktree_extensions" ("project_id", "worktree_id", "extension");
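
Each row in project_activity_periods captures one recording interval for one (user, project) pair; the summary query later sums duration_millis per pair over the requested window. A tiny illustration of that roll-up with made-up values:

use std::time::Duration;

fn main() {
    // Three recorded periods for the same (user, project) pair, in milliseconds.
    let periods: [i64; 3] = [300_000, 300_000, 150_000];
    let total_millis: i64 = periods.iter().sum();
    // The summary reports this pair as 750 seconds (12.5 minutes) of activity.
    assert_eq!(
        Duration::from_millis(total_millis as u64),
        Duration::from_secs(750)
    );
}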

View File

@ -1,6 +1,6 @@
use crate::{
auth,
db::{User, UserId},
db::{ProjectId, User, UserId},
rpc::{self, ResultExt},
AppState, Error, Result,
};
@ -16,7 +16,9 @@ use axum::{
};
use axum_extra::response::ErasedJson;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::Arc;
use time::OffsetDateTime;
use tower::ServiceBuilder;
use tracing::instrument;
@ -32,6 +34,11 @@ pub fn routes(rpc_server: &Arc<rpc::Server>, state: Arc<AppState>) -> Router<Bod
.route("/invite_codes/:code", get(get_user_for_invite_code))
.route("/panic", post(trace_panic))
.route("/rpc_server_snapshot", get(get_rpc_server_snapshot))
.route(
"/project_activity_summary",
get(get_project_activity_summary),
)
.route("/project_metadata", get(get_project_metadata))
.layer(
ServiceBuilder::new()
.layer(Extension(state))
@ -239,6 +246,41 @@ async fn get_rpc_server_snapshot(
Ok(ErasedJson::pretty(rpc_server.snapshot().await))
}
#[derive(Deserialize)]
struct GetProjectActivityParams {
#[serde(with = "time::serde::iso8601")]
start: OffsetDateTime,
#[serde(with = "time::serde::iso8601")]
end: OffsetDateTime,
}
async fn get_project_activity_summary(
Query(params): Query<GetProjectActivityParams>,
Extension(app): Extension<Arc<AppState>>,
) -> Result<ErasedJson> {
let summary = app
.db
.summarize_project_activity(params.start..params.end, 100)
.await?;
Ok(ErasedJson::pretty(summary))
}
#[derive(Deserialize)]
struct GetProjectMetadataParams {
project_id: u64,
}
async fn get_project_metadata(
Query(params): Query<GetProjectMetadataParams>,
Extension(app): Extension<Arc<AppState>>,
) -> Result<ErasedJson> {
let extensions = app
.db
.get_project_extensions(ProjectId::from_proto(params.project_id))
.await?;
Ok(ErasedJson::pretty(json!({ "extensions": extensions })))
}
#[derive(Deserialize)]
struct CreateAccessTokenQueryParams {
public_key: String,
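
The new routes take the reporting window as ISO 8601 query parameters. A hedged sketch of how those parameters deserialize, assuming the serde-well-known feature enabled in the Cargo.toml changes above; the struct mirrors GetProjectActivityParams, serde_urlencoded stands in for what axum's Query extractor does internally, and the timestamps are made up:

use serde::Deserialize;
use time::OffsetDateTime;

#[derive(Deserialize)]
struct Params {
    #[serde(with = "time::serde::iso8601")]
    start: OffsetDateTime,
    #[serde(with = "time::serde::iso8601")]
    end: OffsetDateTime,
}

fn main() {
    // e.g. GET /project_activity_summary?start=...&end=...
    let query = "start=2022-06-21T00:00:00Z&end=2022-06-21T12:00:00Z";
    let params: Params = serde_urlencoded::from_str(query).unwrap();
    assert!(params.start < params.end);
}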

View File

@ -1,7 +1,10 @@
use std::{ops::Range, time::Duration};
use crate::{Error, Result};
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use axum::http::StatusCode;
use collections::HashMap;
use futures::StreamExt;
use nanoid::nanoid;
use serde::Serialize;
@ -37,6 +40,42 @@ pub trait Db: Send + Sync {
email_address: Option<&str>,
) -> Result<UserId>;
/// Registers a new project for the given user.
async fn register_project(&self, host_user_id: UserId) -> Result<ProjectId>;
/// Unregisters a project for the given project id.
async fn unregister_project(&self, project_id: ProjectId) -> Result<()>;
/// Update file counts by extension for the given project and worktree.
async fn update_worktree_extensions(
&self,
project_id: ProjectId,
worktree_id: u64,
extensions: HashMap<String, usize>,
) -> Result<()>;
/// Get the file counts on the given project keyed by their worktree and extension.
async fn get_project_extensions(
&self,
project_id: ProjectId,
) -> Result<HashMap<u64, HashMap<String, usize>>>;
/// Record which users have been active in which projects during
/// a given period of time.
async fn record_project_activity(
&self,
time_period: Range<OffsetDateTime>,
active_projects: &[(UserId, ProjectId)],
) -> Result<()>;
/// Get the users that have been most active during the given time period,
/// along with the amount of time they have been active in each project.
async fn summarize_project_activity(
&self,
time_period: Range<OffsetDateTime>,
max_user_count: usize,
) -> Result<Vec<UserActivitySummary>>;
async fn get_contacts(&self, id: UserId) -> Result<Vec<Contact>>;
async fn has_contact(&self, user_id_a: UserId, user_id_b: UserId) -> Result<bool>;
async fn send_contact_request(&self, requester_id: UserId, responder_id: UserId) -> Result<()>;
@ -145,12 +184,12 @@ impl Db for PostgresDb {
async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
let query = "SELECT * FROM users ORDER BY github_login ASC LIMIT $1 OFFSET $2";
Ok(sqlx::query_as(query)
.bind(limit)
.bind(page * limit)
.bind(limit as i32)
.bind((page * limit) as i32)
.fetch_all(&self.pool)
.await?)
}
async fn create_users(&self, users: Vec<(String, String, usize)>) -> Result<Vec<UserId>> {
let mut query = QueryBuilder::new(
"INSERT INTO users (github_login, email_address, admin, invite_code, invite_count)",
@ -163,7 +202,7 @@ impl Db for PostgresDb {
.push_bind(email_address)
.push_bind(false)
.push_bind(nanoid!(16))
.push_bind(invite_count as u32);
.push_bind(invite_count as i32);
},
);
query.push(
@ -198,7 +237,7 @@ impl Db for PostgresDb {
Ok(sqlx::query_as(query)
.bind(like_string)
.bind(name_query)
.bind(limit)
.bind(limit as i32)
.fetch_all(&self.pool)
.await?)
}
@ -289,7 +328,7 @@ impl Db for PostgresDb {
WHERE id = $2
",
)
.bind(count)
.bind(count as i32)
.bind(id)
.execute(&mut tx)
.await?;
@ -411,6 +450,178 @@ impl Db for PostgresDb {
Ok(invitee_id)
}
// projects
async fn register_project(&self, host_user_id: UserId) -> Result<ProjectId> {
Ok(sqlx::query_scalar(
"
INSERT INTO projects(host_user_id)
VALUES ($1)
RETURNING id
",
)
.bind(host_user_id)
.fetch_one(&self.pool)
.await
.map(ProjectId)?)
}
async fn unregister_project(&self, project_id: ProjectId) -> Result<()> {
sqlx::query(
"
UPDATE projects
SET unregistered = 't'
WHERE id = $1
",
)
.bind(project_id)
.execute(&self.pool)
.await?;
Ok(())
}
async fn update_worktree_extensions(
&self,
project_id: ProjectId,
worktree_id: u64,
extensions: HashMap<String, usize>,
) -> Result<()> {
let mut query = QueryBuilder::new(
"INSERT INTO worktree_extensions (project_id, worktree_id, extension, count)",
);
query.push_values(extensions, |mut query, (extension, count)| {
query
.push_bind(project_id)
.push_bind(worktree_id as i32)
.push_bind(extension)
.push_bind(count as i32);
});
query.push(
"
ON CONFLICT (project_id, worktree_id, extension) DO UPDATE SET
count = excluded.count
",
);
query.build().execute(&self.pool).await?;
Ok(())
}
async fn get_project_extensions(
&self,
project_id: ProjectId,
) -> Result<HashMap<u64, HashMap<String, usize>>> {
#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
struct WorktreeExtension {
worktree_id: i32,
extension: String,
count: i32,
}
let query = "
SELECT worktree_id, extension, count
FROM worktree_extensions
WHERE project_id = $1
";
let counts = sqlx::query_as::<_, WorktreeExtension>(query)
.bind(&project_id)
.fetch_all(&self.pool)
.await?;
let mut extension_counts = HashMap::default();
for count in counts {
extension_counts
.entry(count.worktree_id as u64)
.or_insert(HashMap::default())
.insert(count.extension, count.count as usize);
}
Ok(extension_counts)
}
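
A hedged usage sketch of the two extension methods above; the worktree id and counts are hypothetical, db is any Db implementation, and the crate's Result and HashMap are assumed to be in scope:

async fn report_and_read_extensions(db: &dyn Db, project_id: ProjectId) -> Result<()> {
    let mut counts = HashMap::default();
    counts.insert("rs".to_string(), 42);
    counts.insert("toml".to_string(), 3);
    // Upserts one row per (project, worktree, extension); re-sending a worktree's
    // counts replaces the previous values via ON CONFLICT ... DO UPDATE.
    db.update_worktree_extensions(project_id, 1, counts).await?;
    // Reads everything back, keyed first by worktree id, then by extension.
    let by_worktree = db.get_project_extensions(project_id).await?;
    assert_eq!(by_worktree[&1]["rs"], 42);
    Ok(())
}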
async fn record_project_activity(
&self,
time_period: Range<OffsetDateTime>,
projects: &[(UserId, ProjectId)],
) -> Result<()> {
let query = "
INSERT INTO project_activity_periods
(ended_at, duration_millis, user_id, project_id)
VALUES
($1, $2, $3, $4);
";
let mut tx = self.pool.begin().await?;
let duration_millis =
((time_period.end - time_period.start).as_seconds_f64() * 1000.0) as i32;
for (user_id, project_id) in projects {
sqlx::query(query)
.bind(time_period.end)
.bind(duration_millis)
.bind(user_id)
.bind(project_id)
.execute(&mut tx)
.await?;
}
tx.commit().await?;
Ok(())
}
async fn summarize_project_activity(
&self,
time_period: Range<OffsetDateTime>,
max_user_count: usize,
) -> Result<Vec<UserActivitySummary>> {
let query = "
WITH
project_durations AS (
SELECT user_id, project_id, SUM(duration_millis) AS project_duration
FROM project_activity_periods
WHERE $1 <= ended_at AND ended_at <= $2
GROUP BY user_id, project_id
),
user_durations AS (
SELECT user_id, SUM(project_duration) as total_duration
FROM project_durations
GROUP BY user_id
ORDER BY total_duration DESC
LIMIT $3
)
SELECT user_durations.user_id, users.github_login, project_id, project_duration
FROM user_durations, project_durations, users
WHERE
user_durations.user_id = project_durations.user_id AND
user_durations.user_id = users.id
ORDER BY user_id ASC, project_duration DESC
";
let mut rows = sqlx::query_as::<_, (UserId, String, ProjectId, i64)>(query)
.bind(time_period.start)
.bind(time_period.end)
.bind(max_user_count as i32)
.fetch(&self.pool);
let mut result = Vec::<UserActivitySummary>::new();
while let Some(row) = rows.next().await {
let (user_id, github_login, project_id, duration_millis) = row?;
let project_id = project_id;
let duration = Duration::from_millis(duration_millis as u64);
if let Some(last_summary) = result.last_mut() {
if last_summary.id == user_id {
last_summary.project_activity.push((project_id, duration));
continue;
}
}
result.push(UserActivitySummary {
id: user_id,
project_activity: vec![(project_id, duration)],
github_login,
});
}
Ok(result)
}
// contacts
async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
@ -651,7 +862,7 @@ impl Db for PostgresDb {
sqlx::query(cleanup_query)
.bind(user_id.0)
.bind(access_token_hash)
.bind(max_access_token_count as u32)
.bind(max_access_token_count as i32)
.execute(&mut tx)
.await?;
Ok(tx.commit().await?)
@ -927,6 +1138,21 @@ pub struct User {
pub connected_once: bool,
}
id_type!(ProjectId);
#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
pub struct Project {
pub id: ProjectId,
pub host_user_id: UserId,
pub unregistered: bool,
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct UserActivitySummary {
pub id: UserId,
pub github_login: String,
pub project_activity: Vec<(ProjectId, Duration)>,
}
id_type!(OrgId);
#[derive(FromRow)]
pub struct Org {
@ -1125,6 +1351,94 @@ pub mod tests {
assert_ne!(invite_code_4, invite_code_3);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_project_activity() {
let test_db = TestDb::postgres().await;
let db = test_db.db();
let user_1 = db.create_user("user_1", None, false).await.unwrap();
let user_2 = db.create_user("user_2", None, false).await.unwrap();
let user_3 = db.create_user("user_3", None, false).await.unwrap();
let project_1 = db.register_project(user_1).await.unwrap();
let project_2 = db.register_project(user_2).await.unwrap();
let t0 = OffsetDateTime::now_utc() - Duration::from_secs(60 * 60);
// User 2 opens a project
let t1 = t0 + Duration::from_secs(10);
db.record_project_activity(t0..t1, &[(user_2, project_2)])
.await
.unwrap();
let t2 = t1 + Duration::from_secs(10);
db.record_project_activity(t1..t2, &[(user_2, project_2)])
.await
.unwrap();
// User 1 joins the project
let t3 = t2 + Duration::from_secs(10);
db.record_project_activity(t2..t3, &[(user_2, project_2), (user_1, project_2)])
.await
.unwrap();
// User 1 opens another project
let t4 = t3 + Duration::from_secs(10);
db.record_project_activity(
t3..t4,
&[
(user_2, project_2),
(user_1, project_2),
(user_1, project_1),
],
)
.await
.unwrap();
// User 3 joins that project
let t5 = t4 + Duration::from_secs(10);
db.record_project_activity(
t4..t5,
&[
(user_2, project_2),
(user_1, project_2),
(user_1, project_1),
(user_3, project_1),
],
)
.await
.unwrap();
// User 2 leaves
let t6 = t5 + Duration::from_secs(5);
db.record_project_activity(t5..t6, &[(user_1, project_1), (user_3, project_1)])
.await
.unwrap();
let summary = db.summarize_project_activity(t0..t6, 10).await.unwrap();
assert_eq!(
summary,
&[
UserActivitySummary {
id: user_1,
github_login: "user_1".to_string(),
project_activity: vec![
(project_2, Duration::from_secs(30)),
(project_1, Duration::from_secs(25))
]
},
UserActivitySummary {
id: user_2,
github_login: "user_2".to_string(),
project_activity: vec![(project_2, Duration::from_secs(50))]
},
UserActivitySummary {
id: user_3,
github_login: "user_3".to_string(),
project_activity: vec![(project_1, Duration::from_secs(15))]
},
]
);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_recent_channel_messages() {
for test_db in [
@ -1696,6 +2010,8 @@ pub mod tests {
pub struct FakeDb {
background: Arc<Background>,
pub users: Mutex<BTreeMap<UserId, User>>,
pub projects: Mutex<BTreeMap<ProjectId, Project>>,
pub worktree_extensions: Mutex<BTreeMap<(ProjectId, u64, String), usize>>,
pub orgs: Mutex<BTreeMap<OrgId, Org>>,
pub org_memberships: Mutex<BTreeMap<(OrgId, UserId), bool>>,
pub channels: Mutex<BTreeMap<ChannelId, Channel>>,
@ -1706,6 +2022,7 @@ pub mod tests {
next_user_id: Mutex<i32>,
next_org_id: Mutex<i32>,
next_channel_id: Mutex<i32>,
next_project_id: Mutex<i32>,
}
#[derive(Debug)]
@ -1722,6 +2039,9 @@ pub mod tests {
background,
users: Default::default(),
next_user_id: Mutex::new(1),
projects: Default::default(),
worktree_extensions: Default::default(),
next_project_id: Mutex::new(1),
orgs: Default::default(),
next_org_id: Mutex::new(1),
org_memberships: Default::default(),
@ -1841,6 +2161,78 @@ pub mod tests {
unimplemented!()
}
// projects
async fn register_project(&self, host_user_id: UserId) -> Result<ProjectId> {
self.background.simulate_random_delay().await;
if !self.users.lock().contains_key(&host_user_id) {
Err(anyhow!("no such user"))?;
}
let project_id = ProjectId(post_inc(&mut *self.next_project_id.lock()));
self.projects.lock().insert(
project_id,
Project {
id: project_id,
host_user_id,
unregistered: false,
},
);
Ok(project_id)
}
async fn unregister_project(&self, project_id: ProjectId) -> Result<()> {
self.projects
.lock()
.get_mut(&project_id)
.ok_or_else(|| anyhow!("no such project"))?
.unregistered = true;
Ok(())
}
async fn update_worktree_extensions(
&self,
project_id: ProjectId,
worktree_id: u64,
extensions: HashMap<String, usize>,
) -> Result<()> {
self.background.simulate_random_delay().await;
if !self.projects.lock().contains_key(&project_id) {
Err(anyhow!("no such project"))?;
}
for (extension, count) in extensions {
self.worktree_extensions
.lock()
.insert((project_id, worktree_id, extension), count);
}
Ok(())
}
async fn get_project_extensions(
&self,
_project_id: ProjectId,
) -> Result<HashMap<u64, HashMap<String, usize>>> {
unimplemented!()
}
async fn record_project_activity(
&self,
_period: Range<OffsetDateTime>,
_active_projects: &[(UserId, ProjectId)],
) -> Result<()> {
unimplemented!()
}
async fn summarize_project_activity(
&self,
_period: Range<OffsetDateTime>,
_limit: usize,
) -> Result<Vec<UserActivitySummary>> {
unimplemented!()
}
// contacts
async fn get_contacts(&self, id: UserId) -> Result<Vec<Contact>> {

View File

@ -1,5 +1,5 @@
use crate::{
db::{tests::TestDb, UserId},
db::{tests::TestDb, ProjectId, UserId},
rpc::{Executor, Server, Store},
AppState,
};
@ -1447,7 +1447,7 @@ async fn test_collaborating_with_diagnostics(
deterministic.run_until_parked();
{
let store = server.store.read().await;
let project = store.project(project_id).unwrap();
let project = store.project(ProjectId::from_proto(project_id)).unwrap();
let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
assert!(!worktree.diagnostic_summaries.is_empty());
}
@ -4722,7 +4722,7 @@ impl TestServer {
foreground: Rc<executor::Foreground>,
background: Arc<executor::Background>,
) -> Self {
let test_db = TestDb::fake(background);
let test_db = TestDb::fake(background.clone());
let app_state = Self::build_app_state(&test_db).await;
let peer = Peer::new();
let notifications = mpsc::unbounded();

View File

@ -14,6 +14,7 @@ use serde::Deserialize;
use std::{
net::{SocketAddr, TcpListener},
sync::Arc,
time::Duration,
};
use tracing_log::LogTracer;
use tracing_subscriber::{filter::EnvFilter, fmt::format::JsonFields, Layer};
@ -66,6 +67,8 @@ async fn main() -> Result<()> {
.expect("failed to bind TCP listener");
let rpc_server = rpc::Server::new(state.clone(), None);
rpc_server.start_recording_project_activity(Duration::from_secs(5 * 60), rpc::RealExecutor);
let app = Router::<Body>::new()
.merge(api::routes(&rpc_server, state.clone()))
.merge(rpc::routes(rpc_server));

View File

@ -2,7 +2,7 @@ mod store;
use crate::{
auth,
db::{self, ChannelId, MessageId, User, UserId},
db::{self, ChannelId, MessageId, ProjectId, User, UserId},
AppState, Result,
};
use anyhow::anyhow;
@ -288,6 +288,58 @@ impl Server {
})
}
/// Start a long lived task that records which users are active in which projects.
pub fn start_recording_project_activity<E: 'static + Executor>(
self: &Arc<Self>,
interval: Duration,
executor: E,
) {
executor.spawn_detached({
let this = Arc::downgrade(self);
let executor = executor.clone();
async move {
let mut period_start = OffsetDateTime::now_utc();
let mut active_projects = Vec::<(UserId, ProjectId)>::new();
loop {
let sleep = executor.sleep(interval);
sleep.await;
let this = if let Some(this) = this.upgrade() {
this
} else {
break;
};
active_projects.clear();
active_projects.extend(this.store().await.projects().flat_map(
|(project_id, project)| {
project.guests.values().chain([&project.host]).filter_map(
|collaborator| {
if !collaborator.admin
&& collaborator
.last_activity
.map_or(false, |activity| activity > period_start)
{
Some((collaborator.user_id, *project_id))
} else {
None
}
},
)
},
));
let period_end = OffsetDateTime::now_utc();
this.app_state
.db
.record_project_activity(period_start..period_end, &active_projects)
.await
.trace_err();
period_start = period_end;
}
}
});
}
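
Each iteration of this loop becomes one record_project_activity call. A small sketch of what a single tick amounts to, with hypothetical ids; the real loop derives the pairs from Store::projects() as shown above, and one project_activity_periods row is written per pair, spanning roughly the configured interval:

async fn record_one_tick(
    db: &dyn Db,
    period_start: OffsetDateTime,
) -> Result<OffsetDateTime> {
    let period_end = OffsetDateTime::now_utc();
    // Collaborators active since period_start, as (user, project) pairs.
    let active_projects = [(UserId(1), ProjectId(42)), (UserId(2), ProjectId(42))];
    db.record_project_activity(period_start..period_end, &active_projects)
        .await?;
    Ok(period_end)
}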
pub fn handle_connection<E: Executor>(
self: &Arc<Self>,
connection: Connection,
@ -401,14 +453,21 @@ impl Server {
async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
self.peer.disconnect(connection_id);
let removed_user_id = {
let mut projects_to_unregister = Vec::new();
let removed_user_id;
{
let mut store = self.store_mut().await;
let removed_connection = store.remove_connection(connection_id)?;
for (project_id, project) in removed_connection.hosted_projects {
projects_to_unregister.push(project_id);
broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
self.peer
.send(conn_id, proto::UnregisterProject { project_id })
self.peer.send(
conn_id,
proto::UnregisterProject {
project_id: project_id.to_proto(),
},
)
});
for (_, receipts) in project.join_requests {
@ -433,7 +492,7 @@ impl Server {
self.peer.send(
conn_id,
proto::RemoveProjectCollaborator {
project_id,
project_id: project_id.to_proto(),
peer_id: connection_id.0,
},
)
@ -442,17 +501,27 @@ impl Server {
self.peer
.send(
project.host_connection_id,
proto::ProjectUnshared { project_id },
proto::ProjectUnshared {
project_id: project_id.to_proto(),
},
)
.trace_err();
}
}
}
removed_connection.user_id
removed_user_id = removed_connection.user_id;
};
self.update_user_contacts(removed_user_id).await?;
self.update_user_contacts(removed_user_id).await.trace_err();
for project_id in projects_to_unregister {
self.app_state
.db
.unregister_project(project_id)
.await
.trace_err();
}
Ok(())
}
@ -516,14 +585,18 @@ impl Server {
request: TypedEnvelope<proto::RegisterProject>,
response: Response<proto::RegisterProject>,
) -> Result<()> {
let project_id;
{
let mut state = self.store_mut().await;
let user_id = state.user_id_for_connection(request.sender_id)?;
project_id = state.register_project(request.sender_id, user_id);
};
let user_id = self
.store()
.await
.user_id_for_connection(request.sender_id)?;
let project_id = self.app_state.db.register_project(user_id).await?;
self.store_mut()
.await
.register_project(request.sender_id, project_id)?;
response.send(proto::RegisterProjectResponse { project_id })?;
response.send(proto::RegisterProjectResponse {
project_id: project_id.to_proto(),
})?;
Ok(())
}
@ -533,12 +606,13 @@ impl Server {
request: TypedEnvelope<proto::UnregisterProject>,
response: Response<proto::UnregisterProject>,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let (user_id, project) = {
let mut state = self.store_mut().await;
let project =
state.unregister_project(request.payload.project_id, request.sender_id)?;
let project = state.unregister_project(project_id, request.sender_id)?;
(state.user_id_for_connection(request.sender_id)?, project)
};
self.app_state.db.unregister_project(project_id).await?;
broadcast(
request.sender_id,
@ -547,7 +621,7 @@ impl Server {
self.peer.send(
conn_id,
proto::UnregisterProject {
project_id: request.payload.project_id,
project_id: project_id.to_proto(),
},
)
},
@ -613,7 +687,7 @@ impl Server {
request: TypedEnvelope<proto::JoinProject>,
response: Response<proto::JoinProject>,
) -> Result<()> {
let project_id = request.payload.project_id;
let project_id = ProjectId::from_proto(request.payload.project_id);
let host_user_id;
let guest_user_id;
@ -621,12 +695,12 @@ impl Server {
{
let state = self.store().await;
let project = state.project(project_id)?;
host_user_id = project.host_user_id;
host_user_id = project.host.user_id;
host_connection_id = project.host_connection_id;
guest_user_id = state.user_id_for_connection(request.sender_id)?;
};
tracing::info!(project_id, %host_user_id, %host_connection_id, "join project");
tracing::info!(%project_id, %host_user_id, %host_connection_id, "join project");
let has_contact = self
.app_state
.db
@ -644,7 +718,7 @@ impl Server {
self.peer.send(
host_connection_id,
proto::RequestJoinProject {
project_id,
project_id: project_id.to_proto(),
requester_id: guest_user_id.to_proto(),
},
)?;
@ -659,13 +733,13 @@ impl Server {
{
let mut state = self.store_mut().await;
let project_id = request.payload.project_id;
let project_id = ProjectId::from_proto(request.payload.project_id);
let project = state.project(project_id)?;
if project.host_connection_id != request.sender_id {
Err(anyhow!("no such connection"))?;
}
host_user_id = project.host_user_id;
host_user_id = project.host.user_id;
let guest_user_id = UserId::from_proto(request.payload.requester_id);
if !request.payload.allow {
@ -697,7 +771,7 @@ impl Server {
collaborators.push(proto::Collaborator {
peer_id: project.host_connection_id.0,
replica_id: 0,
user_id: project.host_user_id.to_proto(),
user_id: project.host.user_id.to_proto(),
});
let worktrees = project
.worktrees
@ -720,15 +794,15 @@ impl Server {
.collect::<Vec<_>>();
// Add all guests other than the requesting user's own connections as collaborators
for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
for (guest_conn_id, guest) in &project.guests {
if receipts_with_replica_ids
.iter()
.all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
.all(|(receipt, _)| receipt.sender_id != *guest_conn_id)
{
collaborators.push(proto::Collaborator {
peer_id: peer_conn_id.0,
replica_id: *peer_replica_id as u32,
user_id: peer_user_id.to_proto(),
peer_id: guest_conn_id.0,
replica_id: guest.replica_id as u32,
user_id: guest.user_id.to_proto(),
});
}
}
@ -739,7 +813,7 @@ impl Server {
self.peer.send(
conn_id,
proto::AddProjectCollaborator {
project_id,
project_id: project_id.to_proto(),
collaborator: Some(proto::Collaborator {
peer_id: receipt.sender_id.0,
replica_id: *replica_id as u32,
@ -777,13 +851,13 @@ impl Server {
request: TypedEnvelope<proto::LeaveProject>,
) -> Result<()> {
let sender_id = request.sender_id;
let project_id = request.payload.project_id;
let project_id = ProjectId::from_proto(request.payload.project_id);
let project;
{
let mut store = self.store_mut().await;
project = store.leave_project(sender_id, project_id)?;
tracing::info!(
project_id,
%project_id,
host_user_id = %project.host_user_id,
host_connection_id = %project.host_connection_id,
"leave project"
@ -794,7 +868,7 @@ impl Server {
self.peer.send(
conn_id,
proto::RemoveProjectCollaborator {
project_id,
project_id: project_id.to_proto(),
peer_id: sender_id.0,
},
)
@ -805,7 +879,7 @@ impl Server {
self.peer.send(
project.host_connection_id,
proto::JoinProjectRequestCancelled {
project_id,
project_id: project_id.to_proto(),
requester_id: requester_id.to_proto(),
},
)?;
@ -814,7 +888,9 @@ impl Server {
if project.unshare {
self.peer.send(
project.host_connection_id,
proto::ProjectUnshared { project_id },
proto::ProjectUnshared {
project_id: project_id.to_proto(),
},
)?;
}
}
@ -826,18 +902,15 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::UpdateProject>,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let user_id;
{
let mut state = self.store_mut().await;
user_id = state.user_id_for_connection(request.sender_id)?;
let guest_connection_ids = state
.read_project(request.payload.project_id, request.sender_id)?
.read_project(project_id, request.sender_id)?
.guest_connection_ids();
state.update_project(
request.payload.project_id,
&request.payload.worktrees,
request.sender_id,
)?;
state.update_project(project_id, &request.payload.worktrees, request.sender_id)?;
broadcast(request.sender_id, guest_connection_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
@ -851,9 +924,10 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::RegisterProjectActivity>,
) -> Result<()> {
self.store_mut()
.await
.register_project_activity(request.payload.project_id, request.sender_id)?;
self.store_mut().await.register_project_activity(
ProjectId::from_proto(request.payload.project_id),
request.sender_id,
)?;
Ok(())
}
@ -862,28 +936,25 @@ impl Server {
request: TypedEnvelope<proto::UpdateWorktree>,
response: Response<proto::UpdateWorktree>,
) -> Result<()> {
let (connection_ids, metadata_changed) = {
let project_id = ProjectId::from_proto(request.payload.project_id);
let worktree_id = request.payload.worktree_id;
let (connection_ids, metadata_changed, extension_counts) = {
let mut store = self.store_mut().await;
let (connection_ids, metadata_changed, extension_counts) = store.update_worktree(
request.sender_id,
request.payload.project_id,
request.payload.worktree_id,
project_id,
worktree_id,
&request.payload.root_name,
&request.payload.removed_entries,
&request.payload.updated_entries,
request.payload.scan_id,
)?;
for (extension, count) in extension_counts {
tracing::info!(
project_id = request.payload.project_id,
worktree_id = request.payload.worktree_id,
?extension,
%count,
"worktree updated"
);
}
(connection_ids, metadata_changed)
(connection_ids, metadata_changed, extension_counts.clone())
};
self.app_state
.db
.update_worktree_extensions(project_id, worktree_id, extension_counts)
.await?;
broadcast(request.sender_id, connection_ids, |connection_id| {
self.peer
@ -910,7 +981,7 @@ impl Server {
.clone()
.ok_or_else(|| anyhow!("invalid summary"))?;
let receiver_ids = self.store_mut().await.update_diagnostic_summary(
request.payload.project_id,
ProjectId::from_proto(request.payload.project_id),
request.payload.worktree_id,
request.sender_id,
summary,
@ -928,7 +999,7 @@ impl Server {
request: TypedEnvelope<proto::StartLanguageServer>,
) -> Result<()> {
let receiver_ids = self.store_mut().await.start_language_server(
request.payload.project_id,
ProjectId::from_proto(request.payload.project_id),
request.sender_id,
request
.payload
@ -947,10 +1018,10 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::UpdateLanguageServer>,
) -> Result<()> {
let receiver_ids = self
.store()
.await
.project_connection_ids(request.payload.project_id, request.sender_id)?;
let receiver_ids = self.store().await.project_connection_ids(
ProjectId::from_proto(request.payload.project_id),
request.sender_id,
)?;
broadcast(request.sender_id, receiver_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
@ -969,7 +1040,10 @@ impl Server {
let host_connection_id = self
.store()
.await
.read_project(request.payload.remote_entity_id(), request.sender_id)?
.read_project(
ProjectId::from_proto(request.payload.remote_entity_id()),
request.sender_id,
)?
.host_connection_id;
response.send(
@ -985,10 +1059,11 @@ impl Server {
request: TypedEnvelope<proto::SaveBuffer>,
response: Response<proto::SaveBuffer>,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let host = self
.store()
.await
.read_project(request.payload.project_id, request.sender_id)?
.read_project(project_id, request.sender_id)?
.host_connection_id;
let response_payload = self
.peer
@ -998,7 +1073,7 @@ impl Server {
let mut guests = self
.store()
.await
.read_project(request.payload.project_id, request.sender_id)?
.read_project(project_id, request.sender_id)?
.connection_ids();
guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
broadcast(host, guests, |conn_id| {
@ -1014,10 +1089,11 @@ impl Server {
request: TypedEnvelope<proto::UpdateBuffer>,
response: Response<proto::UpdateBuffer>,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let receiver_ids = {
let mut store = self.store_mut().await;
store.register_project_activity(request.payload.project_id, request.sender_id)?;
store.project_connection_ids(request.payload.project_id, request.sender_id)?
store.register_project_activity(project_id, request.sender_id)?;
store.project_connection_ids(project_id, request.sender_id)?
};
broadcast(request.sender_id, receiver_ids, |connection_id| {
@ -1032,10 +1108,10 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::UpdateBufferFile>,
) -> Result<()> {
let receiver_ids = self
.store()
.await
.project_connection_ids(request.payload.project_id, request.sender_id)?;
let receiver_ids = self.store().await.project_connection_ids(
ProjectId::from_proto(request.payload.project_id),
request.sender_id,
)?;
broadcast(request.sender_id, receiver_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
@ -1047,10 +1123,10 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::BufferReloaded>,
) -> Result<()> {
let receiver_ids = self
.store()
.await
.project_connection_ids(request.payload.project_id, request.sender_id)?;
let receiver_ids = self.store().await.project_connection_ids(
ProjectId::from_proto(request.payload.project_id),
request.sender_id,
)?;
broadcast(request.sender_id, receiver_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
@ -1062,10 +1138,10 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::BufferSaved>,
) -> Result<()> {
let receiver_ids = self
.store()
.await
.project_connection_ids(request.payload.project_id, request.sender_id)?;
let receiver_ids = self.store().await.project_connection_ids(
ProjectId::from_proto(request.payload.project_id),
request.sender_id,
)?;
broadcast(request.sender_id, receiver_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
@ -1078,18 +1154,19 @@ impl Server {
request: TypedEnvelope<proto::Follow>,
response: Response<proto::Follow>,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let leader_id = ConnectionId(request.payload.leader_id);
let follower_id = request.sender_id;
{
let mut store = self.store_mut().await;
if !store
.project_connection_ids(request.payload.project_id, follower_id)?
.project_connection_ids(project_id, follower_id)?
.contains(&leader_id)
{
Err(anyhow!("no such peer"))?;
}
store.register_project_activity(request.payload.project_id, follower_id)?;
store.register_project_activity(project_id, follower_id)?;
}
let mut response_payload = self
@ -1104,15 +1181,16 @@ impl Server {
}
async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let leader_id = ConnectionId(request.payload.leader_id);
let mut store = self.store_mut().await;
if !store
.project_connection_ids(request.payload.project_id, request.sender_id)?
.project_connection_ids(project_id, request.sender_id)?
.contains(&leader_id)
{
Err(anyhow!("no such peer"))?;
}
store.register_project_activity(request.payload.project_id, request.sender_id)?;
store.register_project_activity(project_id, request.sender_id)?;
self.peer
.forward_send(request.sender_id, leader_id, request.payload)?;
Ok(())
@ -1122,10 +1200,10 @@ impl Server {
self: Arc<Self>,
request: TypedEnvelope<proto::UpdateFollowers>,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let mut store = self.store_mut().await;
store.register_project_activity(request.payload.project_id, request.sender_id)?;
let connection_ids =
store.project_connection_ids(request.payload.project_id, request.sender_id)?;
store.register_project_activity(project_id, request.sender_id)?;
let connection_ids = store.project_connection_ids(project_id, request.sender_id)?;
let leader_id = request
.payload
.variant

View File

@ -1,49 +1,58 @@
use crate::db::{self, ChannelId, UserId};
use crate::db::{self, ChannelId, ProjectId, UserId};
use anyhow::{anyhow, Result};
use collections::{hash_map::Entry, BTreeMap, HashMap, HashSet};
use collections::{
btree_map,
hash_map::{self, Entry},
BTreeMap, BTreeSet, HashMap, HashSet,
};
use rpc::{proto, ConnectionId, Receipt};
use serde::Serialize;
use std::{
collections::hash_map,
ffi::{OsStr, OsString},
mem,
path::{Path, PathBuf},
str,
time::{Duration, Instant},
time::Duration,
};
use time::OffsetDateTime;
use tracing::instrument;
#[derive(Default, Serialize)]
pub struct Store {
connections: HashMap<ConnectionId, ConnectionState>,
connections_by_user_id: HashMap<UserId, HashSet<ConnectionId>>,
projects: HashMap<u64, Project>,
projects: BTreeMap<ProjectId, Project>,
#[serde(skip)]
channels: HashMap<ChannelId, Channel>,
next_project_id: u64,
}
#[derive(Serialize)]
struct ConnectionState {
user_id: UserId,
admin: bool,
projects: HashSet<u64>,
requested_projects: HashSet<u64>,
projects: BTreeSet<ProjectId>,
requested_projects: HashSet<ProjectId>,
channels: HashSet<ChannelId>,
}
#[derive(Serialize)]
pub struct Project {
pub host_connection_id: ConnectionId,
pub host_user_id: UserId,
pub guests: HashMap<ConnectionId, (ReplicaId, UserId)>,
pub host: Collaborator,
pub guests: HashMap<ConnectionId, Collaborator>,
#[serde(skip)]
pub join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
pub active_replica_ids: HashSet<ReplicaId>,
pub worktrees: BTreeMap<u64, Worktree>,
pub language_servers: Vec<proto::LanguageServer>,
}
#[derive(Serialize)]
pub struct Collaborator {
pub replica_id: ReplicaId,
pub user_id: UserId,
#[serde(skip)]
last_activity: Option<Instant>,
pub last_activity: Option<OffsetDateTime>,
pub admin: bool,
}
#[derive(Default, Serialize)]
@ -53,7 +62,7 @@ pub struct Worktree {
#[serde(skip)]
pub entries: HashMap<u64, proto::Entry>,
#[serde(skip)]
pub extension_counts: HashMap<OsString, usize>,
pub extension_counts: HashMap<String, usize>,
#[serde(skip)]
pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
pub scan_id: u64,
@ -69,8 +78,8 @@ pub type ReplicaId = u16;
#[derive(Default)]
pub struct RemovedConnectionState {
pub user_id: UserId,
pub hosted_projects: HashMap<u64, Project>,
pub guest_project_ids: HashSet<u64>,
pub hosted_projects: HashMap<ProjectId, Project>,
pub guest_project_ids: HashSet<ProjectId>,
pub contact_ids: HashSet<UserId>,
}
@ -93,6 +102,9 @@ pub struct Metrics {
impl Store {
pub fn metrics(&self) -> Metrics {
const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
let connections = self.connections.values().filter(|c| !c.admin).count();
let mut registered_projects = 0;
let mut active_projects = 0;
@ -101,7 +113,7 @@ impl Store {
if let Some(connection) = self.connections.get(&project.host_connection_id) {
if !connection.admin {
registered_projects += 1;
if project.is_active() {
if project.is_active_since(active_window_start) {
active_projects += 1;
if !project.guests.is_empty() {
shared_projects += 1;
@ -289,9 +301,9 @@ impl Store {
let mut metadata = Vec::new();
for project_id in project_ids {
if let Some(project) = self.projects.get(&project_id) {
if project.host_user_id == user_id {
if project.host.user_id == user_id {
metadata.push(proto::ProjectMetadata {
id: project_id,
id: project_id.to_proto(),
visible_worktree_root_names: project
.worktrees
.values()
@ -301,7 +313,7 @@ impl Store {
guests: project
.guests
.values()
.map(|(_, user_id)| user_id.to_proto())
.map(|guest| guest.user_id.to_proto())
.collect(),
});
}
@ -314,32 +326,36 @@ impl Store {
pub fn register_project(
&mut self,
host_connection_id: ConnectionId,
host_user_id: UserId,
) -> u64 {
let project_id = self.next_project_id;
project_id: ProjectId,
) -> Result<()> {
let connection = self
.connections
.get_mut(&host_connection_id)
.ok_or_else(|| anyhow!("no such connection"))?;
connection.projects.insert(project_id);
self.projects.insert(
project_id,
Project {
host_connection_id,
host_user_id,
host: Collaborator {
user_id: connection.user_id,
replica_id: 0,
last_activity: None,
admin: connection.admin,
},
guests: Default::default(),
join_requests: Default::default(),
active_replica_ids: Default::default(),
worktrees: Default::default(),
language_servers: Default::default(),
last_activity: None,
},
);
if let Some(connection) = self.connections.get_mut(&host_connection_id) {
connection.projects.insert(project_id);
}
self.next_project_id += 1;
project_id
Ok(())
}
pub fn update_project(
&mut self,
project_id: u64,
project_id: ProjectId,
worktrees: &[proto::WorktreeMetadata],
connection_id: ConnectionId,
) -> Result<()> {
@ -371,11 +387,11 @@ impl Store {
pub fn unregister_project(
&mut self,
project_id: u64,
project_id: ProjectId,
connection_id: ConnectionId,
) -> Result<Project> {
match self.projects.entry(project_id) {
hash_map::Entry::Occupied(e) => {
btree_map::Entry::Occupied(e) => {
if e.get().host_connection_id == connection_id {
let project = e.remove();
@ -408,13 +424,13 @@ impl Store {
Err(anyhow!("no such project"))?
}
}
hash_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
}
}
pub fn update_diagnostic_summary(
&mut self,
project_id: u64,
project_id: ProjectId,
worktree_id: u64,
connection_id: ConnectionId,
summary: proto::DiagnosticSummary,
@ -439,7 +455,7 @@ impl Store {
pub fn start_language_server(
&mut self,
project_id: u64,
project_id: ProjectId,
connection_id: ConnectionId,
language_server: proto::LanguageServer,
) -> Result<Vec<ConnectionId>> {
@ -458,7 +474,7 @@ impl Store {
pub fn request_join_project(
&mut self,
requester_id: UserId,
project_id: u64,
project_id: ProjectId,
receipt: Receipt<proto::JoinProject>,
) -> Result<()> {
let connection = self
@ -470,7 +486,6 @@ impl Store {
.get_mut(&project_id)
.ok_or_else(|| anyhow!("no such project"))?;
connection.requested_projects.insert(project_id);
project.last_activity = Some(Instant::now());
project
.join_requests
.entry(requester_id)
@ -483,7 +498,7 @@ impl Store {
&mut self,
responder_connection_id: ConnectionId,
requester_id: UserId,
project_id: u64,
project_id: ProjectId,
) -> Option<Vec<Receipt<proto::JoinProject>>> {
let project = self.projects.get_mut(&project_id)?;
if responder_connection_id != project.host_connection_id {
@ -495,7 +510,7 @@ impl Store {
let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
requester_connection.requested_projects.remove(&project_id);
}
project.last_activity = Some(Instant::now());
project.host.last_activity = Some(OffsetDateTime::now_utc());
Some(receipts)
}
@ -504,7 +519,7 @@ impl Store {
&mut self,
responder_connection_id: ConnectionId,
requester_id: UserId,
project_id: u64,
project_id: ProjectId,
) -> Option<(Vec<(Receipt<proto::JoinProject>, ReplicaId)>, &Project)> {
let project = self.projects.get_mut(&project_id)?;
if responder_connection_id != project.host_connection_id {
@ -522,20 +537,26 @@ impl Store {
replica_id += 1;
}
project.active_replica_ids.insert(replica_id);
project
.guests
.insert(receipt.sender_id, (replica_id, requester_id));
project.guests.insert(
receipt.sender_id,
Collaborator {
replica_id,
user_id: requester_id,
last_activity: Some(OffsetDateTime::now_utc()),
admin: requester_connection.admin,
},
);
receipts_with_replica_ids.push((receipt, replica_id));
}
project.last_activity = Some(Instant::now());
project.host.last_activity = Some(OffsetDateTime::now_utc());
Some((receipts_with_replica_ids, project))
}
pub fn leave_project(
&mut self,
connection_id: ConnectionId,
project_id: u64,
project_id: ProjectId,
) -> Result<LeftProject> {
let user_id = self.user_id_for_connection(connection_id)?;
let project = self
@ -544,13 +565,12 @@ impl Store {
.ok_or_else(|| anyhow!("no such project"))?;
// If the connection leaving the project is a collaborator, remove it.
let remove_collaborator =
if let Some((replica_id, _)) = project.guests.remove(&connection_id) {
project.active_replica_ids.remove(&replica_id);
true
} else {
false
};
let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
project.active_replica_ids.remove(&guest.replica_id);
true
} else {
false
};
// If the connection leaving the project has a pending request, remove it.
// If that user has no other pending requests on other connections, indicate that the request should be cancelled.
@ -579,11 +599,9 @@ impl Store {
}
}
project.last_activity = Some(Instant::now());
Ok(LeftProject {
host_connection_id: project.host_connection_id,
host_user_id: project.host_user_id,
host_user_id: project.host.user_id,
connection_ids,
cancel_request,
unshare,
@ -594,13 +612,13 @@ impl Store {
pub fn update_worktree(
&mut self,
connection_id: ConnectionId,
project_id: u64,
project_id: ProjectId,
worktree_id: u64,
worktree_root_name: &str,
removed_entries: &[u64],
updated_entries: &[proto::Entry],
scan_id: u64,
) -> Result<(Vec<ConnectionId>, bool, &HashMap<OsString, usize>)> {
) -> Result<(Vec<ConnectionId>, bool, HashMap<String, usize>)> {
let project = self.write_project(project_id, connection_id)?;
let connection_ids = project.connection_ids();
let mut worktree = project.worktrees.entry(worktree_id).or_default();
@ -642,12 +660,16 @@ impl Store {
}
worktree.scan_id = scan_id;
Ok((connection_ids, metadata_changed, &worktree.extension_counts))
Ok((
connection_ids,
metadata_changed,
worktree.extension_counts.clone(),
))
}
pub fn project_connection_ids(
&self,
project_id: u64,
project_id: ProjectId,
acting_connection_id: ConnectionId,
) -> Result<Vec<ConnectionId>> {
Ok(self
@ -663,7 +685,7 @@ impl Store {
.connection_ids())
}
pub fn project(&self, project_id: u64) -> Result<&Project> {
pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
self.projects
.get(&project_id)
.ok_or_else(|| anyhow!("no such project"))
@ -671,14 +693,33 @@ impl Store {
pub fn register_project_activity(
&mut self,
project_id: u64,
project_id: ProjectId,
connection_id: ConnectionId,
) -> Result<()> {
self.write_project(project_id, connection_id)?.last_activity = Some(Instant::now());
let project = self
.projects
.get_mut(&project_id)
.ok_or_else(|| anyhow!("no such project"))?;
let collaborator = if connection_id == project.host_connection_id {
&mut project.host
} else if let Some(guest) = project.guests.get_mut(&connection_id) {
guest
} else {
return Err(anyhow!("no such project"))?;
};
collaborator.last_activity = Some(OffsetDateTime::now_utc());
Ok(())
}
pub fn read_project(&self, project_id: u64, connection_id: ConnectionId) -> Result<&Project> {
pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
self.projects.iter()
}
pub fn read_project(
&self,
project_id: ProjectId,
connection_id: ConnectionId,
) -> Result<&Project> {
let project = self
.projects
.get(&project_id)
@ -694,7 +735,7 @@ impl Store {
fn write_project(
&mut self,
project_id: u64,
project_id: ProjectId,
connection_id: ConnectionId,
) -> Result<&mut Project> {
let project = self
@ -768,7 +809,7 @@ impl Store {
project
.guests
.values()
.map(|(replica_id, _)| *replica_id)
.map(|guest| guest.replica_id)
.collect::<HashSet<_>>(),
);
}
@ -783,11 +824,15 @@ impl Store {
}
impl Project {
fn is_active(&self) -> bool {
const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
self.last_activity.map_or(false, |last_activity| {
last_activity.elapsed() < ACTIVE_PROJECT_TIMEOUT
})
fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
self.guests
.values()
.chain([&self.host])
.any(|collaborator| {
collaborator
.last_activity
.map_or(false, |active_time| active_time > start_time)
})
}
pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
@ -809,9 +854,10 @@ impl Channel {
}
}
fn extension_for_entry(entry: &proto::Entry) -> Option<&OsStr> {
fn extension_for_entry(entry: &proto::Entry) -> Option<&str> {
str::from_utf8(&entry.path)
.ok()
.map(Path::new)
.and_then(|p| p.extension())
.and_then(|e| e.to_str())
}
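
extension_for_entry now yields plain string extensions (rather than OsStr), which is what feeds the per-worktree counts persisted by update_worktree_extensions. A self-contained sketch of the same chain with made-up paths:

use std::{path::Path, str};

fn extension_of(path_bytes: &[u8]) -> Option<&str> {
    str::from_utf8(path_bytes)
        .ok()
        .map(Path::new)
        .and_then(|p| p.extension())
        .and_then(|e| e.to_str())
}

fn main() {
    assert_eq!(extension_of(b"crates/server/src/rpc.rs"), Some("rs"));
    assert_eq!(extension_of(b"Dockerfile"), None);
}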

View File

@ -40,7 +40,7 @@ serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = "1.0"
smallvec = { version = "1.6", features = ["union"] }
smol = "1.2"
time = { version = "0.3" }
time = { version = "0.3", features = ["serde", "serde-well-known"] }
tiny-skia = "0.5"
tree-sitter = "0.20"
usvg = "0.14"