add virtual net

This commit is contained in:
appflowy 2021-12-26 19:10:37 +08:00
parent 0c7c3621d7
commit bbc9190bc0
36 changed files with 619 additions and 581 deletions

16
backend/Cargo.lock generated
View File

@ -1401,6 +1401,7 @@ dependencies = [
"flowy-document",
"flowy-net",
"flowy-user",
"flowy-virtual-net",
"futures-core",
"lib-dispatch",
"lib-infra",
@ -1491,6 +1492,21 @@ dependencies = [
"validator",
]
[[package]]
name = "flowy-virtual-net"
version = "0.1.0"
dependencies = [
"bytes",
"dashmap",
"flowy-collaboration",
"flowy-net",
"lib-infra",
"lib-ws",
"parking_lot",
"tokio",
"tracing",
]
[[package]]
name = "fnv"
version = "1.0.7"

View File

@ -1,7 +1,7 @@
use actix::Message;
use bytes::Bytes;
use flowy_collaboration::entities::ws::{DocumentClientWSData, DocumentServerWSData};
use lib_ws::{WSModule, WebScoketRawMessage};
use lib_ws::{WSModule, WebSocketRawMessage};
use std::convert::TryInto;
#[derive(Debug, Message, Clone)]
@ -17,7 +17,7 @@ impl std::ops::Deref for WebSocketMessage {
impl std::convert::From<DocumentClientWSData> for WebSocketMessage {
fn from(data: DocumentClientWSData) -> Self {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebScoketRawMessage {
let msg = WebSocketRawMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
@ -30,7 +30,7 @@ impl std::convert::From<DocumentClientWSData> for WebSocketMessage {
impl std::convert::From<DocumentServerWSData> for WebSocketMessage {
fn from(data: DocumentServerWSData) -> Self {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebScoketRawMessage {
let msg = WebSocketRawMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};

View File

@ -11,7 +11,7 @@ use actix::*;
use actix_web::web::Data;
use actix_web_actors::{ws, ws::Message::Text};
use bytes::Bytes;
use lib_ws::{WSModule, WebScoketRawMessage};
use lib_ws::{WSModule, WebSocketRawMessage};
use std::{collections::HashMap, convert::TryFrom, sync::Arc, time::Instant};
pub trait WebSocketReceiver: Send + Sync {
@ -85,7 +85,7 @@ impl WSClient {
fn handle_binary_message(&self, bytes: Bytes, socket: Socket) {
// TODO: ok to unwrap?
let message: WebScoketRawMessage = WebScoketRawMessage::try_from(bytes).unwrap();
let message: WebSocketRawMessage = WebSocketRawMessage::try_from(bytes).unwrap();
match self.ws_receivers.get(&message.module) {
None => {
log::error!("Can't find the receiver for {:?}", message.module);

View File

@ -13,15 +13,15 @@ import 'msg.pbenum.dart';
export 'msg.pbenum.dart';
class WebScoketRawMessage extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WebScoketRawMessage', createEmptyInstance: create)
class WebSocketRawMessage extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WebSocketRawMessage', createEmptyInstance: create)
..e<WSModule>(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'module', $pb.PbFieldType.OE, defaultOrMaker: WSModule.Doc, valueOf: WSModule.valueOf, enumValues: WSModule.values)
..a<$core.List<$core.int>>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY)
..hasRequiredFields = false
;
WebScoketRawMessage._() : super();
factory WebScoketRawMessage({
WebSocketRawMessage._() : super();
factory WebSocketRawMessage({
WSModule? module,
$core.List<$core.int>? data,
}) {
@ -34,26 +34,26 @@ class WebScoketRawMessage extends $pb.GeneratedMessage {
}
return _result;
}
factory WebScoketRawMessage.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory WebScoketRawMessage.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
factory WebSocketRawMessage.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory WebSocketRawMessage.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
'Will be removed in next major version')
WebScoketRawMessage clone() => WebScoketRawMessage()..mergeFromMessage(this);
WebSocketRawMessage clone() => WebSocketRawMessage()..mergeFromMessage(this);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
'Will be removed in next major version')
WebScoketRawMessage copyWith(void Function(WebScoketRawMessage) updates) => super.copyWith((message) => updates(message as WebScoketRawMessage)) as WebScoketRawMessage; // ignore: deprecated_member_use
WebSocketRawMessage copyWith(void Function(WebSocketRawMessage) updates) => super.copyWith((message) => updates(message as WebSocketRawMessage)) as WebSocketRawMessage; // ignore: deprecated_member_use
$pb.BuilderInfo get info_ => _i;
@$core.pragma('dart2js:noInline')
static WebScoketRawMessage create() => WebScoketRawMessage._();
WebScoketRawMessage createEmptyInstance() => create();
static $pb.PbList<WebScoketRawMessage> createRepeated() => $pb.PbList<WebScoketRawMessage>();
static WebSocketRawMessage create() => WebSocketRawMessage._();
WebSocketRawMessage createEmptyInstance() => create();
static $pb.PbList<WebSocketRawMessage> createRepeated() => $pb.PbList<WebSocketRawMessage>();
@$core.pragma('dart2js:noInline')
static WebScoketRawMessage getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<WebScoketRawMessage>(create);
static WebScoketRawMessage? _defaultInstance;
static WebSocketRawMessage getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<WebSocketRawMessage>(create);
static WebSocketRawMessage? _defaultInstance;
@$pb.TagNumber(1)
WSModule get module => $_getN(0);

View File

@ -18,14 +18,14 @@ const WSModule$json = const {
/// Descriptor for `WSModule`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List wSModuleDescriptor = $convert.base64Decode('CghXU01vZHVsZRIHCgNEb2MQAA==');
@$core.Deprecated('Use webScoketRawMessageDescriptor instead')
const WebScoketRawMessage$json = const {
'1': 'WebScoketRawMessage',
@$core.Deprecated('Use webSocketRawMessageDescriptor instead')
const WebSocketRawMessage$json = const {
'1': 'WebSocketRawMessage',
'2': const [
const {'1': 'module', '3': 1, '4': 1, '5': 14, '6': '.WSModule', '10': 'module'},
const {'1': 'data', '3': 2, '4': 1, '5': 12, '10': 'data'},
],
};
/// Descriptor for `WebScoketRawMessage`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List webScoketRawMessageDescriptor = $convert.base64Decode('ChNXZWJTY29rZXRSYXdNZXNzYWdlEiEKBm1vZHVsZRgBIAEoDjIJLldTTW9kdWxlUgZtb2R1bGUSEgoEZGF0YRgCIAEoDFIEZGF0YQ==');
/// Descriptor for `WebSocketRawMessage`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List webSocketRawMessageDescriptor = $convert.base64Decode('ChNXZWJTb2NrZXRSYXdNZXNzYWdlEiEKBm1vZHVsZRgBIAEoDjIJLldTTW9kdWxlUgZtb2R1bGUSEgoEZGF0YRgCIAEoDFIEZGF0YQ==');

View File

@ -4,6 +4,7 @@ members = [
"lib-log",
"lib-sqlite",
"flowy-net",
"flowy-virtual-net",
"flowy-sdk",
"dart-ffi",
"flowy-user",

View File

@ -48,7 +48,7 @@ pin-project = "1.0.0"
[dev-dependencies]
flowy-test = { path = "../flowy-test" }
flowy-document = { path = "../flowy-document", features = ["flowy_unit_test"]}
flowy-net = { path = "../flowy-net", features = ["flowy_unit_test"] }
flowy-net = { path = "../flowy-net" }
color-eyre = { version = "0.5", default-features = false }
criterion = "0.3"
rand = "0.7.3"

View File

@ -16,7 +16,7 @@ use bytes::Bytes;
use flowy_collaboration::{
entities::{
revision::{RepeatedRevision, RevType, Revision, RevisionRange},
ws::{DocumentClientWSData, DocumentClientWSDataType, DocumentServerWSDataBuilder, NewDocumentUser},
ws::{DocumentClientWSData, NewDocumentUser},
},
errors::CollaborateResult,
};
@ -25,11 +25,7 @@ use lib_infra::future::FutureResult;
use flowy_collaboration::entities::ws::DocumentServerWSDataType;
use lib_ws::WSConnectState;
use std::{
collections::VecDeque,
convert::{TryFrom, TryInto},
sync::Arc,
};
use std::{collections::VecDeque, convert::TryFrom, sync::Arc};
use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock};
pub(crate) trait DocumentWebSocketManager: Send + Sync {
@ -44,36 +40,32 @@ pub(crate) async fn make_document_ws_manager(
rev_manager: Arc<RevisionManager>,
ws: Arc<dyn DocumentWebSocket>,
) -> Arc<dyn DocumentWebSocketManager> {
if cfg!(feature = "http_server") {
let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
doc_id: doc_id.clone(),
user_id: user_id.clone(),
editor_edit_queue: editor_edit_queue.clone(),
rev_manager: rev_manager.clone(),
shared_sink: shared_sink.clone(),
});
let ws_stream_provider = DocumentWSSinkDataProviderAdapter(shared_sink.clone());
let ws_manager = Arc::new(HttpWebSocketManager::new(
&doc_id,
ws.clone(),
Arc::new(ws_stream_provider),
ws_stream_consumer,
));
notify_user_has_connected(&user_id, &doc_id, rev_manager.clone(), shared_sink).await;
listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager.clone());
let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
doc_id: doc_id.clone(),
user_id: user_id.clone(),
editor_edit_queue: editor_edit_queue.clone(),
rev_manager: rev_manager.clone(),
shared_sink: shared_sink.clone(),
});
let ws_stream_provider = DocumentWSSinkDataProviderAdapter(shared_sink.clone());
let ws_manager = Arc::new(HttpWebSocketManager::new(
&doc_id,
ws.clone(),
Arc::new(ws_stream_provider),
ws_stream_consumer,
));
notify_user_has_connected(&user_id, &doc_id, rev_manager.clone(), shared_sink).await;
listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager.clone());
Arc::new(ws_manager)
} else {
Arc::new(Arc::new(LocalWebSocketManager {}))
}
Arc::new(ws_manager)
}
async fn notify_user_has_connected(
user_id: &str,
doc_id: &str,
rev_manager: Arc<RevisionManager>,
shared_sink: Arc<SharedWSSinkDataProvider>,
_user_id: &str,
_doc_id: &str,
_rev_manager: Arc<RevisionManager>,
_shared_sink: Arc<SharedWSSinkDataProvider>,
) {
// let need_notify = match shared_sink.front().await {
// None => true,

View File

@ -10,8 +10,8 @@ lib-dispatch = { path = "../lib-dispatch" }
flowy-error = { path = "../flowy-error" }
flowy-derive = { path = "../../../shared-lib/flowy-derive" }
lib-infra = { path = "../../../shared-lib/lib-infra" }
lib-ws = { path = "../../../shared-lib/lib-ws" }
protobuf = {version = "2.18.0"}
lib-ws = { path = "../../../shared-lib/lib-ws" }
bytes = { version = "1.0" }
anyhow = "1.0"
tokio = {version = "1", features = ["sync"]}
@ -20,10 +20,5 @@ strum = "0.21"
strum_macros = "0.21"
tracing = { version = "0.1", features = ["log"] }
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration", optional = true}
lazy_static = {version = "1.4.0", optional = true}
dashmap = {version = "4.0", optional = true}
[features]
flowy_unit_test = ["flowy-collaboration", "lazy_static", "dashmap"]
http_server = []

View File

@ -1,11 +1,14 @@
use crate::{entities::NetworkState, services::ws::WsManager};
use crate::{entities::NetworkState, services::ws::FlowyWSConnect};
use flowy_error::FlowyError;
use lib_dispatch::prelude::{Data, Unit};
use std::sync::Arc;
#[tracing::instrument(skip(data, ws_manager))]
pub async fn update_network_ty(data: Data<NetworkState>, ws_manager: Unit<Arc<WsManager>>) -> Result<(), FlowyError> {
pub async fn update_network_ty(
data: Data<NetworkState>,
ws_manager: Unit<Arc<FlowyWSConnect>>,
) -> Result<(), FlowyError> {
let network_state = data.into_inner();
ws_manager.update_network_type(&network_state.ty);
Ok(())

View File

@ -1,8 +1,8 @@
use crate::{event::NetworkEvent, handlers::*, services::ws::WsManager};
use crate::{event::NetworkEvent, handlers::*, services::ws::FlowyWSConnect};
use lib_dispatch::prelude::*;
use std::sync::Arc;
pub fn create(ws_manager: Arc<WsManager>) -> Module {
pub fn create(ws_manager: Arc<FlowyWSConnect>) -> Module {
Module::new()
.name("Flowy-Network")
.data(ws_manager)

View File

@ -1,3 +0,0 @@
mod ws_mock;
pub use ws_mock::*;

View File

@ -1,236 +0,0 @@
use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WSConnectState, WSMessage, WSMessageReceiver};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::{
core::sync::{DocumentPersistence, RevisionUser, ServerDocumentManager, SyncResponse},
entities::{
doc::DocumentInfo,
revision::{RepeatedRevision, Revision},
ws::{DocumentWSData, DocumentWSDataBuilder, DocumentWSDataType, NewDocumentUser},
},
errors::CollaborateError,
};
use lazy_static::lazy_static;
use lib_infra::future::{FutureResult, FutureResultSend};
use lib_ws::WSModule;
use parking_lot::RwLock;
use std::{
convert::{TryFrom, TryInto},
fmt::{Debug, Formatter},
sync::Arc,
};
use tokio::sync::{broadcast, broadcast::Receiver, mpsc};
pub struct MockWebSocket {
handlers: DashMap<WSModule, Arc<dyn WSMessageReceiver>>,
state_sender: broadcast::Sender<WSConnectState>,
ws_sender: broadcast::Sender<WSMessage>,
is_stop: RwLock<bool>,
}
impl std::default::Default for MockWebSocket {
fn default() -> Self {
let (state_sender, _) = broadcast::channel(16);
let (ws_sender, _) = broadcast::channel(16);
MockWebSocket {
handlers: DashMap::new(),
state_sender,
ws_sender,
is_stop: RwLock::new(false),
}
}
}
impl MockWebSocket {
pub fn new() -> MockWebSocket { MockWebSocket::default() }
}
impl FlowyWebSocket for Arc<MockWebSocket> {
fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> {
*self.is_stop.write() = false;
let mut ws_receiver = self.ws_sender.subscribe();
let cloned_ws = self.clone();
tokio::spawn(async move {
while let Ok(message) = ws_receiver.recv().await {
if *cloned_ws.is_stop.read() {
// do nothing
} else {
let ws_data = DocumentWSData::try_from(Bytes::from(message.data.clone())).unwrap();
let mut rx = DOC_SERVER.handle_ws_data(ws_data).await;
let new_ws_message = rx.recv().await.unwrap();
match cloned_ws.handlers.get(&new_ws_message.module) {
None => tracing::error!("Can't find any handler for message: {:?}", new_ws_message),
Some(handler) => handler.receive_message(new_ws_message.clone()),
}
}
}
});
FutureResult::new(async { Ok(()) })
}
fn stop_connect(&self) -> FutureResult<(), FlowyError> {
*self.is_stop.write() = true;
FutureResult::new(async { Ok(()) })
}
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.state_sender.subscribe() }
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn add_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
let source = handler.source();
if self.handlers.contains_key(&source) {
tracing::error!("WsSource's {:?} is already registered", source);
}
self.handlers.insert(source, handler);
Ok(())
}
fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
}
lazy_static! {
static ref DOC_SERVER: Arc<MockDocServer> = Arc::new(MockDocServer::default());
}
struct MockDocServer {
pub manager: Arc<ServerDocumentManager>,
}
impl std::default::Default for MockDocServer {
fn default() -> Self {
let persistence = Arc::new(MockDocServerPersistence::default());
let manager = Arc::new(ServerDocumentManager::new(persistence));
MockDocServer { manager }
}
}
impl MockDocServer {
async fn handle_ws_data(&self, ws_data: DocumentWSData) -> mpsc::Receiver<WSMessage> {
let bytes = Bytes::from(ws_data.data);
match ws_data.ty {
DocumentWSDataType::Ack => {
unimplemented!()
},
DocumentWSDataType::PushRev => {
let revisions = RepeatedRevision::try_from(bytes).unwrap().into_inner();
let (tx, rx) = mpsc::channel(1);
let user = MockDocUser {
user_id: revision.user_id.clone(),
tx,
};
self.manager.apply_revisions(user, revisions).await.unwrap();
rx
},
DocumentWSDataType::PullRev => {
unimplemented!()
},
DocumentWSDataType::UserConnect => {
let new_user = NewDocumentUser::try_from(bytes).unwrap();
let (tx, rx) = mpsc::channel(1);
let data = DocumentWSDataBuilder::build_ack_message(&new_user.doc_id, &ws_data.id);
let user = Arc::new(MockDocUser {
user_id: new_user.user_id,
tx,
}) as Arc<dyn RevisionUser>;
user.receive(SyncResponse::Ack(data));
rx
},
}
}
}
struct MockDocServerPersistence {
inner: Arc<DashMap<String, DocumentInfo>>,
}
impl Debug for MockDocServerPersistence {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("MockDocServerPersistence") }
}
impl std::default::Default for MockDocServerPersistence {
fn default() -> Self {
MockDocServerPersistence {
inner: Arc::new(DashMap::new()),
}
}
}
impl DocumentPersistence for MockDocServerPersistence {
fn read_doc(&self, doc_id: &str) -> FutureResultSend<DocumentInfo, CollaborateError> {
let inner = self.inner.clone();
let doc_id = doc_id.to_owned();
FutureResultSend::new(async move {
match inner.get(&doc_id) {
None => {
//
Err(CollaborateError::record_not_found())
},
Some(val) => {
//
Ok(val.value().clone())
},
}
})
}
fn create_doc(&self, revision: Revision) -> FutureResultSend<DocumentInfo, CollaborateError> {
FutureResultSend::new(async move {
let document_info: DocumentInfo = revision.try_into().unwrap();
Ok(document_info)
})
}
fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec<i64>) -> FutureResultSend<Vec<Revision>, CollaborateError> {
unimplemented!()
}
}
#[derive(Debug)]
struct MockDocUser {
user_id: String,
tx: mpsc::Sender<WSMessage>,
}
impl RevisionUser for MockDocUser {
fn user_id(&self) -> String { self.user_id.clone() }
fn receive(&self, resp: SyncResponse) {
let sender = self.tx.clone();
tokio::spawn(async move {
match resp {
SyncResponse::Pull(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WSMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
},
SyncResponse::Push(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WSMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
},
SyncResponse::Ack(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WSMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
},
SyncResponse::NewRevision(_) => {
// unimplemented!()
},
}
});
}
}

View File

@ -1,9 +1,12 @@
use lib_infra::future::FutureResult;
use std::sync::Arc;
use tokio::sync::broadcast;
use crate::entities::NetworkType;
use flowy_error::internal_error;
pub use flowy_error::FlowyError;
pub use lib_ws::{WSConnectState, WSMessageReceiver, WebScoketRawMessage};
use lib_infra::future::FutureResult;
pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage};
use lib_ws::{WSController, WSSender};
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::sync::{broadcast, broadcast::Receiver};
pub trait FlowyWebSocket: Send + Sync {
fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError>;
@ -11,9 +14,155 @@ pub trait FlowyWebSocket: Send + Sync {
fn subscribe_connect_state(&self) -> broadcast::Receiver<WSConnectState>;
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>;
fn add_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError>;
fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError>;
fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError>;
}
pub trait FlowyWsSender: Send + Sync {
fn send(&self, msg: WebScoketRawMessage) -> Result<(), FlowyError>;
pub trait FlowyWSSender: Send + Sync {
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError>;
}
pub struct FlowyWSConnect {
inner: Arc<dyn FlowyWebSocket>,
connect_type: RwLock<NetworkType>,
status_notifier: broadcast::Sender<NetworkType>,
addr: String,
}
impl FlowyWSConnect {
pub fn new(addr: String, ws: Arc<dyn FlowyWebSocket>) -> Self {
let (status_notifier, _) = broadcast::channel(10);
FlowyWSConnect {
inner: ws,
connect_type: RwLock::new(NetworkType::default()),
status_notifier,
addr,
}
}
pub async fn start(&self, token: String) -> Result<(), FlowyError> {
let addr = format!("{}/{}", self.addr, token);
self.inner.stop_connect().await?;
let _ = self.inner.start_connect(addr).await?;
Ok(())
}
pub async fn stop(&self) { let _ = self.inner.stop_connect().await; }
pub fn update_network_type(&self, new_type: &NetworkType) {
tracing::debug!("Network new state: {:?}", new_type);
let old_type = self.connect_type.read().clone();
let _ = self.status_notifier.send(new_type.clone());
if &old_type != new_type {
tracing::debug!("Connect type switch from {:?} to {:?}", old_type, new_type);
match (old_type.is_connect(), new_type.is_connect()) {
(false, true) => {
let ws_controller = self.inner.clone();
tokio::spawn(async move { retry_connect(ws_controller, 100).await });
},
(true, false) => {
//
},
_ => {},
}
*self.connect_type.write() = new_type.clone();
}
}
pub fn subscribe_websocket_state(&self) -> broadcast::Receiver<WSConnectState> {
self.inner.subscribe_connect_state()
}
pub fn subscribe_network_ty(&self) -> broadcast::Receiver<NetworkType> { self.status_notifier.subscribe() }
pub fn add_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
let _ = self.inner.add_message_receiver(handler)?;
Ok(())
}
pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { self.inner.ws_sender() }
}
#[tracing::instrument(level = "debug", skip(manager))]
pub fn listen_on_websocket(manager: Arc<FlowyWSConnect>) {
if cfg!(feature = "http_server") {
let ws = manager.inner.clone();
let mut notify = manager.inner.subscribe_connect_state();
let _ = tokio::spawn(async move {
loop {
match notify.recv().await {
Ok(state) => {
tracing::info!("Websocket state changed: {}", state);
match state {
WSConnectState::Init => {},
WSConnectState::Connected => {},
WSConnectState::Connecting => {},
WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await,
}
},
Err(e) => {
tracing::error!("Websocket state notify error: {:?}", e);
break;
},
}
}
});
} else {
// do nothing
};
}
async fn retry_connect(ws: Arc<dyn FlowyWebSocket>, count: usize) {
match ws.reconnect(count).await {
Ok(_) => {},
Err(e) => {
tracing::error!("websocket connect failed: {:?}", e);
},
}
}
impl FlowyWebSocket for Arc<WSController> {
fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();
FutureResult::new(async move {
let _ = cloned_ws.start(addr).await.map_err(internal_error)?;
Ok(())
})
}
fn stop_connect(&self) -> FutureResult<(), FlowyError> {
let controller = self.clone();
FutureResult::new(async move {
controller.stop().await;
Ok(())
})
}
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.subscribe_state() }
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();
FutureResult::new(async move {
let _ = cloned_ws.retry(count).await.map_err(internal_error)?;
Ok(())
})
}
fn add_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
let _ = self.add_receiver(handler).map_err(internal_error)?;
Ok(())
}
fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> {
let sender = self.sender().map_err(internal_error)?;
Ok(sender)
}
}
impl FlowyWSSender for WSSender {
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
let _ = self.send_msg(msg).map_err(internal_error)?;
Ok(())
}
}

View File

@ -1,161 +0,0 @@
use crate::{
entities::NetworkType,
services::ws::{local_web_socket, FlowyWebSocket, FlowyWsSender},
};
use flowy_error::{internal_error, FlowyError};
use lib_infra::future::FutureResult;
use lib_ws::{WSConnectState, WSController, WSMessageReceiver, WSSender, WebScoketRawMessage};
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::sync::{broadcast, broadcast::Receiver};
pub struct WsManager {
inner: Arc<dyn FlowyWebSocket>,
connect_type: RwLock<NetworkType>,
status_notifier: broadcast::Sender<NetworkType>,
addr: String,
}
impl WsManager {
pub fn new(addr: String) -> Self {
let ws: Arc<dyn FlowyWebSocket> = if cfg!(feature = "http_server") {
Arc::new(Arc::new(WSController::new()))
} else {
local_web_socket()
};
let (status_notifier, _) = broadcast::channel(10);
WsManager {
inner: ws,
connect_type: RwLock::new(NetworkType::default()),
status_notifier,
addr,
}
}
pub async fn start(&self, token: String) -> Result<(), FlowyError> {
let addr = format!("{}/{}", self.addr, token);
self.inner.stop_connect().await?;
let _ = self.inner.start_connect(addr).await?;
Ok(())
}
pub async fn stop(&self) { let _ = self.inner.stop_connect().await; }
pub fn update_network_type(&self, new_type: &NetworkType) {
tracing::debug!("Network new state: {:?}", new_type);
let old_type = self.connect_type.read().clone();
let _ = self.status_notifier.send(new_type.clone());
if &old_type != new_type {
tracing::debug!("Connect type switch from {:?} to {:?}", old_type, new_type);
match (old_type.is_connect(), new_type.is_connect()) {
(false, true) => {
let ws_controller = self.inner.clone();
tokio::spawn(async move { retry_connect(ws_controller, 100).await });
},
(true, false) => {
//
},
_ => {},
}
*self.connect_type.write() = new_type.clone();
}
}
pub fn subscribe_websocket_state(&self) -> broadcast::Receiver<WSConnectState> {
self.inner.subscribe_connect_state()
}
pub fn subscribe_network_ty(&self) -> broadcast::Receiver<NetworkType> { self.status_notifier.subscribe() }
pub fn add_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
let _ = self.inner.add_message_receiver(handler)?;
Ok(())
}
pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> { self.inner.ws_sender() }
}
#[tracing::instrument(level = "debug", skip(manager))]
pub fn listen_on_websocket(manager: Arc<WsManager>) {
if cfg!(feature = "http_server") {
let ws = manager.inner.clone();
let mut notify = manager.inner.subscribe_connect_state();
let _ = tokio::spawn(async move {
loop {
match notify.recv().await {
Ok(state) => {
tracing::info!("Websocket state changed: {}", state);
match state {
WSConnectState::Init => {},
WSConnectState::Connected => {},
WSConnectState::Connecting => {},
WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await,
}
},
Err(e) => {
tracing::error!("Websocket state notify error: {:?}", e);
break;
},
}
}
});
} else {
// do nothing
};
}
async fn retry_connect(ws: Arc<dyn FlowyWebSocket>, count: usize) {
match ws.reconnect(count).await {
Ok(_) => {},
Err(e) => {
tracing::error!("websocket connect failed: {:?}", e);
},
}
}
impl FlowyWebSocket for Arc<WSController> {
fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();
FutureResult::new(async move {
let _ = cloned_ws.start(addr).await.map_err(internal_error)?;
Ok(())
})
}
fn stop_connect(&self) -> FutureResult<(), FlowyError> {
let controller = self.clone();
FutureResult::new(async move {
controller.stop().await;
Ok(())
})
}
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.subscribe_state() }
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();
FutureResult::new(async move {
let _ = cloned_ws.retry(count).await.map_err(internal_error)?;
Ok(())
})
}
fn add_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
let _ = self.add_receiver(handler).map_err(internal_error)?;
Ok(())
}
fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> {
let sender = self.sender().map_err(internal_error)?;
Ok(sender)
}
}
impl FlowyWsSender for WSSender {
fn send(&self, msg: WebScoketRawMessage) -> Result<(), FlowyError> {
let _ = self.send_msg(msg).map_err(internal_error)?;
Ok(())
}
}

