flowy-net impl http and local server

This commit is contained in:
appflowy 2022-01-13 10:53:30 +08:00
parent 0fba8d9195
commit 718613de42
25 changed files with 667 additions and 848 deletions

5
backend/Cargo.lock generated
View File

@ -1382,14 +1382,19 @@ name = "flowy-net"
version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"backend-service",
"bytes",
"dashmap",
"flowy-collaboration",
"flowy-core",
"flowy-core-data-model",
"flowy-derive",
"flowy-document",
"flowy-error",
"flowy-user",
"flowy-user-data-model",
"futures-util",
"lazy_static",
"lib-dispatch",
"lib-infra",

View File

@ -12,7 +12,7 @@ use flowy_collaboration::{
entities::doc::{CreateDocParams, DocumentId, DocumentInfo},
};
use flowy_core_data_model::entities::prelude::*;
use flowy_net::cloud::{
use flowy_net::http_server::{
core::*,
document::{create_document_request, read_document_request},
user::*,

View File

@ -46,7 +46,6 @@ pin-project = "1.0.0"
[dev-dependencies]
flowy-test = { path = "../flowy-test" }
flowy-document = { path = "../flowy-document", features = ["flowy_unit_test"]}
flowy-net = { path = "../flowy-net" }
color-eyre = { version = "0.5", default-features = false }
criterion = "0.3"
rand = "0.7.3"

View File

@ -13,6 +13,9 @@ flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"}
backend-service = { path = "../../../shared-lib/backend-service" }
flowy-core-data-model = { path = "../../../shared-lib/flowy-core-data-model" }
flowy-user-data-model = { path = "../../../shared-lib/flowy-user-data-model"}
flowy-core = { path = "../flowy-core" }
flowy-user = { path = "../flowy-user" }
flowy-document = { path = "../flowy-document" }
lazy_static = "1.4.0"
lib-infra = { path = "../../../shared-lib/lib-infra" }
protobuf = {version = "2.18.0"}
@ -25,5 +28,7 @@ strum = "0.21"
strum_macros = "0.21"
tracing = { version = "0.1", features = ["log"] }
dashmap = {version = "4.0"}
async-stream = "0.3.2"
futures-util = "0.3.15"
[features]
http_server = []

View File

@ -5,15 +5,16 @@ use backend_service::{
response::FlowyResponse,
};
use flowy_core_data_model::entities::{
app::{App, AppId, CreateAppParams, RepeatedApp, UpdateAppParams},
app::{App, AppId, CreateAppParams, UpdateAppParams},
trash::{RepeatedTrash, RepeatedTrashId},
view::{CreateViewParams, RepeatedView, RepeatedViewId, UpdateViewParams, View, ViewId},
view::{CreateViewParams, RepeatedViewId, UpdateViewParams, View, ViewId},
workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId},
};
use flowy_error::FlowyError;
use flowy_core::module::WorkspaceCloudService;
use lazy_static::lazy_static;
use lib_infra::{future::FutureResult, timestamp, uuid_string};
use lib_infra::future::FutureResult;
use std::sync::Arc;
use tokio::sync::broadcast;
@ -25,10 +26,10 @@ impl CoreHttpCloudService {
pub fn new(config: ClientServerConfiguration) -> CoreHttpCloudService { Self { config } }
}
impl CoreHttpCloudService {
pub fn init(&self) {}
impl WorkspaceCloudService for CoreHttpCloudService {
fn init(&self) {}
pub fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError> {
fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError> {
let token = token.to_owned();
let url = self.config.workspace_url();
FutureResult::new(async move {
@ -37,7 +38,7 @@ impl CoreHttpCloudService {
})
}
pub fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError> {
fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError> {
let token = token.to_owned();
let url = self.config.workspace_url();
FutureResult::new(async move {
@ -46,7 +47,7 @@ impl CoreHttpCloudService {
})
}
pub fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.workspace_url();
FutureResult::new(async move {
@ -55,7 +56,7 @@ impl CoreHttpCloudService {
})
}
pub fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> {
fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.workspace_url();
FutureResult::new(async move {
@ -64,7 +65,7 @@ impl CoreHttpCloudService {
})
}
pub fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError> {
fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError> {
let token = token.to_owned();
let url = self.config.view_url();
FutureResult::new(async move {
@ -73,7 +74,7 @@ impl CoreHttpCloudService {
})
}
pub fn read_view(&self, token: &str, params: ViewId) -> FutureResult<Option<View>, FlowyError> {
fn read_view(&self, token: &str, params: ViewId) -> FutureResult<Option<View>, FlowyError> {
let token = token.to_owned();
let url = self.config.view_url();
FutureResult::new(async move {
@ -82,7 +83,7 @@ impl CoreHttpCloudService {
})
}
pub fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> {
fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.view_url();
FutureResult::new(async move {
@ -91,7 +92,7 @@ impl CoreHttpCloudService {
})
}
pub fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> {
fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.view_url();
FutureResult::new(async move {
@ -100,7 +101,7 @@ impl CoreHttpCloudService {
})
}
pub fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError> {
fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError> {
let token = token.to_owned();
let url = self.config.app_url();
FutureResult::new(async move {
@ -109,7 +110,7 @@ impl CoreHttpCloudService {
})
}
pub fn read_app(&self, token: &str, params: AppId) -> FutureResult<Option<App>, FlowyError> {
fn read_app(&self, token: &str, params: AppId) -> FutureResult<Option<App>, FlowyError> {
let token = token.to_owned();
let url = self.config.app_url();
FutureResult::new(async move {
@ -118,7 +119,7 @@ impl CoreHttpCloudService {
})
}
pub fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> {
fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.app_url();
FutureResult::new(async move {
@ -127,7 +128,7 @@ impl CoreHttpCloudService {
})
}
pub fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> {
fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.app_url();
FutureResult::new(async move {
@ -136,7 +137,7 @@ impl CoreHttpCloudService {
})
}
pub fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.trash_url();
FutureResult::new(async move {
@ -145,7 +146,7 @@ impl CoreHttpCloudService {
})
}
pub fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.trash_url();
FutureResult::new(async move {
@ -154,7 +155,7 @@ impl CoreHttpCloudService {
})
}
pub fn read_trash(&self, token: &str) -> FutureResult<RepeatedTrash, FlowyError> {
fn read_trash(&self, token: &str) -> FutureResult<RepeatedTrash, FlowyError> {
let token = token.to_owned();
let url = self.config.trash_url();
FutureResult::new(async move {
@ -164,149 +165,6 @@ impl CoreHttpCloudService {
}
}
pub struct CoreLocalCloudService {}
impl CoreLocalCloudService {
pub fn new(_config: &ClientServerConfiguration) -> Self { CoreLocalCloudService {} }
}
impl CoreLocalCloudService {
pub fn init(&self) {}
pub fn create_workspace(&self, _token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError> {
let time = timestamp();
let workspace = Workspace {
id: uuid_string(),
name: params.name,
desc: params.desc,
apps: RepeatedApp::default(),
modified_time: time,
create_time: time,
};
FutureResult::new(async { Ok(workspace) })
}
pub fn read_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError> {
FutureResult::new(async {
let repeated_workspace = RepeatedWorkspace { items: vec![] };
Ok(repeated_workspace)
})
}
pub fn update_workspace(&self, _token: &str, _params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn delete_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn create_view(&self, _token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError> {
let time = timestamp();
let view = View {
id: params.view_id,
belong_to_id: params.belong_to_id,
name: params.name,
desc: params.desc,
view_type: params.view_type,
version: 0,
belongings: RepeatedView::default(),
modified_time: time,
create_time: time,
};
FutureResult::new(async { Ok(view) })
}
pub fn read_view(&self, _token: &str, _params: ViewId) -> FutureResult<Option<View>, FlowyError> {
FutureResult::new(async { Ok(None) })
}
pub fn delete_view(&self, _token: &str, _params: RepeatedViewId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn update_view(&self, _token: &str, _params: UpdateViewParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn create_app(&self, _token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError> {
let time = timestamp();
let app = App {
id: uuid_string(),
workspace_id: params.workspace_id,
name: params.name,
desc: params.desc,
belongings: RepeatedView::default(),
version: 0,
modified_time: time,
create_time: time,
};
FutureResult::new(async { Ok(app) })
}
pub fn read_app(&self, _token: &str, _params: AppId) -> FutureResult<Option<App>, FlowyError> {
FutureResult::new(async { Ok(None) })
}
pub fn update_app(&self, _token: &str, _params: UpdateAppParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn delete_app(&self, _token: &str, _params: AppId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn create_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn delete_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn read_trash(&self, _token: &str) -> FutureResult<RepeatedTrash, FlowyError> {
FutureResult::new(async {
let repeated_trash = RepeatedTrash { items: vec![] };
Ok(repeated_trash)
})
}
}
lazy_static! {
static ref MIDDLEWARE: Arc<CoreResponseMiddleware> = Arc::new(CoreResponseMiddleware::new());
}
pub struct CoreResponseMiddleware {
invalid_token_sender: broadcast::Sender<String>,
}
impl CoreResponseMiddleware {
fn new() -> Self {
let (sender, _) = broadcast::channel(10);
CoreResponseMiddleware {
invalid_token_sender: sender,
}
}
pub fn invalid_token_subscribe(&self) -> broadcast::Receiver<String> { self.invalid_token_sender.subscribe() }
}
impl ResponseMiddleware for CoreResponseMiddleware {
fn receive_response(&self, token: &Option<String>, response: &FlowyResponse) {
if let Some(error) = &response.error {
if error.is_unauthorized() {
tracing::error!("user is unauthorized");
match token {
None => {},
Some(token) => match self.invalid_token_sender.send(token.clone()) {
Ok(_) => {},
Err(e) => tracing::error!("{:?}", e),
},
}
}
}
}
}
fn request_builder() -> HttpRequestBuilder { HttpRequestBuilder::new().middleware(MIDDLEWARE.clone()) }
pub async fn create_workspace_request(
@ -474,3 +332,40 @@ pub async fn read_trash_request(token: &str, url: &str) -> Result<RepeatedTrash,
.await?;
Ok(repeated_trash)
}
lazy_static! {
static ref MIDDLEWARE: Arc<CoreResponseMiddleware> = Arc::new(CoreResponseMiddleware::new());
}
pub struct CoreResponseMiddleware {
invalid_token_sender: broadcast::Sender<String>,
}
impl CoreResponseMiddleware {
fn new() -> Self {
let (sender, _) = broadcast::channel(10);
CoreResponseMiddleware {
invalid_token_sender: sender,
}
}
#[allow(dead_code)]
fn invalid_token_subscribe(&self) -> broadcast::Receiver<String> { self.invalid_token_sender.subscribe() }
}
impl ResponseMiddleware for CoreResponseMiddleware {
fn receive_response(&self, token: &Option<String>, response: &FlowyResponse) {
if let Some(error) = &response.error {
if error.is_unauthorized() {
tracing::error!("user is unauthorized");
match token {
None => {},
Some(token) => match self.invalid_token_sender.send(token.clone()) {
Ok(_) => {},
Err(e) => tracing::error!("{:?}", e),
},
}
}
}
}
}

View File

@ -3,10 +3,8 @@ use backend_service::{
request::{HttpRequestBuilder, ResponseMiddleware},
response::FlowyResponse,
};
use flowy_collaboration::{
client_document::default::initial_delta_string,
entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams},
};
use flowy_collaboration::entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams};
use flowy_document::DocumentCloudService;
use flowy_error::FlowyError;
use lazy_static::lazy_static;
use lib_infra::future::FutureResult;
@ -20,56 +18,26 @@ impl DocumentHttpCloudService {
pub fn new(config: ClientServerConfiguration) -> Self { Self { config } }
}
impl DocumentHttpCloudService {
pub fn create_document_request(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> {
impl DocumentCloudService for DocumentHttpCloudService {
fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { create_document_request(&token, params, &url).await })
}
pub fn read_document_request(
&self,
token: &str,
params: DocumentId,
) -> FutureResult<Option<DocumentInfo>, FlowyError> {
fn read_document(&self, token: &str, params: DocumentId) -> FutureResult<Option<DocumentInfo>, FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { read_document_request(&token, params, &url).await })
}
pub fn update_document_request(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { reset_doc_request(&token, params, &url).await })
}
}
pub struct DocumentLocalCloudService {}
impl DocumentLocalCloudService {
pub fn create_document_request(&self, _token: &str, _params: CreateDocParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn read_document_request(
&self,
_token: &str,
params: DocumentId,
) -> FutureResult<Option<DocumentInfo>, FlowyError> {
let doc = DocumentInfo {
doc_id: params.doc_id,
text: initial_delta_string(),
rev_id: 0,
base_rev_id: 0,
};
FutureResult::new(async { Ok(Some(doc)) })
}
pub fn update_document_request(&self, _token: &str, _params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
}
pub async fn create_document_request(token: &str, params: CreateDocParams, url: &str) -> Result<(), FlowyError> {
let _ = request_builder()
.post(&url.to_owned())

View File

@ -1,5 +1,6 @@
use backend_service::{configuration::*, errors::ServerError, request::HttpRequestBuilder};
use flowy_error::FlowyError;
use flowy_user::module::UserCloudService;
use flowy_user_data_model::entities::{
SignInParams,
SignInResponse,
@ -8,7 +9,7 @@ use flowy_user_data_model::entities::{
UpdateUserParams,
UserProfile,
};
use lib_infra::{future::FutureResult, uuid_string};
use lib_infra::future::FutureResult;
pub struct UserHttpCloudService {
config: ClientServerConfiguration,
@ -17,8 +18,8 @@ impl UserHttpCloudService {
pub fn new(config: &ClientServerConfiguration) -> Self { Self { config: config.clone() } }
}
impl UserHttpCloudService {
pub fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> {
impl UserCloudService for UserHttpCloudService {
fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> {
let url = self.config.sign_up_url();
FutureResult::new(async move {
let resp = user_sign_up_request(params, &url).await?;
@ -26,7 +27,7 @@ impl UserHttpCloudService {
})
}
pub fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> {
fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> {
let url = self.config.sign_in_url();
FutureResult::new(async move {
let resp = user_sign_in_request(params, &url).await?;
@ -34,7 +35,7 @@ impl UserHttpCloudService {
})
}
pub fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> {
fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.sign_out_url();
FutureResult::new(async move {
@ -43,7 +44,7 @@ impl UserHttpCloudService {
})
}
pub fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> {
fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.user_profile_url();
FutureResult::new(async move {
@ -52,7 +53,7 @@ impl UserHttpCloudService {
})
}
pub fn get_user(&self, token: &str) -> FutureResult<UserProfile, FlowyError> {
fn get_user(&self, token: &str) -> FutureResult<UserProfile, FlowyError> {
let token = token.to_owned();
let url = self.config.user_profile_url();
FutureResult::new(async move {
@ -61,49 +62,7 @@ impl UserHttpCloudService {
})
}
pub fn ws_addr(&self) -> String { self.config.ws_addr() }
}
pub struct UserLocalCloudService();
impl UserLocalCloudService {
pub fn new(_config: &ClientServerConfiguration) -> Self { Self() }
}
impl UserLocalCloudService {
pub fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> {
let uid = uuid_string();
FutureResult::new(async move {
Ok(SignUpResponse {
user_id: uid.clone(),
name: params.name,
email: params.email,
token: uid,
})
})
}
pub fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> {
let user_id = uuid_string();
FutureResult::new(async {
Ok(SignInResponse {
user_id: user_id.clone(),
name: params.name,
email: params.email,
token: user_id,
})
})
}
pub fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
pub fn update_user(&self, _token: &str, _params: UpdateUserParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn get_user(&self, _token: &str) -> FutureResult<UserProfile, FlowyError> {
FutureResult::new(async { Ok(UserProfile::default()) })
}
pub fn ws_addr(&self) -> String { "ws://localhost:8000/ws/".to_owned() }
fn ws_addr(&self) -> String { self.config.ws_addr() }
}
pub async fn user_sign_up_request(params: SignUpParams, url: &str) -> Result<SignUpResponse, ServerError> {

View File

@ -1,7 +1,8 @@
pub mod cloud;
pub mod entities;
mod event;
mod handlers;
pub mod http_server;
pub mod local_server;
pub mod module;
pub mod protobuf;
pub mod ws;

View File

@ -0,0 +1,27 @@
use backend_service::configuration::ClientServerConfiguration;
use tokio::sync::{broadcast, mpsc};
mod persistence;
mod server;
mod ws;
pub use server::*;
pub use ws::*;
pub struct LocalServerContext {
pub local_ws: LocalWebSocket,
pub local_server: LocalServer,
}
pub fn build_server(_config: &ClientServerConfiguration) -> LocalServerContext {
let (client_ws_sender, server_ws_receiver) = mpsc::unbounded_channel();
let (server_ws_sender, _) = broadcast::channel(16);
// server_ws_sender -> client_ws_receiver
// server_ws_receiver <- client_ws_sender
let local_ws = LocalWebSocket::new(server_ws_receiver, server_ws_sender.clone());
let client_ws_receiver = server_ws_sender;
let local_server = LocalServer::new(client_ws_sender, client_ws_receiver);
LocalServerContext { local_ws, local_server }
}

View File

@ -1,5 +1,3 @@
use crate::ws::local::DocumentCloudStorage;
use flowy_collaboration::{
entities::doc::DocumentInfo,
errors::CollaborateError,
@ -14,6 +12,21 @@ use std::{
sync::Arc,
};
pub trait DocumentCloudStorage: Send + Sync {
fn set_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>;
fn get_revisions(
&self,
doc_id: &str,
rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<RepeatedRevisionPB, CollaborateError>;
fn reset_document(
&self,
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<(), CollaborateError>;
}
pub(crate) struct LocalDocumentCloudPersistence {
// For the moment, we use memory to cache the data, it will be implemented with other storage.
// Like the Firestore,Dropbox.etc.

View File

@ -0,0 +1,372 @@
use crate::local_server::persistence::LocalDocumentCloudPersistence;
use async_stream::stream;
use bytes::Bytes;
use flowy_collaboration::{
client_document::default::initial_delta_string,
entities::{
doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams},
ws::{DocumentClientWSData, DocumentClientWSDataType},
},
errors::CollaborateError,
protobuf::DocumentClientWSData as DocumentClientWSDataPB,
server_document::*,
};
use flowy_core::module::WorkspaceCloudService;
use flowy_error::{internal_error, FlowyError};
use futures_util::stream::StreamExt;
use lib_ws::{WSModule, WebSocketRawMessage};
use parking_lot::RwLock;
use std::{
convert::{TryFrom, TryInto},
fmt::Debug,
sync::Arc,
};
use tokio::sync::{broadcast, mpsc, mpsc::UnboundedSender};
pub struct LocalServer {
doc_manager: Arc<ServerDocumentManager>,
stop_tx: RwLock<Option<mpsc::Sender<()>>>,
client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
client_ws_receiver: broadcast::Sender<WebSocketRawMessage>,
}
impl LocalServer {
pub fn new(
client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
client_ws_receiver: broadcast::Sender<WebSocketRawMessage>,
) -> Self {
let persistence = Arc::new(LocalDocumentCloudPersistence::default());
let doc_manager = Arc::new(ServerDocumentManager::new(persistence));
let stop_tx = RwLock::new(None);
LocalServer {
doc_manager,
stop_tx,
client_ws_sender,
client_ws_receiver,
}
}
pub async fn stop(&self) {
if let Some(stop_tx) = self.stop_tx.read().clone() {
let _ = stop_tx.send(()).await;
}
}
pub fn run(&self) {
let (stop_tx, stop_rx) = mpsc::channel(1);
*self.stop_tx.write() = Some(stop_tx);
let runner = LocalWebSocketRunner {
doc_manager: self.doc_manager.clone(),
stop_rx: Some(stop_rx),
client_ws_sender: self.client_ws_sender.clone(),
client_ws_receiver: Some(self.client_ws_receiver.subscribe()),
};
tokio::spawn(runner.run());
}
}
struct LocalWebSocketRunner {
doc_manager: Arc<ServerDocumentManager>,
stop_rx: Option<mpsc::Receiver<()>>,
client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
client_ws_receiver: Option<broadcast::Receiver<WebSocketRawMessage>>,
}
impl LocalWebSocketRunner {
pub async fn run(mut self) {
let mut stop_rx = self.stop_rx.take().expect("Only run once");
let mut client_ws_receiver = self.client_ws_receiver.take().expect("Only run once");
let stream = stream! {
loop {
tokio::select! {
result = client_ws_receiver.recv() => {
match result {
Ok(msg) => yield msg,
Err(_e) => {},
}
},
_ = stop_rx.recv() => {
tracing::trace!("[LocalWebSocketRunner] stop");
break
},
};
}
};
stream
.for_each(|message| async {
match self.handle_message(message).await {
Ok(_) => {},
Err(e) => tracing::error!("[LocalWebSocketRunner]: {}", e),
}
})
.await;
}
async fn handle_message(&self, message: WebSocketRawMessage) -> Result<(), FlowyError> {
let bytes = Bytes::from(message.data);
let client_data = DocumentClientWSData::try_from(bytes).map_err(internal_error)?;
let _ = self.handle_client_data(client_data, "".to_owned()).await?;
Ok(())
}
pub async fn handle_client_data(
&self,
client_data: DocumentClientWSData,
user_id: String,
) -> Result<(), CollaborateError> {
tracing::trace!(
"[LocalDocumentServer] receive: {}:{}-{:?} ",
client_data.doc_id,
client_data.id(),
client_data.ty,
);
let client_ws_sender = self.client_ws_sender.clone();
let user = Arc::new(LocalDocumentUser {
user_id,
client_ws_sender,
});
let ty = client_data.ty.clone();
let document_client_data: DocumentClientWSDataPB = client_data.try_into().unwrap();
match ty {
DocumentClientWSDataType::ClientPushRev => {
let _ = self
.doc_manager
.handle_client_revisions(user, document_client_data)
.await?;
},
DocumentClientWSDataType::ClientPing => {
let _ = self.doc_manager.handle_client_ping(user, document_client_data).await?;
},
}
Ok(())
}
}
#[derive(Debug)]
struct LocalDocumentUser {
user_id: String,
client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
}
impl RevisionUser for LocalDocumentUser {
fn user_id(&self) -> String { self.user_id.clone() }
fn receive(&self, resp: SyncResponse) {
let sender = self.client_ws_sender.clone();
let send_fn = |sender: UnboundedSender<WebSocketRawMessage>, msg: WebSocketRawMessage| match sender.send(msg) {
Ok(_) => {},
Err(e) => {
tracing::error!("LocalDocumentUser send message failed: {}", e);
},
};
tokio::spawn(async move {
match resp {
SyncResponse::Pull(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
send_fn(sender, msg);
},
SyncResponse::Push(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
send_fn(sender, msg);
},
SyncResponse::Ack(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
send_fn(sender, msg);
},
}
});
}
}
use flowy_core_data_model::entities::{
app::{App, AppId, CreateAppParams, RepeatedApp, UpdateAppParams},
trash::{RepeatedTrash, RepeatedTrashId},
view::{CreateViewParams, RepeatedView, RepeatedViewId, UpdateViewParams, View, ViewId},
workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId},
};
use flowy_document::DocumentCloudService;
use flowy_user::module::UserCloudService;
use flowy_user_data_model::entities::{
SignInParams,
SignInResponse,
SignUpParams,
SignUpResponse,
UpdateUserParams,
UserProfile,
};
use lib_infra::{future::FutureResult, timestamp, uuid_string};
impl WorkspaceCloudService for LocalServer {
fn init(&self) {}
fn create_workspace(&self, _token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError> {
let time = timestamp();
let workspace = Workspace {
id: uuid_string(),
name: params.name,
desc: params.desc,
apps: RepeatedApp::default(),
modified_time: time,
create_time: time,
};
FutureResult::new(async { Ok(workspace) })
}
fn read_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError> {
FutureResult::new(async {
let repeated_workspace = RepeatedWorkspace { items: vec![] };
Ok(repeated_workspace)
})
}
fn update_workspace(&self, _token: &str, _params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn delete_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn create_view(&self, _token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError> {
let time = timestamp();
let view = View {
id: params.view_id,
belong_to_id: params.belong_to_id,
name: params.name,
desc: params.desc,
view_type: params.view_type,
version: 0,
belongings: RepeatedView::default(),
modified_time: time,
create_time: time,
};
FutureResult::new(async { Ok(view) })
}
fn read_view(&self, _token: &str, _params: ViewId) -> FutureResult<Option<View>, FlowyError> {
FutureResult::new(async { Ok(None) })
}
fn delete_view(&self, _token: &str, _params: RepeatedViewId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn update_view(&self, _token: &str, _params: UpdateViewParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn create_app(&self, _token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError> {
let time = timestamp();
let app = App {
id: uuid_string(),
workspace_id: params.workspace_id,
name: params.name,
desc: params.desc,
belongings: RepeatedView::default(),
version: 0,
modified_time: time,
create_time: time,
};
FutureResult::new(async { Ok(app) })
}
fn read_app(&self, _token: &str, _params: AppId) -> FutureResult<Option<App>, FlowyError> {
FutureResult::new(async { Ok(None) })
}
fn update_app(&self, _token: &str, _params: UpdateAppParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn delete_app(&self, _token: &str, _params: AppId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn create_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn delete_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn read_trash(&self, _token: &str) -> FutureResult<RepeatedTrash, FlowyError> {
FutureResult::new(async {
let repeated_trash = RepeatedTrash { items: vec![] };
Ok(repeated_trash)
})
}
}
impl UserCloudService for LocalServer {
fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> {
let uid = uuid_string();
FutureResult::new(async move {
Ok(SignUpResponse {
user_id: uid.clone(),
name: params.name,
email: params.email,
token: uid,
})
})
}
fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> {
let user_id = uuid_string();
FutureResult::new(async {
Ok(SignInResponse {
user_id: user_id.clone(),
name: params.name,
email: params.email,
token: user_id,
})
})
}
fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn update_user(&self, _token: &str, _params: UpdateUserParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn get_user(&self, _token: &str) -> FutureResult<UserProfile, FlowyError> {
FutureResult::new(async { Ok(UserProfile::default()) })
}
fn ws_addr(&self) -> String { "ws://localhost:8000/ws/".to_owned() }
}
impl DocumentCloudService for LocalServer {
fn create_document(&self, _token: &str, _params: CreateDocParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn read_document(&self, _token: &str, params: DocumentId) -> FutureResult<Option<DocumentInfo>, FlowyError> {
let doc = DocumentInfo {
doc_id: params.doc_id,
text: initial_delta_string(),
rev_id: 0,
base_rev_id: 0,
};
FutureResult::new(async { Ok(Some(doc)) })
}
fn update_document(&self, _token: &str, _params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
}

View File

@ -0,0 +1,82 @@
use crate::ws::connection::{FlowyRawWebSocket, FlowyWSSender};
use dashmap::DashMap;
use flowy_error::FlowyError;
use lib_infra::future::FutureResult;
use lib_ws::{WSConnectState, WSMessageReceiver, WSModule, WebSocketRawMessage};
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::sync::{broadcast, broadcast::Receiver, mpsc::UnboundedReceiver};
pub struct LocalWebSocket {
user_id: Arc<RwLock<Option<String>>>,
receivers: Arc<DashMap<WSModule, Arc<dyn WSMessageReceiver>>>,
state_sender: broadcast::Sender<WSConnectState>,
server_ws_receiver: RwLock<Option<UnboundedReceiver<WebSocketRawMessage>>>,
server_ws_sender: broadcast::Sender<WebSocketRawMessage>,
}
impl LocalWebSocket {
pub fn new(
server_ws_receiver: UnboundedReceiver<WebSocketRawMessage>,
server_ws_sender: broadcast::Sender<WebSocketRawMessage>,
) -> Self {
let user_id = Arc::new(RwLock::new(None));
let receivers = Arc::new(DashMap::new());
let server_ws_receiver = RwLock::new(Some(server_ws_receiver));
let (state_sender, _) = broadcast::channel(16);
LocalWebSocket {
user_id,
receivers,
state_sender,
server_ws_receiver,
server_ws_sender,
}
}
}
impl FlowyRawWebSocket for LocalWebSocket {
fn initialize(&self) -> FutureResult<(), FlowyError> {
let mut server_ws_receiver = self.server_ws_receiver.write().take().expect("Only take once");
let receivers = self.receivers.clone();
tokio::spawn(async move {
while let Some(message) = server_ws_receiver.recv().await {
match receivers.get(&message.module) {
None => tracing::error!("Can't find any handler for message: {:?}", message),
Some(receiver) => receiver.receive_message(message.clone()),
}
}
});
FutureResult::new(async { Ok(()) })
}
fn start_connect(&self, _addr: String, user_id: String) -> FutureResult<(), FlowyError> {
*self.user_id.write() = Some(user_id);
FutureResult::new(async { Ok(()) })
}
fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.state_sender.subscribe() }
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn add_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
self.receivers.insert(receiver.source(), receiver);
Ok(())
}
fn sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> {
let ws = LocalWebSocketAdaptor(self.server_ws_sender.clone());
Ok(Arc::new(ws))
}
}
#[derive(Clone)]
struct LocalWebSocketAdaptor(broadcast::Sender<WebSocketRawMessage>);
impl FlowyWSSender for LocalWebSocketAdaptor {
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
let _ = self.0.send(msg);
Ok(())
}
}

View File

@ -4,6 +4,7 @@ pub use flowy_error::FlowyError;
use lib_infra::future::FutureResult;
pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage};
use lib_ws::WSController;
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::sync::broadcast;
@ -30,7 +31,18 @@ pub struct FlowyWebSocketConnect {
}
impl FlowyWebSocketConnect {
pub fn new(addr: String, ws: Arc<dyn FlowyRawWebSocket>) -> Self {
pub fn new(addr: String) -> Self {
let ws = Arc::new(Arc::new(WSController::new()));
let (status_notifier, _) = broadcast::channel(10);
FlowyWebSocketConnect {
inner: ws,
connect_type: RwLock::new(NetworkType::default()),
status_notifier,
addr,
}
}
pub fn from_local(addr: String, ws: Arc<dyn FlowyRawWebSocket>) -> Self {
let (status_notifier, _) = broadcast::channel(10);
FlowyWebSocketConnect {
inner: ws,

View File

@ -1,3 +0,0 @@
pub use http_ws::*;
mod http_ws;

View File

@ -1,104 +0,0 @@
use crate::ws::local::persistence::LocalDocumentCloudPersistence;
use bytes::Bytes;
use flowy_collaboration::{
entities::ws::{DocumentClientWSData, DocumentClientWSDataType},
errors::CollaborateError,
protobuf::DocumentClientWSData as DocumentClientWSDataPB,
server_document::*,
};
use lib_ws::{WSModule, WebSocketRawMessage};
use std::{convert::TryInto, fmt::Debug, sync::Arc};
use tokio::sync::{mpsc, mpsc::UnboundedSender};
pub struct LocalDocumentServer {
doc_manager: Arc<ServerDocumentManager>,
sender: mpsc::UnboundedSender<WebSocketRawMessage>,
}
impl LocalDocumentServer {
pub fn new(sender: mpsc::UnboundedSender<WebSocketRawMessage>) -> Self {
let persistence = Arc::new(LocalDocumentCloudPersistence::default());
let doc_manager = Arc::new(ServerDocumentManager::new(persistence));
LocalDocumentServer { doc_manager, sender }
}
pub async fn handle_client_data(
&self,
client_data: DocumentClientWSData,
user_id: String,
) -> Result<(), CollaborateError> {
tracing::trace!(
"[LocalDocumentServer] receive: {}:{}-{:?} ",
client_data.doc_id,
client_data.id(),
client_data.ty,
);
let user = Arc::new(LocalDocumentUser {
user_id,
ws_sender: self.sender.clone(),
});
let ty = client_data.ty.clone();
let document_client_data: DocumentClientWSDataPB = client_data.try_into().unwrap();
match ty {
DocumentClientWSDataType::ClientPushRev => {
let _ = self
.doc_manager
.handle_client_revisions(user, document_client_data)
.await?;
},
DocumentClientWSDataType::ClientPing => {
let _ = self.doc_manager.handle_client_ping(user, document_client_data).await?;
},
}
Ok(())
}
}
#[derive(Debug)]
struct LocalDocumentUser {
user_id: String,
ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
}
impl RevisionUser for LocalDocumentUser {
fn user_id(&self) -> String { self.user_id.clone() }
fn receive(&self, resp: SyncResponse) {
let sender = self.ws_sender.clone();
let send_fn = |sender: UnboundedSender<WebSocketRawMessage>, msg: WebSocketRawMessage| match sender.send(msg) {
Ok(_) => {},
Err(e) => {
tracing::error!("LocalDocumentUser send message failed: {}", e);
},
};
tokio::spawn(async move {
match resp {
SyncResponse::Pull(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
send_fn(sender, msg);
},
SyncResponse::Push(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
send_fn(sender, msg);
},
SyncResponse::Ack(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
send_fn(sender, msg);
},
}
});
}
}

View File

@ -1,166 +0,0 @@
use crate::ws::{
connection::{FlowyRawWebSocket, FlowyWSSender},
local::local_server::LocalDocumentServer,
};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::entities::ws::*;
use flowy_error::{internal_error, FlowyError};
use lib_infra::future::FutureResult;
use lib_ws::{WSConnectState, WSMessageReceiver, WSModule, WebSocketRawMessage};
use parking_lot::RwLock;
use std::{convert::TryFrom, sync::Arc};
use tokio::sync::{broadcast, broadcast::Receiver, mpsc, mpsc::UnboundedReceiver};
pub struct LocalWebSocket {
receivers: Arc<DashMap<WSModule, Arc<dyn WSMessageReceiver>>>,
state_sender: broadcast::Sender<WSConnectState>,
// LocalWSSender uses the mpsc::channel sender to simulate the web socket. It spawns a receiver that uses the
// LocalDocumentServer to handle the message. The server will send the WebSocketRawMessage messages that will
// be handled by the WebSocketRawMessage receivers.
ws_sender: LocalWSSender,
local_server: Arc<LocalDocumentServer>,
local_server_rx: RwLock<Option<UnboundedReceiver<WebSocketRawMessage>>>,
local_server_stop_tx: RwLock<Option<mpsc::Sender<()>>>,
user_id: Arc<RwLock<Option<String>>>,
}
impl std::default::Default for LocalWebSocket {
fn default() -> Self {
let (state_sender, _) = broadcast::channel(16);
let ws_sender = LocalWSSender::default();
let receivers = Arc::new(DashMap::new());
let (server_tx, server_rx) = mpsc::unbounded_channel();
let local_server = Arc::new(LocalDocumentServer::new(server_tx));
let local_server_rx = RwLock::new(Some(server_rx));
let local_server_stop_tx = RwLock::new(None);
let user_id = Arc::new(RwLock::new(None));
LocalWebSocket {
receivers,
state_sender,
ws_sender,
local_server,
local_server_rx,
local_server_stop_tx,
user_id,
}
}
}
impl LocalWebSocket {
fn cancel_pre_spawn_client(&self) {
if let Some(stop_tx) = self.local_server_stop_tx.read().clone() {
tokio::spawn(async move {
let _ = stop_tx.send(()).await;
});
}
}
fn spawn_client_ws_receiver(&self, _addr: String) {
let mut ws_receiver = self.ws_sender.subscribe();
let local_server = self.local_server.clone();
let user_id = self.user_id.clone();
let _ = self.cancel_pre_spawn_client();
let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
*self.local_server_stop_tx.write() = Some(stop_tx);
tokio::spawn(async move {
loop {
tokio::select! {
result = ws_receiver.recv() => {
match result {
Ok(message) => {
let user_id = user_id.read().clone();
handle_ws_raw_message(user_id, &local_server, message).await;
},
Err(e) => tracing::error!("[LocalWebSocket] error: {}", e),
}
}
_ = stop_rx.recv() => {
break
},
}
}
});
}
}
async fn handle_ws_raw_message(
user_id: Option<String>,
local_server: &Arc<LocalDocumentServer>,
message: WebSocketRawMessage,
) {
let f = || async {
match user_id {
None => Ok(()),
Some(user_id) => {
let bytes = Bytes::from(message.data);
let client_data = DocumentClientWSData::try_from(bytes).map_err(internal_error)?;
let _ = local_server.handle_client_data(client_data, user_id).await?;
Ok::<(), FlowyError>(())
},
}
};
if let Err(e) = f().await {
tracing::error!("[LocalWebSocket] error: {:?}", e);
}
}
impl FlowyRawWebSocket for LocalWebSocket {
fn initialize(&self) -> FutureResult<(), FlowyError> {
let mut server_rx = self.local_server_rx.write().take().expect("Only take once");
let receivers = self.receivers.clone();
tokio::spawn(async move {
while let Some(message) = server_rx.recv().await {
match receivers.get(&message.module) {
None => tracing::error!("Can't find any handler for message: {:?}", message),
Some(receiver) => receiver.receive_message(message.clone()),
}
}
});
FutureResult::new(async { Ok(()) })
}
fn start_connect(&self, addr: String, user_id: String) -> FutureResult<(), FlowyError> {
*self.user_id.write() = Some(user_id);
self.spawn_client_ws_receiver(addr);
FutureResult::new(async { Ok(()) })
}
fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.state_sender.subscribe() }
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn add_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
self.receivers.insert(receiver.source(), receiver);
Ok(())
}
fn sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
}
#[derive(Clone)]
struct LocalWSSender(broadcast::Sender<WebSocketRawMessage>);
impl std::default::Default for LocalWSSender {
fn default() -> Self {
let (tx, _) = broadcast::channel(16);
Self(tx)
}
}
impl FlowyWSSender for LocalWSSender {
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
let _ = self.0.send(msg);
Ok(())
}
}
impl std::ops::Deref for LocalWSSender {
type Target = broadcast::Sender<WebSocketRawMessage>;
fn deref(&self) -> &Self::Target { &self.0 }
}

View File

@ -1,24 +0,0 @@
mod local_server;
mod local_ws;
mod persistence;
use flowy_collaboration::errors::CollaborateError;
pub use local_ws::*;
use flowy_collaboration::protobuf::RepeatedRevision as RepeatedRevisionPB;
use lib_infra::future::BoxResultFuture;
pub trait DocumentCloudStorage: Send + Sync {
fn set_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>;
fn get_revisions(
&self,
doc_id: &str,
rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<RepeatedRevisionPB, CollaborateError>;
fn reset_document(
&self,
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<(), CollaborateError>;
}

View File

@ -1,3 +1,2 @@
pub mod connection;
pub mod http;
pub mod local;
pub mod http_ws;

View File

@ -2,34 +2,17 @@ use backend_service::configuration::ClientServerConfiguration;
use flowy_core::{
errors::FlowyError,
module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser},
prelude::{
App,
AppId,
CreateAppParams,
CreateViewParams,
CreateWorkspaceParams,
RepeatedTrash,
RepeatedTrashId,
RepeatedViewId,
RepeatedWorkspace,
UpdateAppParams,
UpdateViewParams,
UpdateWorkspaceParams,
View,
ViewId,
Workspace,
WorkspaceId,
},
};
use flowy_database::ConnectionPool;
use flowy_net::cloud::core::{CoreHttpCloudService, CoreLocalCloudService};
use flowy_net::{http_server::core::CoreHttpCloudService, local_server::LocalServer};
use flowy_user::services::UserSession;
use lib_infra::future::FutureResult;
use std::sync::Arc;
pub struct CoreDepsResolver();
impl CoreDepsResolver {
pub fn resolve(
local_server: Option<Arc<LocalServer>>,
user_session: Arc<UserSession>,
server_config: &ClientServerConfiguration,
) -> (
@ -39,7 +22,10 @@ impl CoreDepsResolver {
) {
let user: Arc<dyn WorkspaceUser> = Arc::new(WorkspaceUserImpl(user_session.clone()));
let database: Arc<dyn WorkspaceDatabase> = Arc::new(WorkspaceDatabaseImpl(user_session));
let cloud_service = make_core_cloud_service(server_config);
let cloud_service: Arc<dyn WorkspaceCloudService> = match local_server {
None => Arc::new(CoreHttpCloudService::new(server_config.clone())),
Some(local_server) => local_server,
};
(user, database, cloud_service)
}
}
@ -57,144 +43,3 @@ impl WorkspaceUser for WorkspaceUserImpl {
fn token(&self) -> Result<String, FlowyError> { self.0.token().map_err(|e| FlowyError::internal().context(e)) }
}
fn make_core_cloud_service(config: &ClientServerConfiguration) -> Arc<dyn WorkspaceCloudService> {
if cfg!(feature = "http_server") {
Arc::new(CoreHttpCloudServiceAdaptor::new(config))
} else {
Arc::new(CoreLocalCloudServiceAdaptor::new(config))
}
}
struct CoreHttpCloudServiceAdaptor(CoreHttpCloudService);
impl CoreHttpCloudServiceAdaptor {
fn new(config: &ClientServerConfiguration) -> Self { Self(CoreHttpCloudService::new(config.clone())) }
}
impl WorkspaceCloudService for CoreHttpCloudServiceAdaptor {
fn init(&self) { self.0.init() }
fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError> {
self.0.create_workspace(token, params)
}
fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError> {
self.0.read_workspace(token, params)
}
fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
self.0.update_workspace(token, params)
}
fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> {
self.0.delete_workspace(token, params)
}
fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError> {
self.0.create_view(token, params)
}
fn read_view(&self, token: &str, params: ViewId) -> FutureResult<Option<View>, FlowyError> {
self.0.read_view(token, params)
}
fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> {
self.0.delete_view(token, params)
}
fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> {
self.0.update_view(token, params)
}
fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError> {
self.0.create_app(token, params)
}
fn read_app(&self, token: &str, params: AppId) -> FutureResult<Option<App>, FlowyError> {
self.0.read_app(token, params)
}
fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> {
self.0.update_app(token, params)
}
fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> {
self.0.delete_app(token, params)
}
fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
self.0.create_trash(token, params)
}
fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
self.0.delete_trash(token, params)
}
fn read_trash(&self, token: &str) -> FutureResult<RepeatedTrash, FlowyError> { self.0.read_trash(token) }
}
struct CoreLocalCloudServiceAdaptor(CoreLocalCloudService);
impl CoreLocalCloudServiceAdaptor {
fn new(config: &ClientServerConfiguration) -> Self { Self(CoreLocalCloudService::new(config)) }
}
impl WorkspaceCloudService for CoreLocalCloudServiceAdaptor {
fn init(&self) { self.0.init() }
fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError> {
self.0.create_workspace(token, params)
}
fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError> {
self.0.read_workspace(token, params)
}
fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
self.0.update_workspace(token, params)
}
fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> {
self.0.delete_workspace(token, params)
}
fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError> {
self.0.create_view(token, params)
}
fn read_view(&self, token: &str, params: ViewId) -> FutureResult<Option<View>, FlowyError> {
self.0.read_view(token, params)
}
fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> {
self.0.delete_view(token, params)
}
fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> {
self.0.update_view(token, params)
}
fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError> {
self.0.create_app(token, params)
}
fn read_app(&self, token: &str, params: AppId) -> FutureResult<Option<App>, FlowyError> {
self.0.read_app(token, params)
}
fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> {
self.0.update_app(token, params)
}
fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> {
self.0.delete_app(token, params)
}
fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
self.0.create_trash(token, params)
}
fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
self.0.delete_trash(token, params)
}
fn read_trash(&self, token: &str) -> FutureResult<RepeatedTrash, FlowyError> { self.0.read_trash(token) }
}

View File

@ -1,9 +1,6 @@
use backend_service::configuration::ClientServerConfiguration;
use bytes::Bytes;
use flowy_collaboration::entities::{
doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams},
ws::DocumentClientWSData,
};
use flowy_collaboration::entities::ws::DocumentClientWSData;
use flowy_database::ConnectionPool;
use flowy_document::{
context::DocumentUser,
@ -12,11 +9,12 @@ use flowy_document::{
DocumentCloudService,
};
use flowy_net::{
cloud::document::{DocumentHttpCloudService, DocumentLocalCloudService},
http_server::document::DocumentHttpCloudService,
local_server::LocalServer,
ws::connection::FlowyWebSocketConnect,
};
use flowy_user::services::UserSession;
use lib_infra::future::FutureResult;
use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage};
use std::{convert::TryInto, path::Path, sync::Arc};
@ -30,6 +28,7 @@ pub struct DocumentDependencies {
pub struct DocumentDepsResolver();
impl DocumentDepsResolver {
pub fn resolve(
local_server: Option<Arc<LocalServer>>,
ws_conn: Arc<FlowyWebSocketConnect>,
user_session: Arc<UserSession>,
server_config: &ClientServerConfiguration,
@ -39,7 +38,12 @@ impl DocumentDepsResolver {
let ws_receivers = Arc::new(DocumentWSReceivers::new());
let receiver = Arc::new(WSMessageReceiverImpl(ws_receivers.clone()));
ws_conn.add_ws_message_receiver(receiver).unwrap();
let cloud_service = make_document_cloud_service(server_config);
let cloud_service: Arc<dyn DocumentCloudService> = match local_server {
None => Arc::new(DocumentHttpCloudService::new(server_config.clone())),
Some(local_server) => local_server,
};
DocumentDependencies {
user,
ws_receivers,
@ -94,49 +98,3 @@ impl WSMessageReceiver for WSMessageReceiverImpl {
});
}
}
fn make_document_cloud_service(server_config: &ClientServerConfiguration) -> Arc<dyn DocumentCloudService> {
if cfg!(feature = "http_server") {
Arc::new(DocumentHttpCloudServiceAdaptor::new(server_config.clone()))
} else {
Arc::new(DocumentLocalCloudServiceAdaptor::new())
}
}
struct DocumentHttpCloudServiceAdaptor(DocumentHttpCloudService);
impl DocumentHttpCloudServiceAdaptor {
fn new(config: ClientServerConfiguration) -> Self {
DocumentHttpCloudServiceAdaptor(DocumentHttpCloudService::new(config))
}
}
impl DocumentCloudService for DocumentHttpCloudServiceAdaptor {
fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> {
self.0.create_document_request(token, params)
}
fn read_document(&self, token: &str, params: DocumentId) -> FutureResult<Option<DocumentInfo>, FlowyError> {
self.0.read_document_request(token, params)
}
fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
self.0.update_document_request(token, params)
}
}
struct DocumentLocalCloudServiceAdaptor(DocumentLocalCloudService);
impl DocumentLocalCloudServiceAdaptor {
fn new() -> Self { Self(DocumentLocalCloudService {}) }
}
impl DocumentCloudService for DocumentLocalCloudServiceAdaptor {
fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> {
self.0.create_document_request(token, params)
}
fn read_document(&self, token: &str, params: DocumentId) -> FutureResult<Option<DocumentInfo>, FlowyError> {
self.0.read_document_request(token, params)
}
fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
self.0.update_document_request(token, params)
}
}

View File

@ -1,58 +1,18 @@
use crate::FlowyError;
use backend_service::configuration::ClientServerConfiguration;
use flowy_net::cloud::user::{UserHttpCloudService, UserLocalCloudService};
use flowy_user::{
entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile},
module::UserCloudService,
};
use lib_infra::future::FutureResult;
use flowy_net::{http_server::user::UserHttpCloudService, local_server::LocalServer};
use flowy_user::module::UserCloudService;
use std::sync::Arc;
pub struct UserDepsResolver();
impl UserDepsResolver {
pub fn resolve(server_config: &ClientServerConfiguration) -> Arc<dyn UserCloudService> {
make_user_cloud_service(server_config)
pub fn resolve(
local_server: &Option<Arc<LocalServer>>,
server_config: &ClientServerConfiguration,
) -> Arc<dyn UserCloudService> {
match local_server.clone() {
None => Arc::new(UserHttpCloudService::new(server_config)),
Some(local_server) => local_server,
}
}
}
fn make_user_cloud_service(config: &ClientServerConfiguration) -> Arc<dyn UserCloudService> {
if cfg!(feature = "http_server") {
Arc::new(UserHttpCloudServiceAdaptor(UserHttpCloudService::new(config)))
} else {
Arc::new(UserLocalCloudServiceAdaptor(UserLocalCloudService::new(config)))
}
}
struct UserHttpCloudServiceAdaptor(UserHttpCloudService);
impl UserCloudService for UserHttpCloudServiceAdaptor {
fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> { self.0.sign_up(params) }
fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> { self.0.sign_in(params) }
fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> { self.0.sign_out(token) }
fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> {
self.0.update_user(token, params)
}
fn get_user(&self, token: &str) -> FutureResult<UserProfile, FlowyError> { self.0.get_user(token) }
fn ws_addr(&self) -> String { self.0.ws_addr() }
}
struct UserLocalCloudServiceAdaptor(UserLocalCloudService);
impl UserCloudService for UserLocalCloudServiceAdaptor {
fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> { self.0.sign_up(params) }
fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> { self.0.sign_in(params) }
fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> { self.0.sign_out(token) }
fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> {
self.0.update_user(token, params)
}
fn get_user(&self, token: &str) -> FutureResult<UserProfile, FlowyError> { self.0.get_user(token) }
fn ws_addr(&self) -> String { self.0.ws_addr() }
}

View File

@ -6,14 +6,12 @@ use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core};
use flowy_document::context::DocumentContext;
use flowy_net::{
entities::NetworkType,
ws::{
connection::{listen_on_websocket, FlowyRawWebSocket, FlowyWebSocketConnect},
local::LocalWebSocket,
},
local_server::LocalServer,
ws::connection::{listen_on_websocket, FlowyWebSocketConnect},
};
use flowy_user::services::{notifier::UserStatus, UserSession, UserSessionConfig};
use lib_dispatch::prelude::*;
use lib_ws::WSController;
use module::mk_modules;
pub use module::*;
use std::{
@ -88,6 +86,7 @@ pub struct FlowySDK {
pub core: Arc<CoreContext>,
pub dispatcher: Arc<EventDispatcher>,
pub ws_conn: Arc<FlowyWebSocketConnect>,
pub local_server: Option<Arc<LocalServer>>,
}
impl FlowySDK {
@ -96,18 +95,25 @@ impl FlowySDK {
init_kv(&config.root);
tracing::debug!("🔥 {:?}", config);
let ws_conn = Arc::new(FlowyWebSocketConnect::new(
config.server_config.ws_addr(),
default_web_socket(),
));
let user_session = mk_user_session(&config, &config.server_config);
let flowy_document = mk_document(&ws_conn, &user_session, &config.server_config);
let core_ctx = mk_core_context(&user_session, &flowy_document, &config.server_config);
let ws_addr = config.server_config.ws_addr();
let (local_server, ws_conn) = if cfg!(feature = "http_server") {
let ws_conn = Arc::new(FlowyWebSocketConnect::new(ws_addr));
(None, ws_conn)
} else {
let context = flowy_net::local_server::build_server(&config.server_config);
let local_ws = Arc::new(context.local_ws);
let ws_conn = Arc::new(FlowyWebSocketConnect::from_local(ws_addr, local_ws));
(Some(Arc::new(context.local_server)), ws_conn)
};
let user_session = mk_user_session(&config, &local_server, &config.server_config);
let flowy_document = mk_document(&local_server, &ws_conn, &user_session, &config.server_config);
let core_ctx = mk_core_context(&local_server, &user_session, &flowy_document, &config.server_config);
//
let modules = mk_modules(&ws_conn, &core_ctx, &user_session);
let dispatcher = Arc::new(EventDispatcher::construct(|| modules));
_init(&dispatcher, &ws_conn, &user_session, &core_ctx);
_init(&local_server, &dispatcher, &ws_conn, &user_session, &core_ctx);
Self {
config,
@ -116,6 +122,7 @@ impl FlowySDK {
core: core_ctx,
dispatcher,
ws_conn,
local_server,
}
}
@ -123,6 +130,7 @@ impl FlowySDK {
}
fn _init(
local_server: &Option<Arc<LocalServer>>,
dispatch: &EventDispatcher,
ws_conn: &Arc<FlowyWebSocketConnect>,
user_session: &Arc<UserSession>,
@ -134,8 +142,13 @@ fn _init(
let cloned_core = core.clone();
let user_session = user_session.clone();
let ws_conn = ws_conn.clone();
let local_server = local_server.clone();
dispatch.spawn(async move {
if let Some(local_server) = local_server.as_ref() {
local_server.run();
}
user_session.init();
ws_conn.init().await;
listen_on_websocket(ws_conn.clone());
@ -206,36 +219,40 @@ fn init_log(config: &FlowySDKConfig) {
}
}
fn mk_user_session(config: &FlowySDKConfig, server_config: &ClientServerConfiguration) -> Arc<UserSession> {
fn mk_user_session(
config: &FlowySDKConfig,
local_server: &Option<Arc<LocalServer>>,
server_config: &ClientServerConfiguration,
) -> Arc<UserSession> {
let session_cache_key = format!("{}_session_cache", &config.name);
let user_config = UserSessionConfig::new(&config.root, &session_cache_key);
let cloud_service = UserDepsResolver::resolve(server_config);
let cloud_service = UserDepsResolver::resolve(local_server, server_config);
Arc::new(UserSession::new(user_config, cloud_service))
}
fn mk_core_context(
local_server: &Option<Arc<LocalServer>>,
user_session: &Arc<UserSession>,
flowy_document: &Arc<DocumentContext>,
server_config: &ClientServerConfiguration,
) -> Arc<CoreContext> {
let (user, database, cloud_service) = CoreDepsResolver::resolve(user_session.clone(), server_config);
let (user, database, cloud_service) =
CoreDepsResolver::resolve(local_server.clone(), user_session.clone(), server_config);
init_core(user, database, flowy_document.clone(), cloud_service)
}
fn default_web_socket() -> Arc<dyn FlowyRawWebSocket> {
if cfg!(feature = "http_server") {
Arc::new(Arc::new(WSController::new()))
} else {
Arc::new(LocalWebSocket::default())
}
}
pub fn mk_document(
local_server: &Option<Arc<LocalServer>>,
ws_conn: &Arc<FlowyWebSocketConnect>,
user_session: &Arc<UserSession>,
server_config: &ClientServerConfiguration,
) -> Arc<DocumentContext> {
let dependencies = DocumentDepsResolver::resolve(ws_conn.clone(), user_session.clone(), server_config);
let dependencies = DocumentDepsResolver::resolve(
local_server.clone(),
ws_conn.clone(),
user_session.clone(),
server_config,
);
Arc::new(DocumentContext::new(
dependencies.user,
dependencies.ws_receivers,

View File

@ -17,7 +17,6 @@ lib-dispatch = { path = "../lib-dispatch" }
flowy-error = { path = "../flowy-error", features = ["db", "backend"] }
lib-sqlite = { path = "../lib-sqlite" }
tracing = { version = "0.1", features = ["log"] }
bytes = "1.0"
serde = { version = "1.0", features = ["derive"] }