View File

@ -1,18 +1,3 @@
pub use conn::*;
pub use manager::*;
use std::sync::Arc;
mod conn;
mod manager;
mod ws_local;
// #[cfg(not(feature = "flowy_unit_test"))]
// pub(crate) fn local_web_socket() -> Arc<dyn FlowyWebSocket> {
// Arc::new(Arc::new(ws_local::LocalWebSocket::default())) }
//
// #[cfg(feature = "flowy_unit_test")]
// pub(crate) fn local_web_socket() -> Arc<dyn FlowyWebSocket> {
// Arc::new(Arc::new(crate::services::mock::MockWebSocket::default()))
// }
pub(crate) fn local_web_socket() -> Arc<dyn FlowyWebSocket> { Arc::new(Arc::new(ws_local::LocalWebSocket::default())) }

View File

@ -10,6 +10,7 @@ lib-dispatch = { path = "../lib-dispatch" }
lib-log = { path = "../lib-log" }
flowy-user = { path = "../flowy-user" }
flowy-net = { path = "../flowy-net" }
flowy-virtual-net = { path = "../flowy-virtual-net" }
flowy-core = { path = "../flowy-core", default-features = false }
flowy-database = { path = "../flowy-database" }
flowy-document = { path = "../flowy-document" }

View File

@ -6,15 +6,15 @@ use flowy_document::{
errors::{internal_error, FlowyError},
services::doc::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver},
};
use flowy_net::services::ws::WsManager;
use flowy_net::services::ws::FlowyWSConnect;
use flowy_user::services::user::UserSession;
use lib_ws::{WSMessageReceiver, WSModule, WebScoketRawMessage};
use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage};
use std::{convert::TryInto, path::Path, sync::Arc};
pub struct DocumentDepsResolver();
impl DocumentDepsResolver {
pub fn resolve(
ws_manager: Arc<WsManager>,
ws_manager: Arc<FlowyWSConnect>,
user_session: Arc<UserSession>,
) -> (
Arc<dyn DocumentUser>,
@ -61,13 +61,13 @@ impl DocumentUser for DocumentUserImpl {
}
struct DocumentWebSocketAdapter {
ws_manager: Arc<WsManager>,
ws_manager: Arc<FlowyWSConnect>,
}
impl DocumentWebSocket for DocumentWebSocketAdapter {
fn send(&self, data: DocumentClientWSData) -> Result<(), FlowyError> {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebScoketRawMessage {
let msg = WebSocketRawMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
@ -84,5 +84,5 @@ struct WSMessageReceiverAdaptor(Arc<DocumentWSReceivers>);
impl WSMessageReceiver for WSMessageReceiverAdaptor {
fn source(&self) -> WSModule { WSModule::Doc }
fn receive_message(&self, msg: WebScoketRawMessage) { self.0.did_receive_data(Bytes::from(msg.data)); }
fn receive_message(&self, msg: WebSocketRawMessage) { self.0.did_receive_data(Bytes::from(msg.data)); }
}

View File

@ -1,5 +1,4 @@
mod deps_resolve;
// mod flowy_server;
pub mod module;
use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver};
use backend_service::configuration::ClientServerConfiguration;
@ -7,13 +6,15 @@ use flowy_core::{errors::FlowyError, module::init_core, prelude::CoreContext};
use flowy_document::context::DocumentContext;
use flowy_net::{
entities::NetworkType,
services::ws::{listen_on_websocket, WsManager},
services::ws::{listen_on_websocket, FlowyWSConnect, FlowyWebSocket},
};
use flowy_user::{
prelude::UserStatus,
services::user::{UserSession, UserSessionConfig},
};
use flowy_virtual_net::local_web_socket;
use lib_dispatch::prelude::*;
use lib_ws::WSController;
use module::mk_modules;
pub use module::*;
use std::sync::{
@ -72,7 +73,7 @@ pub struct FlowySDK {
pub flowy_document: Arc<DocumentContext>,
pub core: Arc<CoreContext>,
pub dispatcher: Arc<EventDispatcher>,
pub ws_manager: Arc<WsManager>,
pub ws_manager: Arc<FlowyWSConnect>,
}
impl FlowySDK {
@ -81,7 +82,13 @@ impl FlowySDK {
init_kv(&config.root);
tracing::debug!("🔥 {:?}", config);
let ws_manager = Arc::new(WsManager::new(config.server_config.ws_addr()));
let ws: Arc<dyn FlowyWebSocket> = if cfg!(feature = "http_server") {
Arc::new(Arc::new(WSController::new()))
} else {
local_web_socket()
};
let ws_manager = Arc::new(FlowyWSConnect::new(config.server_config.ws_addr(), ws));
let user_session = mk_user_session(&config);
let flowy_document = mk_document(ws_manager.clone(), user_session.clone(), &config.server_config);
let core_ctx = mk_core_context(user_session.clone(), flowy_document.clone(), &config.server_config);
@ -106,7 +113,7 @@ impl FlowySDK {
fn _init(
dispatch: &EventDispatcher,
ws_manager: Arc<WsManager>,
ws_manager: Arc<FlowyWSConnect>,
user_session: Arc<UserSession>,
core: Arc<CoreContext>,
) {
@ -126,7 +133,7 @@ fn _init(
}
async fn _listen_user_status(
ws_manager: Arc<WsManager>,
ws_manager: Arc<FlowyWSConnect>,
mut subscribe: broadcast::Receiver<UserStatus>,
core: Arc<CoreContext>,
) {
@ -201,7 +208,7 @@ fn mk_core_context(
}
pub fn mk_document(
ws_manager: Arc<WsManager>,
ws_manager: Arc<FlowyWSConnect>,
user_session: Arc<UserSession>,
server_config: &ClientServerConfiguration,
) -> Arc<DocumentContext> {

View File

@ -1,11 +1,15 @@
use flowy_core::prelude::CoreContext;
use flowy_net::services::ws::WsManager;
use flowy_net::services::ws::FlowyWSConnect;
use flowy_user::services::user::UserSession;
use lib_dispatch::prelude::Module;
use std::sync::Arc;
pub fn mk_modules(ws_manager: Arc<WsManager>, core: Arc<CoreContext>, user_session: Arc<UserSession>) -> Vec<Module> {
pub fn mk_modules(
ws_manager: Arc<FlowyWSConnect>,
core: Arc<CoreContext>,
user_session: Arc<UserSession>,
) -> Vec<Module> {
let user_module = mk_user_module(user_session);
let core_module = mk_core_module(core);
let network_module = mk_network_module(ws_manager);
@ -16,4 +20,4 @@ fn mk_user_module(user_session: Arc<UserSession>) -> Module { flowy_user::module
fn mk_core_module(core: Arc<CoreContext>) -> Module { flowy_core::module::create(core) }
fn mk_network_module(ws_manager: Arc<WsManager>) -> Module { flowy_net::module::create(ws_manager) }
fn mk_network_module(ws_manager: Arc<FlowyWSConnect>) -> Module { flowy_net::module::create(ws_manager) }

View File

@ -36,4 +36,4 @@ fake = "~2.3.0"
claim = "0.4.0"
futures = "0.3.15"
serial_test = "0.5.1"
flowy-net = { path = "../flowy-net", features = ["flowy_unit_test"] }
flowy-virtual-net = { path = "../flowy-virtual-net", features = ["flowy_unit_test"] }

View File

@ -77,7 +77,7 @@ impl EditorTest {
EditorScript::AssertNextRevId(rev_id) => {
let next_revision = rev_manager.next_sync_revision().await.unwrap();
if rev_id.is_none() {
assert_eq!(next_revision.is_none(), true);
assert_eq!(next_revision.is_none(), true, "Next revision should be None");
return;
}
let next_revision = next_revision.unwrap();

View File

@ -1,31 +1,31 @@
// use flowy_test::doc_script::{EditorScript::*, *};
// use lib_ot::revision::RevState;
//
// #[tokio::test]
// async fn doc_sync_test() {
// let scripts = vec![
// InsertText("1", 0),
// InsertText("2", 1),
// InsertText("3", 2),
// AssertJson(r#"[{"insert":"123\n"}]"#),
// AssertNextRevId(None),
// ];
// EditorTest::new().await.run_scripts(scripts).await;
// }
//
// #[tokio::test]
// async fn doc_sync_retry_ws_conn() {
// let scripts = vec![
// InsertText("1", 0),
// StopWs,
// InsertText("2", 1),
// InsertText("3", 2),
// StartWs,
// WaitSyncFinished,
// AssertRevisionState(2, RevState::Acked),
// AssertRevisionState(3, RevState::Acked),
// AssertNextRevId(None),
// AssertJson(r#"[{"insert":"123\n"}]"#),
// ];
// EditorTest::new().await.run_scripts(scripts).await;
// }
use flowy_collaboration::entities::revision::RevState;
use flowy_test::doc_script::{EditorScript::*, *};
#[tokio::test]
async fn doc_sync_test() {
let scripts = vec![
InsertText("1", 0),
InsertText("2", 1),
InsertText("3", 2),
AssertJson(r#"[{"insert":"123\n"}]"#),
AssertNextRevId(None),
];
EditorTest::new().await.run_scripts(scripts).await;
}
#[tokio::test]
async fn doc_sync_retry_ws_conn() {
let scripts = vec![
InsertText("1", 0),
StopWs,
InsertText("2", 1),
InsertText("3", 2),
StartWs,
WaitSyncFinished,
AssertRevisionState(2, RevState::Ack),
AssertRevisionState(3, RevState::Ack),
AssertNextRevId(None),
AssertJson(r#"[{"insert":"123\n"}]"#),
];
EditorTest::new().await.run_scripts(scripts).await;
}

View File

@ -0,0 +1,23 @@
[package]
name = "flowy-virtual-net"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
lib-ws = { path = "../../../shared-lib/lib-ws" }
lib-infra = { path = "../../../shared-lib/lib-infra" }
flowy-net = { path = "../flowy-net" }
bytes = { version = "1.0" }
parking_lot = "0.11"
tokio = {version = "1", features = ["sync"]}
tracing = { version = "0.1", features = ["log"] }
# flowy-collaboration and dashmap would be optional
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"}
dashmap = {version = "4.0"}
[features]
flowy_unit_test = []
http_server = []

View File

@ -0,0 +1,12 @@
use flowy_net::services::ws::FlowyWebSocket;
use std::sync::Arc;
mod ws;
#[cfg(not(feature = "flowy_unit_test"))]
pub fn local_web_socket() -> Arc<dyn FlowyWebSocket> { Arc::new(ws::LocalWebSocket::default()) }
#[cfg(feature = "flowy_unit_test")]
mod mock;
#[cfg(feature = "flowy_unit_test")]
pub fn local_web_socket() -> Arc<dyn FlowyWebSocket> { Arc::new(crate::mock::MockWebSocket::default()) }

View File

@ -0,0 +1,4 @@
mod server;
mod ws_local;
pub use ws_local::*;

View File

@ -0,0 +1,137 @@
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::{entities::prelude::*, errors::CollaborateError, sync::*};
use flowy_net::services::ws::*;
use lib_infra::future::FutureResultSend;
use lib_ws::{WSModule, WebSocketRawMessage};
use std::{
convert::{TryFrom, TryInto},
fmt::{Debug, Formatter},
sync::Arc,
};
use tokio::sync::mpsc;
pub struct MockDocServer {
pub manager: Arc<ServerDocumentManager>,
}
impl std::default::Default for MockDocServer {
fn default() -> Self {
let persistence = Arc::new(MockDocServerPersistence::default());
let manager = Arc::new(ServerDocumentManager::new(persistence));
MockDocServer { manager }
}
}
impl MockDocServer {
pub async fn handle_ws_data(&self, ws_data: DocumentClientWSData) -> Option<mpsc::Receiver<WebSocketRawMessage>> {
let bytes = Bytes::from(ws_data.data);
match ws_data.ty {
DocumentClientWSDataType::ClientPushRev => {
let revisions = RepeatedRevision::try_from(bytes).unwrap().into_inner();
if revisions.is_empty() {
return None;
}
let first_revision = revisions.first().unwrap();
let (tx, rx) = mpsc::channel(1);
let user = Arc::new(MockDocUser {
user_id: first_revision.user_id.clone(),
tx,
});
self.manager.apply_revisions(user, revisions).await.unwrap();
Some(rx)
},
}
}
}
struct MockDocServerPersistence {
inner: Arc<DashMap<String, DocumentInfo>>,
}
impl Debug for MockDocServerPersistence {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("MockDocServerPersistence") }
}
impl std::default::Default for MockDocServerPersistence {
fn default() -> Self {
MockDocServerPersistence {
inner: Arc::new(DashMap::new()),
}
}
}
impl DocumentPersistence for MockDocServerPersistence {
fn read_doc(&self, doc_id: &str) -> FutureResultSend<DocumentInfo, CollaborateError> {
let inner = self.inner.clone();
let doc_id = doc_id.to_owned();
FutureResultSend::new(async move {
match inner.get(&doc_id) {
None => {
//
Err(CollaborateError::record_not_found())
},
Some(val) => {
//
Ok(val.value().clone())
},
}
})
}
fn create_doc(&self, revision: Revision) -> FutureResultSend<DocumentInfo, CollaborateError> {
FutureResultSend::new(async move {
let document_info: DocumentInfo = revision.try_into().unwrap();
Ok(document_info)
})
}
fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec<i64>) -> FutureResultSend<Vec<Revision>, CollaborateError> {
unimplemented!()
}
}
#[derive(Debug)]
struct MockDocUser {
user_id: String,
tx: mpsc::Sender<WebSocketRawMessage>,
}
impl RevisionUser for MockDocUser {
fn user_id(&self) -> String { self.user_id.clone() }
fn receive(&self, resp: SyncResponse) {
let sender = self.tx.clone();
tokio::spawn(async move {
match resp {
SyncResponse::Pull(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
},
SyncResponse::Push(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
},
SyncResponse::Ack(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
},
SyncResponse::NewRevision(_) => {
// unimplemented!()
},
}
});
}
}

View File

@ -0,0 +1,95 @@
use crate::mock::server::MockDocServer;
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::entities::ws::*;
use flowy_net::services::ws::*;
use lib_infra::future::FutureResult;
use lib_ws::{WSModule, WebSocketRawMessage};
use parking_lot::RwLock;
use std::{convert::TryFrom, sync::Arc};
use tokio::sync::{broadcast, broadcast::Receiver};
pub struct MockWebSocket {
receivers: Arc<DashMap<WSModule, Arc<dyn WSMessageReceiver>>>,
state_sender: broadcast::Sender<WSConnectState>,
ws_sender: MockWSSender,
is_stop: Arc<RwLock<bool>>,
server: Arc<MockDocServer>,
}
impl std::default::Default for MockWebSocket {
fn default() -> Self {
let (state_sender, _) = broadcast::channel(16);
let (ws_sender, _) = broadcast::channel(16);
let server = Arc::new(MockDocServer::default());
MockWebSocket {
receivers: Arc::new(DashMap::new()),
state_sender,
ws_sender: MockWSSender(ws_sender),
is_stop: Arc::new(RwLock::new(false)),
server,
}
}
}
impl FlowyWebSocket for MockWebSocket {
fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> {
*self.is_stop.write() = false;
let mut ws_receiver = self.ws_sender.subscribe();
let receivers = self.receivers.clone();
let is_stop = self.is_stop.clone();
let server = self.server.clone();
tokio::spawn(async move {
while let Ok(message) = ws_receiver.recv().await {
if *is_stop.read() {
// do nothing
} else {
let ws_data = DocumentClientWSData::try_from(Bytes::from(message.data.clone())).unwrap();
if let Some(mut rx) = server.handle_ws_data(ws_data).await {
let new_ws_message = rx.recv().await.unwrap();
match receivers.get(&new_ws_message.module) {
None => tracing::error!("Can't find any handler for message: {:?}", new_ws_message),
Some(handler) => handler.receive_message(new_ws_message.clone()),
}
}
}
}
});
FutureResult::new(async { Ok(()) })
}
fn stop_connect(&self) -> FutureResult<(), FlowyError> {
*self.is_stop.write() = true;
FutureResult::new(async { Ok(()) })
}
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.state_sender.subscribe() }
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn add_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
self.receivers.insert(handler.source(), handler);
Ok(())
}
fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
}
#[derive(Clone)]
pub struct MockWSSender(broadcast::Sender<WebSocketRawMessage>);
impl FlowyWSSender for MockWSSender {
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
let _ = self.0.send(msg);
Ok(())
}
}
impl std::ops::Deref for MockWSSender {
type Target = broadcast::Sender<WebSocketRawMessage>;
fn deref(&self) -> &Self::Target { &self.0 }
}

View File

@ -0,0 +1,3 @@
mod ws_local;
pub use ws_local::*;

View File

@ -1,10 +1,10 @@
use crate::services::ws::{
use flowy_net::services::ws::{
FlowyError,
FlowyWSSender,
FlowyWebSocket,
FlowyWsSender,
WSConnectState,
WSMessageReceiver,
WebScoketRawMessage,
WebSocketRawMessage,
};
use lib_infra::future::FutureResult;
use std::sync::Arc;
@ -12,7 +12,7 @@ use tokio::sync::{broadcast, broadcast::Receiver};
pub(crate) struct LocalWebSocket {
state_sender: broadcast::Sender<WSConnectState>,
ws_sender: broadcast::Sender<WebScoketRawMessage>,
ws_sender: LocalWSSender,
}
impl std::default::Default for LocalWebSocket {
@ -21,12 +21,12 @@ impl std::default::Default for LocalWebSocket {
let (ws_sender, _) = broadcast::channel(16);
LocalWebSocket {
state_sender,
ws_sender,
ws_sender: LocalWSSender(ws_sender),
}
}
}
impl FlowyWebSocket for Arc<LocalWebSocket> {
impl FlowyWebSocket for LocalWebSocket {
fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
@ -37,12 +37,19 @@ impl FlowyWebSocket for Arc<LocalWebSocket> {
fn add_message_receiver(&self, _handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> { Ok(()) }
fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
}
impl FlowyWsSender for broadcast::Sender<WebScoketRawMessage> {
fn send(&self, msg: WebScoketRawMessage) -> Result<(), FlowyError> {
let _ = self.send(msg);
#[derive(Clone)]
pub struct LocalWSSender(broadcast::Sender<WebSocketRawMessage>);
impl FlowyWSSender for LocalWSSender {
fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
let _ = self.0.send(msg);
Ok(())
}
}
impl std::ops::Deref for LocalWSSender {
type Target = broadcast::Sender<WebSocketRawMessage>;
fn deref(&self) -> &Self::Target { &self.0 }
}

View File

@ -2,3 +2,7 @@ pub mod doc;
pub mod parser;
pub mod revision;
pub mod ws;
pub mod prelude {
pub use crate::entities::{doc::*, parser::*, revision::*, ws::*};
}

View File

@ -78,7 +78,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
| "ExportRequest"
| "ExportData"
| "WSError"
| "WebScoketRawMessage"
| "WebSocketRawMessage"
=> TypeCategory::Protobuf,
"WorkspaceEvent"
| "WorkspaceNotification"

View File

@ -4,7 +4,7 @@ use std::convert::TryInto;
use tokio_tungstenite::tungstenite::Message as TokioMessage;
#[derive(ProtoBuf, Debug, Clone, Default)]
pub struct WebScoketRawMessage {
pub struct WebSocketRawMessage {
#[pb(index = 1)]
pub module: WSModule,
@ -29,8 +29,8 @@ impl ToString for WSModule {
}
}
impl std::convert::From<WebScoketRawMessage> for TokioMessage {
fn from(msg: WebScoketRawMessage) -> Self {
impl std::convert::From<WebSocketRawMessage> for TokioMessage {
fn from(msg: WebSocketRawMessage) -> Self {
let result: Result<Bytes, ::protobuf::ProtobufError> = msg.try_into();
match result {
Ok(bytes) => TokioMessage::Binary(bytes.to_vec()),

View File

@ -24,7 +24,7 @@
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1;
#[derive(PartialEq,Clone,Default)]
pub struct WebScoketRawMessage {
pub struct WebSocketRawMessage {
// message fields
pub module: WSModule,
pub data: ::std::vec::Vec<u8>,
@ -33,14 +33,14 @@ pub struct WebScoketRawMessage {
pub cached_size: ::protobuf::CachedSize,
}
impl<'a> ::std::default::Default for &'a WebScoketRawMessage {
fn default() -> &'a WebScoketRawMessage {
<WebScoketRawMessage as ::protobuf::Message>::default_instance()
impl<'a> ::std::default::Default for &'a WebSocketRawMessage {
fn default() -> &'a WebSocketRawMessage {
<WebSocketRawMessage as ::protobuf::Message>::default_instance()
}
}
impl WebScoketRawMessage {
pub fn new() -> WebScoketRawMessage {
impl WebSocketRawMessage {
pub fn new() -> WebSocketRawMessage {
::std::default::Default::default()
}
@ -86,7 +86,7 @@ impl WebScoketRawMessage {
}
}
impl ::protobuf::Message for WebScoketRawMessage {
impl ::protobuf::Message for WebSocketRawMessage {
fn is_initialized(&self) -> bool {
true
}
@ -161,8 +161,8 @@ impl ::protobuf::Message for WebScoketRawMessage {
Self::descriptor_static()
}
fn new() -> WebScoketRawMessage {
WebScoketRawMessage::new()
fn new() -> WebSocketRawMessage {
WebSocketRawMessage::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
@ -171,29 +171,29 @@ impl ::protobuf::Message for WebScoketRawMessage {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum<WSModule>>(
"module",
|m: &WebScoketRawMessage| { &m.module },
|m: &mut WebScoketRawMessage| { &mut m.module },
|m: &WebSocketRawMessage| { &m.module },
|m: &mut WebSocketRawMessage| { &mut m.module },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
"data",
|m: &WebScoketRawMessage| { &m.data },
|m: &mut WebScoketRawMessage| { &mut m.data },
|m: &WebSocketRawMessage| { &m.data },
|m: &mut WebSocketRawMessage| { &mut m.data },
));
::protobuf::reflect::MessageDescriptor::new_pb_name::<WebScoketRawMessage>(
"WebScoketRawMessage",
::protobuf::reflect::MessageDescriptor::new_pb_name::<WebSocketRawMessage>(
"WebSocketRawMessage",
fields,
file_descriptor_proto()
)
})
}
fn default_instance() -> &'static WebScoketRawMessage {
static instance: ::protobuf::rt::LazyV2<WebScoketRawMessage> = ::protobuf::rt::LazyV2::INIT;
instance.get(WebScoketRawMessage::new)
fn default_instance() -> &'static WebSocketRawMessage {
static instance: ::protobuf::rt::LazyV2<WebSocketRawMessage> = ::protobuf::rt::LazyV2::INIT;
instance.get(WebSocketRawMessage::new)
}
}
impl ::protobuf::Clear for WebScoketRawMessage {
impl ::protobuf::Clear for WebSocketRawMessage {
fn clear(&mut self) {
self.module = WSModule::Doc;
self.data.clear();
@ -201,13 +201,13 @@ impl ::protobuf::Clear for WebScoketRawMessage {
}
}
impl ::std::fmt::Debug for WebScoketRawMessage {
impl ::std::fmt::Debug for WebSocketRawMessage {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for WebScoketRawMessage {
impl ::protobuf::reflect::ProtobufValue for WebSocketRawMessage {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Message(self)
}
@ -261,7 +261,7 @@ impl ::protobuf::reflect::ProtobufValue for WSModule {
}
static file_descriptor_proto_data: &'static [u8] = b"\
\n\tmsg.proto\"L\n\x13WebScoketRawMessage\x12!\n\x06module\x18\x01\x20\
\n\tmsg.proto\"L\n\x13WebSocketRawMessage\x12!\n\x06module\x18\x01\x20\
\x01(\x0e2\t.WSModuleR\x06module\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\
\x04data*\x13\n\x08WSModule\x12\x07\n\x03Doc\x10\0J\xd9\x01\n\x06\x12\
\x04\0\0\x08\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\

View File

@ -1,6 +1,6 @@
syntax = "proto3";
message WebScoketRawMessage {
message WebSocketRawMessage {
WSModule module = 1;
bytes data = 2;
}

View File

@ -3,7 +3,7 @@ use crate::{
connect::{WSConnectionFuture, WSStream},
errors::WSError,
WSModule,
WebScoketRawMessage,
WebSocketRawMessage,
};
use backend_service::errors::ServerError;
use bytes::Bytes;
@ -34,7 +34,7 @@ type Handlers = DashMap<WSModule, Arc<dyn WSMessageReceiver>>;
pub trait WSMessageReceiver: Sync + Send + 'static {
fn source(&self) -> WSModule;
fn receive_message(&self, msg: WebScoketRawMessage);
fn receive_message(&self, msg: WebSocketRawMessage);
}
pub struct WSController {
@ -175,7 +175,7 @@ impl WSHandlerFuture {
fn handle_binary_message(&self, bytes: Vec<u8>) {
let bytes = Bytes::from(bytes);
match WebScoketRawMessage::try_from(bytes) {
match WebSocketRawMessage::try_from(bytes) {
Ok(message) => match self.handlers.get(&message.module) {
None => log::error!("Can't find any handler for message: {:?}", message),
Some(handler) => handler.receive_message(message.clone()),
@ -207,7 +207,7 @@ pub struct WSSender {
}
impl WSSender {
pub fn send_msg<T: Into<WebScoketRawMessage>>(&self, msg: T) -> Result<(), WSError> {
pub fn send_msg<T: Into<WebSocketRawMessage>>(&self, msg: T) -> Result<(), WSError> {
let msg = msg.into();
let _ = self
.ws_tx
@ -217,7 +217,7 @@ impl WSSender {
}
pub fn send_text(&self, source: &WSModule, text: &str) -> Result<(), WSError> {
let msg = WebScoketRawMessage {
let msg = WebSocketRawMessage {
module: source.clone(),
data: text.as_bytes().to_vec(),
};
@ -225,7 +225,7 @@ impl WSSender {
}
pub fn send_binary(&self, source: &WSModule, bytes: Vec<u8>) -> Result<(), WSError> {
let msg = WebScoketRawMessage {
let msg = WebSocketRawMessage {
module: source.clone(),
data: bytes,
};