diff --git a/Cargo.toml b/Cargo.toml index 818b94cdb5..c712b202df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,11 +2,13 @@ members = [ "common/rust/ast/core", - "common/rust/json-rpc", "common/rust/ast/macros", + "common/rust/file-manager", + "common/rust/json-rpc", "common/rust/macro-utils", "common/rust/parser", "common/rust/prelude", "common/rust/shapely/core", - "common/rust/shapely/macros" -] \ No newline at end of file + "common/rust/shapely/macros", + "common/rust/utils", +] diff --git a/common/rust/file-manager/Cargo.toml b/common/rust/file-manager/Cargo.toml new file mode 100644 index 0000000000..6c58f922e4 --- /dev/null +++ b/common/rust/file-manager/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "file-manager-client" +version = "0.1.0" +authors = ["Enso Team "] +edition = "2018" + +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies] +futures = "0.3.1" +paste = "0.1.6" +serde_json = "1.0" + +chrono = { version = "0.4" , features = ["serde"] } +serde = { version = "1.0" , features = ["derive"] } +uuid = { version = "0.8" , features = ["serde", "v5"] } + +json-rpc = { version = "0.1.0" , path = "../json-rpc" } +prelude = { version = "0.1.0" , path = "../prelude" } +utils = { version = "0.1.0" , path = "../utils" } diff --git a/common/rust/file-manager/README.md b/common/rust/file-manager/README.md new file mode 100644 index 0000000000..8a3a027eec --- /dev/null +++ b/common/rust/file-manager/README.md @@ -0,0 +1,136 @@ +File Manager consists of pair server and client. They communicate by exchaning +messages, following JSON-RPC 2.0. + +# Setup +Establish websocket connection. + +In future it is expected that some kind of authorization will be required by the +server. As of now, its details remain unspecified. + +# General protocol +Remote calls made between File Manager client and server follow [JSON-RPC 2.0 +protocol](https://www.jsonrpc.org/specification). + +There are two primary cases: +* RPC calls from client to [server methods](#Methods); +* [Notifications](#Notifications) sent from server to client. + +All messages are text with JSON-encoded values. + +File Manager accepts only method calls (request objects). + +File Manager responds with call results and may send notifications. + +# Methods +| Method | Input | Result | +|---------------|------------------------------|------------| +| copyDirectory | {from:Path, to:Path} | () | +| copyFile | {from:Path, to:Path} | () | +| deleteFile | {path:Path} | () | +| exists | {path:Path} | Boolean | +| list | {path:Path} | [Path] | +| moveDirectory | {from:Path, to:Path} | () | +| moveFile | {from:Path, to:Path} | () | +| read | {path:Path} | String | +| status | {path:Path} | Attributes | +| touch | {path:Path} | () | +| write | {path:Path, contents:String} | () | +| createWatch | {path:Path} | UUID | +| deleteWatch | {watchId:UUID} | () | + +Where `()` is a unit value. + +# Notifications +Notifications are emitted by the server. + +| Method | Input | Result | +|-----------------|-----------------------------|--------| +| filesystemEvent | {path:Path, kind:EventKind} | N/A | + +`filesystemEvent` notification is emitted for paths that are tracked by watch +(i.e. subtree of a location passed to `createWatch` method). + +It should be noted that watch notifications are not reliable and significantly +os-dependent. + +# Types +``` +Attributes = struct { + creationTime : FileTime, + lastAccess_time : FileTime, + lastModified_time : FileTime, + fileKind : FileKind, + byteSize : u64, +} + +EventKind = enum { Created, Deleted, Modified, Overflow } +FileKind = enum { Directory, RegularFile, SymbolicLink, Other } +``` + +# JSON Encoding +Struct values are serialized as map, e.g. `{ "field_name" : field_value }`. + +Enum values are serialized as map `{ "variant_name" : variant_value }` or just +`"variant_name"` if there variants have no inner value. +Transitive enums (i.e. enums of enums) are flattened, no intermediate variant +names shall appear. + +`()` (unit value) is serialized as `null`. + +`FileTime` value is serialized as a string compliant with RFC3339 / ISO8601 text +format, e.g. `"2020-01-07T21:25:26Z"`. + +`Path` is serialized as JSON string value, e.g. `"./Main.luna"`. + +`UUID` is serialzied as string using 8-4-4-4-12 format, e.g. +`"02723954-fbb0-4641-af53-cec0883f260a"`. + +`u64` is an unsigned 64-bit integer value. + +## Examples + +### Call to `exists` method +#### Request (call) +```json +{ + "jsonrpc" : "2.0", + "id" : 0, + "method" : "exists", + "input" : { "path" : "./Main.luna" } +} +``` +#### Response +```json +{ + "jsonrpc" : "2.0", + "id" : 0, + "result" : true +} +``` + +### Filesystem Event Notification +#### Request (notification) +```json +{ + "jsonrpc" : "2.0", + "method" : "filesystemEvent", + "params" : { "path" : "./Main.luna", "kind" : "Modified" } +} +``` + +Notification requests gets no response. + + +### `Attributes` structure +`Attributes` value may be serialized to a following JSON: +```json +{ + "creationTime" : "2020-01-07T21:25:26Z", + "lastAccessTime" : "2020-01-21T22:16:51.123994500+00:00", + "lastModifiedTime" : "2020-01-07T21:25:26Z", + "fileKind" : "RegularFile", + "sizeInBytes" : 125125 +} +``` + + diff --git a/common/rust/file-manager/src/lib.rs b/common/rust/file-manager/src/lib.rs new file mode 100644 index 0000000000..a76dbc053c --- /dev/null +++ b/common/rust/file-manager/src/lib.rs @@ -0,0 +1,442 @@ +//! Client library for the JSON-RPC-based File Manager service. + +#![warn(missing_docs)] +#![warn(trivial_casts)] +#![warn(trivial_numeric_casts)] +#![warn(unused_import_braces)] +#![warn(unused_qualifications)] +#![warn(unsafe_code)] +#![warn(missing_copy_implementations)] +#![warn(missing_debug_implementations)] + +use prelude::*; + +use json_rpc::api::Result; +use json_rpc::Handler; +use futures::Stream; +use serde::Serialize; +use serde::Deserialize; +use std::future::Future; +use uuid::Uuid; + + + +// ============= +// === Event === +// ============= + +/// Event emitted by the File Manager `Client`. +pub type Event = json_rpc::handler::Event; + + + +// ============ +// === Path === +// ============ + +/// Path to a file. +#[derive(Clone,Debug,Display,Eq,Hash,PartialEq,PartialOrd,Ord)] +#[derive(Serialize, Deserialize)] +#[derive(Shrinkwrap)] +pub struct Path(pub String); + +impl Path { + /// Wraps a `String`-like entity into a new `Path`. + pub fn new(s:S) -> Path where S:Into { + Path(s.into()) + } +} + + + +// ==================== +// === Notification === +// ==================== + +/// Notification generated by the File Manager. +#[derive(Clone,Debug,PartialEq)] +#[derive(Serialize, Deserialize)] +#[serde(tag="method", content="params")] +pub enum Notification { + /// Filesystem event occurred for a watched path. + #[serde(rename = "filesystemEvent")] + FilesystemEvent(FilesystemEvent), +} + + + +// ======================= +// === FilesystemEvent === +// ======================= + +/// Filesystem event notification, generated by an active file watch. +#[derive(Clone,Debug,PartialEq)] +#[derive(Serialize, Deserialize)] +pub struct FilesystemEvent { + /// Path of the file that the event is about. + pub path : Path, + /// What kind of event is it. + pub kind : FilesystemEventKind +} + +/// Describes kind of filesystem event (was the file created or deleted, etc.) +#[derive(Clone,Copy,Debug,PartialEq)] +#[derive(Serialize, Deserialize)] +pub enum FilesystemEventKind { + /// A new file under path was created. + Created, + /// Existing file under path was deleted. + Deleted, + /// File under path was modified. + Modified, + /// An overflow occurred and some events were lost, + Overflow +} + + + +// ================== +// === Attributes === +// ================== + +/// Attributes of the file in the filesystem. +#[derive(Clone,Copy,Debug,PartialEq)] +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Attributes{ + /// When the file was created. + pub creation_time : FileTime, + /// When the file was last accessed. + pub last_access_time : FileTime, + /// When the file was last modified. + pub last_modified_time : FileTime, + /// What kind of file is this. + pub file_kind : FileKind, + /// Size of the file in bytes. + /// (size of files not being `RegularFile`s is unspecified). + pub byte_size : u64 +} + +/// A filesystem's timestamp. +pub type FileTime = chrono::DateTime; + +/// What kind of file (regular, directory, symlink) is this. +#[derive(Clone,Copy,Debug,PartialEq)] +#[derive(Serialize, Deserialize)] +pub enum FileKind { + /// File being a directory. + Directory, + /// File being a symbolic link. + SymbolicLink, + /// File being a regular file with opaque content. + RegularFile, + /// File being none of the above, e.g. a physical device or a pipe. + Other +} + + + +// ============== +// === Client === +// ============== + +/// File Manager client. Contains numerous asynchronous methods for remote calls +/// on File Manager server. Also, allows obtaining events stream by calling +/// `events`. +#[derive(Debug)] +pub struct Client { + /// JSON-RPC protocol handler. + handler : Handler, +} + +impl Client { + /// Create a new File Manager client that will use given transport. + pub fn new(transport:impl json_rpc::Transport + 'static) -> Client { + let handler = Handler::new(transport); + Client { handler } + } + + /// Asynchronous event stream with notification and errors. + /// + /// On a repeated call, previous stream is closed. + pub fn events(&mut self) -> impl Stream { + self.handler.events() + } + + /// Method that should be called on each frame. + /// + /// Processes incoming transport events, generating File Manager events and + /// driving asynchronous calls to completion. + pub fn process_events(&mut self) { + self.handler.process_events() + } +} + + + +// =================== +// === RPC Methods === +// =================== + + +// === Helper macro === + +/// Macro that generates a asynchronous method making relevant RPC call to the +/// server. First three args is the name appropriately in CamelCase, +/// snake_case, camelCase. Then goes the function signature, in form of +/// `(arg:Arg) -> Ret`. +/// +/// Macro generates: +/// * a method in Client named `snake_case` that takes `(arg:Arg)` and returns +/// `Future`. +/// * a structure named `CamelCase` that stores function arguments as fields and +/// its JSON serialization conforms to JSON-RPC (yielding `method` and +/// `params` fields). +/// * `snakeCase` is the name of the remote method. +macro_rules! make_rpc_method { + ( $name_typename:ident + $name:ident + $name_ext:ident + ($($arg:ident : $type:ty),* $(,)?) -> $out:ty ) => { + paste::item! { + impl Client { + /// Remote call to the method on the File Manager Server. + pub fn $name + (&mut self, $($arg:$type),*) -> impl Future> { + let input = [<$name_typename Input>] { $($arg:$arg),* }; + self.handler.open_request(input) + } + } + + /// Structure transporting method arguments. + #[derive(Serialize,Deserialize,Debug,PartialEq)] + #[serde(rename_all = "camelCase")] + struct [<$name_typename Input>] { + $($arg : $type),* + } + + impl json_rpc::RemoteMethodCall for [<$name_typename Input>] { + const NAME:&'static str = stringify!($name_ext); + type Returned = $out; + } + }} +} + + +// === Remote API definition === + +make_rpc_method!(CopyDirectory copy_directory copyDirectory (from:Path, to:Path) -> () ); +make_rpc_method!(CopyFile copy_file copyFile (from:Path, to:Path) -> () ); +make_rpc_method!(DeleteFile delete_file deleteFile (path:Path) -> () ); +make_rpc_method!(Exists exists exists (path:Path) -> bool ); +make_rpc_method!(List list list (path:Path) -> Vec ); +make_rpc_method!(MoveDirectory move_directory moveDirectory (from:Path, to:Path) -> () ); +make_rpc_method!(MoveFile move_file moveFile (from:Path, to:Path) -> () ); +make_rpc_method!(Read read read (path:Path) -> String ); +make_rpc_method!(Status status status (path:Path) -> Attributes); +make_rpc_method!(Touch touch touch (path:Path) -> () ); +make_rpc_method!(Write write write (path:Path, contents:String) -> () ); +make_rpc_method!(CreateWatch create_watch createWatch (path:Path) -> Uuid ); +make_rpc_method!(DeleteWatch delete_watch deleteWatch (watch_id:Uuid) -> () ); + + + +// ============= +// === Tests === +// ============= + +#[cfg(test)] +mod tests { + use super::*; + use super::FileKind::RegularFile; + + use json_rpc::messages::Message; + use json_rpc::messages::RequestMessage; + use json_rpc::test_util::transport::mock::MockTransport; + use serde_json::json; + use serde_json::Value; + use std::future::Future; + use utils::poll_future_output; + use utils::poll_stream_output; + + fn setup_fm() -> (MockTransport, Client) { + let transport = MockTransport::new(); + let client = Client::new(transport.clone()); + (transport,client) + } + + #[test] + fn test_notification() { + let (mut transport, mut client) = setup_fm(); + let mut events = Box::pin(client.events()); + assert!(poll_stream_output(&mut events).is_none()); + + let expected_notification = FilesystemEvent { + path : Path::new("./Main.luna"), + kind : FilesystemEventKind::Modified, + }; + let notification_text = r#"{ + "jsonrpc": "2.0", + "method": "filesystemEvent", + "params": {"path" : "./Main.luna", "kind" : "Modified"} + }"#; + transport.mock_peer_message_text(notification_text); + assert!(poll_stream_output(&mut events).is_none()); + client.process_events(); + let event = poll_stream_output(&mut events); + if let Some(Event::Notification(n)) = event { + assert_eq!(n, Notification::FilesystemEvent(expected_notification)); + } else { + panic!("expected notification event"); + } + } + + /// Tests making a request using file manager: + /// * creates FM client and uses `make_request` to make a request + /// * checks that request is made for `expected_method` + /// * checks that request input is `expected_input` + /// * mocks receiving a response from server with `result` + /// * checks that FM-returned Future yields `expected_output` + fn test_request + ( make_request:Fun + , expected_method:&str + , expected_input:Value + , result:Value + , expected_output:T ) + where Fun : FnOnce(&mut Client) -> Fut, + Fut : Future>, + T : Debug + PartialEq { + let (mut transport, mut client) = setup_fm(); + let mut fut = Box::pin(make_request(&mut client)); + + let request = transport.expect_message::>(); + assert_eq!(request.method, expected_method); + assert_eq!(request.input, expected_input); + + let response = Message::new_success(request.id, result); + transport.mock_peer_message(response); + + client.process_events(); + let output = poll_future_output(&mut fut).unwrap().unwrap(); + assert_eq!(output, expected_output); + } + + #[test] + fn test_requests() { + let main = Path::new("./Main.luna"); + let target = Path::new("./Target.luna"); + let path_main = json!({"path" : "./Main.luna"}); + let from_main_to_target = json!({ + "from" : "./Main.luna", + "to" : "./Target.luna" + }); + let true_json = json!(true); + let unit_json = json!(null); + + test_request( + |client| client.copy_directory(main.clone(), target.clone()), + "copyDirectory", + from_main_to_target.clone(), + unit_json.clone(), + ()); + test_request( + |client| client.copy_file(main.clone(), target.clone()), + "copyFile", + from_main_to_target.clone(), + unit_json.clone(), + ()); + test_request( + |client| client.delete_file(main.clone()), + "deleteFile", + path_main.clone(), + unit_json.clone(), + ()); + test_request( + |client| client.exists(main.clone()), + "exists", + path_main.clone(), + true_json, + true); + + let list_response_json = json!([ "Bar.luna", "Foo.luna" ]); + let list_response_value = vec! [Path::new("Bar.luna"),Path::new("Foo.luna")]; + test_request( + |client| client.list(main.clone()), + "list", + path_main.clone(), + list_response_json, + list_response_value); + test_request( + |client| client.move_directory(main.clone(), target.clone()), + "moveDirectory", + from_main_to_target.clone(), + unit_json.clone(), + ()); + test_request( + |client| client.move_file(main.clone(), target.clone()), + "moveFile", + from_main_to_target.clone(), + unit_json.clone(), + ()); + test_request( + |client| client.read(main.clone()), + "read", + path_main.clone(), + json!("Hello world!"), + "Hello world!".into()); + + let parse_rfc3339 = |s| { + chrono::DateTime::parse_from_rfc3339(s).unwrap() + }; + let expected_attributes = Attributes { + creation_time : parse_rfc3339("2020-01-07T21:25:26Z"), + last_access_time : parse_rfc3339("2020-01-21T22:16:51.123994500+00:00"), + last_modified_time : parse_rfc3339("2020-01-07T21:25:26Z"), + file_kind : RegularFile, + byte_size : 125125, + }; + let sample_attributes_json = json!({ + "creationTime" : "2020-01-07T21:25:26Z", + "lastAccessTime" : "2020-01-21T22:16:51.123994500+00:00", + "lastModifiedTime" : "2020-01-07T21:25:26Z", + "fileKind" : "RegularFile", + "byteSize" : 125125 + }); + test_request( + |client| client.status(main.clone()), + "status", + path_main.clone(), + sample_attributes_json, + expected_attributes); + test_request( + |client| client.touch(main.clone()), + "touch", + path_main.clone(), + unit_json.clone(), + ()); + test_request( + |client| client.write(main.clone(), "Hello world!".into()), + "write", + json!({"path" : "./Main.luna", "contents" : "Hello world!"}), + unit_json.clone(), + ()); + + let uuid_value = uuid::Uuid::parse_str("02723954-fbb0-4641-af53-cec0883f260a").unwrap(); + let uuid_json = json!("02723954-fbb0-4641-af53-cec0883f260a"); + test_request( + |client| client.create_watch(main.clone()), + "createWatch", + path_main.clone(), + uuid_json.clone(), + uuid_value); + let watch_id = json!({ + "watchId" : "02723954-fbb0-4641-af53-cec0883f260a" + }); + test_request( + |client| client.delete_watch(uuid_value.clone()), + "deleteWatch", + watch_id.clone(), + unit_json.clone(), + ()); + } +} diff --git a/common/rust/json-rpc/Cargo.toml b/common/rust/json-rpc/Cargo.toml index 9f3f57beac..314abde316 100644 --- a/common/rust/json-rpc/Cargo.toml +++ b/common/rust/json-rpc/Cargo.toml @@ -8,9 +8,10 @@ edition = "2018" crate-type = ["cdylib", "rlib"] [dependencies] -prelude = { version = "0.1.0" , path = "../prelude" } -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -shrinkwraprs = "0.3.0" +prelude = { version = "0.1.0" , path = "../prelude" } +utils = { version = "0.1.0" , path = "../utils" } +serde = { version = "1.0", features = ["derive"] } futures = "0.3.1" failure = "0.1.6" +serde_json = "1.0" +shrinkwraprs = "0.3.0" diff --git a/common/rust/json-rpc/src/handler.rs b/common/rust/json-rpc/src/handler.rs index a8d0acbcb2..7a811ff6ac 100644 --- a/common/rust/json-rpc/src/handler.rs +++ b/common/rust/json-rpc/src/handler.rs @@ -8,12 +8,13 @@ use crate::error::HandlingError; use crate::error::RpcError; use crate::messages; use crate::messages::Id; -use crate::messages::IncomingMessage; -use crate::messages::Message; use crate::transport::Transport; use crate::transport::TransportEvent; use futures::FutureExt; +use futures::Stream; +use futures::channel::mpsc::unbounded; +use futures::channel::mpsc::UnboundedSender; use futures::channel::oneshot; use serde::de::DeserializeOwned; use std::future::Future; @@ -117,47 +118,19 @@ impl SharedBuffer { -// ================ -// === Callback === -// ================ +// ============= +// === Event === +// ============= -/// An optional callback procedure taking `T`. -pub struct Callback { - cb: Option ()>> -} - -impl Debug for Callback { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - write!(f, "{}", match self.cb { - Some(_) => "Some()", - None => "None", - }) - } -} - -impl Callback { - /// Create a new, empty callaback. - pub fn new() -> Callback { - Callback { cb:None } - } - - /// Sets callback to the given callable. - pub fn set () + 'static>(&mut self, cb:F) { - self.cb = Some(Box::new(cb)); - } - - /// Clears the previously set callback (if any). - pub fn unset(&mut self) { - self.cb = None; - } - - /// Calls the provided callback with `t`. Does nothing, if no callback was - /// provided. - pub fn try_call(&mut self, t:T) { - if let Some(cb) = &self.cb { - cb(t); - } - } +/// Event emitted by the `Handler`. +#[derive(Debug)] +pub enum Event { + /// Transport has been closed. + Closed, + /// Error occurred. + Error(HandlingError), + /// Notification received. + Notification(N), } @@ -177,10 +150,13 @@ pub type OngoingCalls = HashMap>; /// It allows making request, where method calls are described by values /// implementing `RemoteMethodCall`. The response is returned as a `Future`. /// -/// Notifications and internal messages are emitted using an optionally set -/// callbacks. +/// Notifications and internal messages are emitted using the `events` stream. +/// +/// `Notification` is a type for notifications. It should implement +/// `DeserializeOwned` and deserialize from JSON maps with `method` and `params` +/// fields. #[derive(Debug)] -pub struct Handler { +pub struct Handler { /// Contains handles to calls that were made but no response has came. pub ongoing_calls : OngoingCalls, /// Provides identifiers for requests. @@ -189,25 +165,22 @@ pub struct Handler { pub transport : Box, /// Allows receiving events from the `Transport`. pub incoming_events : std::sync::mpsc::Receiver, - /// Callback called when internal error happens. - pub on_error : Callback, - /// Callback called when notification from server is received. - pub on_notification : Callback>, + /// Handle to send outgoing events. + pub outgoing_events : Option>>, } -impl Handler { +impl Handler { /// Creates a new handler working on a given `Transport`. /// /// `Transport` must be functional (e.g. not in the process of opening). - pub fn new(transport:impl Transport + 'static) -> Handler { + pub fn new(transport:impl Transport + 'static) -> Handler { let (event_tx, event_rx) = std::sync::mpsc::channel(); let mut ret = Handler { ongoing_calls : OngoingCalls::new(), id_generator : IdGenerator::new(), transport : Box::new(transport), incoming_events : event_rx, - on_error : Callback::new(), - on_notification : Callback::new(), + outgoing_events : None, }; ret.transport.set_event_tx(event_tx); ret @@ -243,7 +216,7 @@ impl Handler { if let Some(sender) = self.ongoing_calls.remove(&message.id) { // Disregard any error. We do not care if RPC caller already // dropped the future. - let _ = sender.send(message.result); + sender.send(message.result).ok(); } else { self.error_occurred(HandlingError::UnexpectedResponse(message)); } @@ -251,32 +224,30 @@ impl Handler { /// Deal with `Notification` message from the peer. /// - /// It shall be announced using `on_notification` callback, allowing a - /// specific API client to properly deal with message details. + /// If possible, emits a message with notification. In case of failure, + /// emits relevant error. pub fn process_notification - (&mut self, message:messages::Notification) { - self.on_notification.try_call(message); - } - - /// Partially decodes incoming message. - /// - /// This checks if has `jsonrpc` version string, and whetehr it is a - /// response or a notification. - pub fn decode_incoming_message(&mut self, message:String) - -> serde_json::Result { - use serde_json::Value; - use serde_json::from_str; - use serde_json::from_value; - let message = from_str::>(&message)?; - from_value::(message.payload) + (&mut self, message:messages::Notification) + where Notification: DeserializeOwned { + match serde_json::from_value(message.0) { + Ok(notification) => { + let event = Event::Notification(notification); + self.send_event(event); + }, + Err(e) => { + let err = HandlingError::InvalidNotification(e); + self.error_occurred(err); + } + } } /// Deal with incoming text message from the peer. /// /// The message must conform either to the `Response` or to the /// `Notification` JSON-serialized format. Otherwise, an error is raised. - pub fn process_incoming_message(&mut self, message:String) { - match self.decode_incoming_message(message) { + pub fn process_incoming_message(&mut self, message:String) + where Notification: DeserializeOwned { + match messages::decode_incoming_message(message) { Ok(messages::IncomingMessage::Response(response)) => self.process_response(response), Ok(messages::IncomingMessage::Notification(notification)) => @@ -289,11 +260,14 @@ impl Handler { /// With with a handling error. Uses `on_error` callback to notify the /// owner. pub fn error_occurred(&mut self, error: HandlingError) { - self.on_error.try_call(error); + self.send_event(Event::Error(error)) } /// Processes a single transport event. - pub fn process_event(&mut self, event:TransportEvent) { + /// + /// Each event either completes a requests or is translated into `Event`. + pub fn process_event(&mut self, event:TransportEvent) + where Notification: DeserializeOwned { match event { TransportEvent::TextMessage(msg) => self.process_incoming_message(msg), @@ -301,6 +275,7 @@ impl Handler { // Dropping all ongoing calls will mark their futures as // cancelled. self.ongoing_calls.clear(); + self.send_event(Event::Closed); } } } @@ -311,7 +286,8 @@ impl Handler { /// This will decode the incoming messages, providing input to the futures /// returned from RPC calls. /// Also this cancels any ongoing calls if the connection was lost. - pub fn process_events(&mut self) { + pub fn process_events(&mut self) + where Notification: DeserializeOwned { loop { match self.incoming_events.try_recv() { Ok(event) => self.process_event(event), @@ -322,19 +298,32 @@ impl Handler { } } - /// Decode expected notification type from JSON. - /// - /// Returns Some on success and None on failure. Addittionaly, a handling - /// error is raised in case of failure. - pub fn decode_notification - (&mut self, json:serde_json::Value) -> Option { - match serde_json::from_value(json) { - Ok(ret) => ret, - Err(e) => { - let err = HandlingError::InvalidNotification(e); - self.error_occurred(err); - None + /// Sends a handler event to the event stream. + pub fn send_event(&mut self, event:Event) { + if let Some(tx) = self.outgoing_events.as_mut() { + match tx.unbounded_send(event) { + Ok(()) => {}, + Err(e) => + if e.is_full() { + // Impossible, as per `futures` library docs. + panic!("unbounded channel should never be full") + } else if e.is_disconnected() { + // It is ok for receiver to disconnect and ignore events. + } else { + // Never happens unless `futures` library changes API. + panic!("unknown unexpected error") + } } } } + + /// Creates a new stream with events from this handler. + /// + /// If such stream was already existing, it will be finished (and + /// continuations should be able to process any remaining events). + pub fn events(&mut self) -> impl Stream> { + let (tx,rx) = unbounded(); + self.outgoing_events = Some(tx); + rx + } } diff --git a/common/rust/json-rpc/src/lib.rs b/common/rust/json-rpc/src/lib.rs index b5dfa7410d..c8463d5d60 100644 --- a/common/rust/json-rpc/src/lib.rs +++ b/common/rust/json-rpc/src/lib.rs @@ -15,8 +15,12 @@ pub mod api; pub mod error; pub mod handler; pub mod messages; +pub mod test_util; pub mod transport; +pub use api::RemoteMethodCall; +pub use api::Result; pub use transport::Transport; pub use transport::TransportEvent; +pub use handler::Event; pub use handler::Handler; diff --git a/common/rust/json-rpc/src/messages.rs b/common/rust/json-rpc/src/messages.rs index 9933c03b5f..d41c0f2986 100644 --- a/common/rust/json-rpc/src/messages.rs +++ b/common/rust/json-rpc/src/messages.rs @@ -186,6 +186,19 @@ pub enum IncomingMessage { Notification(Notification), } +/// Partially decodes incoming message. +/// +/// This checks if has `jsonrpc` version string, and whether it is a +/// response or a notification. +pub fn decode_incoming_message +(message:String) -> serde_json::Result { + use serde_json::Value; + use serde_json::from_str; + use serde_json::from_value; + let message = from_str::>(&message)?; + from_value::(message.payload) +} + /// Message from server to client. /// /// `In` is any serializable (or already serialized) representation of the diff --git a/common/rust/json-rpc/src/test_util/mod.rs b/common/rust/json-rpc/src/test_util/mod.rs new file mode 100644 index 0000000000..7582ff70d0 --- /dev/null +++ b/common/rust/json-rpc/src/test_util/mod.rs @@ -0,0 +1,5 @@ +//! Test utilities. Should not be used in production code. +//! +//! Reusable code for other crates that want to test usage of this crate. + +pub mod transport; diff --git a/common/rust/json-rpc/src/test_util/transport/mock.rs b/common/rust/json-rpc/src/test_util/transport/mock.rs new file mode 100644 index 0000000000..792c43bb25 --- /dev/null +++ b/common/rust/json-rpc/src/test_util/transport/mock.rs @@ -0,0 +1,151 @@ +//! Module provides a `MockTransport` that implements `Transport`. +//! +//! It is meant to be used in tests. + + +use prelude::*; + +use crate::transport::Transport; +use crate::transport::TransportEvent; + +use std::collections::VecDeque; +use failure::Error; +use serde::de::DeserializeOwned; +use serde::Serialize; + + + +// ==================== +// === SendingError === +// ==================== + +/// Errors emitted by the `MockTransport`. +#[derive(Clone,Copy,Debug,Fail)] +pub enum SendError { + /// Cannot send message while the connection is closed. + #[fail(display = "Cannot send message when socket is closed.")] + TransportClosed, +} + + + +// ======================== +// === Transport Status === +// ======================== + +/// Status of the `MockTransport`. +#[derive(Clone,Copy,Debug)] +pub enum Status { + /// Transport is functional, can send messages. + Open, + /// Transport is not functional at the moment, cannot send messages. + Closed +} + + + +// ====================== +// === Transport Data === +// ====================== + +/// Mock transport shared data. Collects all the messages sent by the owner. +/// +/// Allows mocking messages from the peer. +#[derive(Debug,Default)] +pub struct MockTransportData { + /// Events sink. + pub event_tx : Option>, + /// Messages sent by the user. + pub sent_msgs : VecDeque, + /// Transport status. + pub is_closed : bool, +} + + + +// ====================== +// === Mock Transport === +// ====================== + +/// Shareable wrapper over `MockTransportData`. +#[derive(Clone,Debug,Default)] +pub struct MockTransport(Rc>); + +impl Transport for MockTransport { + fn send_text(&mut self, text:String) -> Result<(), Error> { + self.with_mut_data(|data| { + if data.is_closed { + Err(SendError::TransportClosed)? + } else { + data.sent_msgs.push_back(text.clone()); + Ok(()) + } + }) + } + + fn set_event_tx(&mut self, tx:std::sync::mpsc::Sender) { + self.with_mut_data(|data| { + data.event_tx = Some(tx); + }) + } +} + +impl MockTransport { + /// Create a new `MockTransport`. + pub fn new() -> MockTransport { + MockTransport::default() + } + + /// Executes given function with access to borrowed mutable data reference. + pub fn with_mut_data(&mut self, f:F) -> R + where F: FnOnce(&mut MockTransportData) -> R { + let mut data = self.0.borrow_mut(); + f(&mut data) + } + + /// Generates event that mocks receiving a text message from a peer. + pub fn mock_peer_message_text>(&mut self, message:S) { + let message = message.into(); + if let Some(ref tx) = self.0.borrow_mut().event_tx { + let _ = tx.send(TransportEvent::TextMessage(message)); + } + } + + /// Generates event that mocks receiving a text message from a peer with + /// serialized JSON contents. + pub fn mock_peer_message(&mut self, message:T) { + let text = serde_json::to_string(&message); + let text = text.expect("failed to serialize mock message"); + self.mock_peer_message_text(text) + } + + /// Mocks event generated when peer closes the socket (or connection is lost + /// for any other reason). + pub fn mock_connection_closed(&mut self) { + self.with_mut_data(|data| { + if let Some(ref tx) = data.event_tx { + data.is_closed = true; + let _ = tx.send(TransportEvent::Closed); + } + }) + } + + /// Takes the message sent by the client and returns its texts. + /// + /// If the client did not sent any messages, panics. + /// If the client sent multiple messages, the first one is returned. + /// Further messages can be obtained by subsequent calls. + pub fn expect_message_text(&mut self) -> String { + self.with_mut_data(|data| { + data.sent_msgs.pop_front().expect("client should have sent request") + }) + } + + /// Similar to `expect_message_text` but deserializes the message into + /// given type `T` from JSON. + pub fn expect_message(&mut self) -> T { + let text = self.expect_message_text(); + let res = serde_json::from_str(&text); + res.expect("failed to deserialize client's message") + } +} diff --git a/common/rust/json-rpc/src/test_util/transport/mod.rs b/common/rust/json-rpc/src/test_util/transport/mod.rs new file mode 100644 index 0000000000..6e47061ba8 --- /dev/null +++ b/common/rust/json-rpc/src/test_util/transport/mod.rs @@ -0,0 +1,3 @@ +//! Test utilities for the Transport. + +pub mod mock; diff --git a/common/rust/json-rpc/src/transport.rs b/common/rust/json-rpc/src/transport.rs index bca9c333ab..99b0335bb8 100644 --- a/common/rust/json-rpc/src/transport.rs +++ b/common/rust/json-rpc/src/transport.rs @@ -19,15 +19,6 @@ pub trait Transport : Debug { fn set_event_tx(&mut self, tx:std::sync::mpsc::Sender); } -impl Transport for Rc> { - fn send_text(&mut self, message:String) -> Result<(), Error> { - self.borrow_mut().send_text(message) - } - fn set_event_tx(&mut self, tx:std::sync::mpsc::Sender) { - self.borrow_mut().set_event_tx(tx) - } -} - /// An event generated by the `Transport`. #[derive(Debug)] pub enum TransportEvent { diff --git a/common/rust/json-rpc/tests/test.rs b/common/rust/json-rpc/tests/test.rs index 422655e8da..d551491380 100644 --- a/common/rust/json-rpc/tests/test.rs +++ b/common/rust/json-rpc/tests/test.rs @@ -1,5 +1,7 @@ use prelude::*; +use futures::FutureExt; +use futures::Stream; use json_rpc::*; use json_rpc::api::RemoteMethodCall; use json_rpc::api::Result; @@ -7,20 +9,17 @@ use json_rpc::error::RpcError; use json_rpc::error::HandlingError; use json_rpc::messages::Id; use json_rpc::messages::Message; -use json_rpc::messages::Notification; use json_rpc::messages::Version; -use json_rpc::transport::TransportEvent; - -use failure::Error; -use futures::FutureExt; -use futures::task::Context; -use serde::de::DeserializeOwned; -use serde::Serialize; +use json_rpc::test_util::transport::mock::MockTransport; use serde::Deserialize; +use serde::Serialize; use std::future::Future; use std::pin::Pin; -use std::task::Poll; use std::sync::mpsc::TryRecvError; +use utils::poll_future_output; +use utils::poll_stream_output; + +type MockEvent = json_rpc::handler::Event; @@ -66,126 +65,23 @@ type MockResponseMessage = messages::ResponseMessage; -// ====================== -// === Mock Transport === -// ====================== - -#[derive(Debug, Fail)] -enum MockError { - #[fail(display = "Cannot send message when socket is closed.")] - TransportClosed, -} - -#[derive(Debug)] -struct MockTransport { - pub event_tx : Option>, - pub sent_msgs : Vec, - pub is_closed : bool, -} - -impl Transport for MockTransport { - fn set_event_tx(&mut self, tx:std::sync::mpsc::Sender) { - self.event_tx = Some(tx); - } - - fn send_text(&mut self, text:String) -> std::result::Result<(), Error> { - if self.is_closed { - Err(MockError::TransportClosed)? - } else { - self.sent_msgs.push(text.clone()); - Ok(()) - } - } -} - -impl MockTransport { - pub fn new() -> MockTransport { - MockTransport { - event_tx : None, - sent_msgs : Vec::new(), - is_closed : false, - } - } - - pub fn mock_peer_message_text>(&mut self, message:S) { - let message = message.into(); - if let Some(ref tx) = self.event_tx { - let _ = tx.send(TransportEvent::TextMessage(message)); - } - } - - pub fn mock_peer_message(&mut self, message:T) { - let text = serde_json::to_string(&message).expect("failed to serialize"); - self.mock_peer_message_text(text) - } - - pub fn mock_connection_closed(&mut self) { - if let Some(ref tx) = self.event_tx { - self.is_closed = true; - let _ = tx.send(TransportEvent::Closed); - } - } - - pub fn expect_message_text(&mut self) -> String { - self.sent_msgs.pop().expect("client should have sent request") - } - - pub fn expect_message(&mut self) -> T { - let text = self.expect_message_text(); - let res = serde_json::from_str(&text); - res.expect("failed to deserialize client's message") - } -} - - - -// ================ -// === Executor === -// ================ - -/// Polls the future, performing any available work. If future is complete, -/// returns result. Otherwise, returns control when stalled. -fn poll_for_output(f:&mut Pin>) -> Option { - let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); - match f.as_mut().poll(&mut ctx) { - Poll::Ready(result) => Some(result), - Poll::Pending => None, - } -} - - - // =================== // === Mock Client === // =================== -#[derive(Debug)] pub struct Client { - pub handler :Handler, - pub errors :Rc>>, - pub notifications:Rc>>>, + pub handler : Handler, + pub events_stream : Pin>>, } impl Client { pub fn new(transport:impl Transport + 'static) -> Client { let mut handler = Handler::new(transport); - let errors = Rc::new(RefCell::new(Vec::new())); - let notifications = Rc::new(RefCell::new(Vec::new())); - - let errors2 = errors.clone(); - handler.on_error.set(move |e| { - errors2.borrow_mut().push(e); - }); - - let notifications2 = notifications.clone(); - handler.on_notification.set(move |e| { - notifications2.borrow_mut().push(e); - }); + let events_stream = Box::pin(handler.events()); Client { handler, - errors, - notifications, + events_stream, } } @@ -198,18 +94,30 @@ impl Client { self.handler.process_events() } + pub fn try_get_event(&mut self) -> Option { + poll_stream_output(&mut self.events_stream) + } + pub fn try_get_notification(&mut self) -> Option { - let n = self.notifications.borrow_mut().pop()?; - self.handler.decode_notification::(n.0) + let event = self.try_get_event()?; + if let MockEvent::Notification(n) = event { + Some(n) + } else { + None + } } pub fn expect_notification(&mut self) -> MockNotification { - self.try_get_notification().unwrap() + self.try_get_notification().expect("expected notification event") } pub fn expect_handling_error(&mut self) -> HandlingError { - let handling_error = self.errors.borrow_mut().pop(); - handling_error.expect("there should be an error") + let event = self.try_get_event().expect("no events, while expected error event"); + if let json_rpc::handler::Event::Error(err) = event { + err + } else { + panic!("expected error event, encountered: {:?}", event) + } } } @@ -219,44 +127,44 @@ impl Client { // === Test === // ============ -fn setup() -> (Rc>, Client) { - let ws = Rc::new(RefCell::new(MockTransport::new())); - let fm = Client::new(ws.clone()); - (ws,fm) +fn setup() -> (MockTransport, Client) { + let transport = MockTransport::new(); + let client = Client::new(transport.clone()); + (transport,client) } #[test] fn test_success_call() { - let (ws, mut fm) = setup(); - let call_input = 8; - let mut fut = Box::pin(fm.pow(8)); - let expected_first_request_id = Id(0); + let (mut transport, mut client) = setup(); + let call_input = 8; + let mut fut = Box::pin(client.pow(8)); + let expected_first_request_id = Id(0); // validate request sent - let req_msg = ws.borrow_mut().expect_message::(); + let req_msg = transport.expect_message::(); assert_eq!(req_msg.id, expected_first_request_id); assert_eq!(req_msg.method, MockRequest::NAME); assert_eq!(req_msg.i, call_input); assert_eq!(req_msg.jsonrpc, Version::V2); - assert!(poll_for_output(&mut fut).is_none()); // no reply + assert!(poll_future_output(&mut fut).is_none()); // no reply // let's reply let reply = pow_impl(req_msg); - ws.borrow_mut().mock_peer_message(reply); + transport.mock_peer_message(reply); // before tick message should be in buffer and callbacks should not // complete - assert!(poll_for_output(&mut fut).is_none()); // not ticked + assert!(poll_future_output(&mut fut).is_none()); // not ticked // now tick - fm.process_events(); - if let Err(TryRecvError::Empty) = fm.handler.incoming_events.try_recv() { + client.process_events(); + if let Err(TryRecvError::Empty) = client.handler.incoming_events.try_recv() { // ok } else { panic!("All messages from the buffer should be already processed"); } - let result = poll_for_output(&mut fut); + let result = poll_future_output(&mut fut); let result = result.expect("result should be present"); let result = result.expect("result should be a success"); assert_eq!(result, 8*8); @@ -264,26 +172,26 @@ fn test_success_call() { #[test] fn test_error_call() { - let (ws, mut fm) = setup(); - let mut fut = Box::pin(fm.pow(8)); - assert!(poll_for_output(&mut fut).is_none()); // no reply + let (mut transport, mut client) = setup(); + let mut fut = Box::pin(client.pow(8)); + assert!(poll_future_output(&mut fut).is_none()); // no reply // reply with error - let req_msg = ws.borrow_mut().expect_message::(); - let error_code = 5; + let req_msg = transport.expect_message::(); + let error_code = 5; let error_description = "wrong!"; - let error_data = None; + let error_data = None; let error_msg: MockResponseMessage = Message::new_error( req_msg.id, error_code, error_description.into(), error_data.clone(), ); - ws.borrow_mut().mock_peer_message(error_msg); + transport.mock_peer_message(error_msg); // receive error - fm.process_events(); - let result = poll_for_output(&mut fut); + client.process_events(); + let result = poll_future_output(&mut fut); let result = result.expect("result should be present"); let result = result.expect_err("result should be a failure"); if let RpcError::RemoteError(e) = result { @@ -297,13 +205,13 @@ fn test_error_call() { #[test] fn test_garbage_reply_error() { - let (ws, mut fm) = setup(); - let mut fut = Box::pin(fm.pow(8)); - assert!(poll_for_output(&mut fut).is_none()); // no reply - ws.borrow_mut().mock_peer_message_text("hello, nice to meet you"); - fm.process_events(); - assert!(poll_for_output(&mut fut).is_none()); // no valid reply - let internal_error = fm.expect_handling_error(); + let (mut transport, mut client) = setup(); + let mut fut = Box::pin(client.pow(8)); + assert!(poll_future_output(&mut fut).is_none()); // no reply + transport.mock_peer_message_text("hello, nice to meet you"); + client.process_events(); + assert!(poll_future_output(&mut fut).is_none()); // no valid reply + let internal_error = client.expect_handling_error(); if let HandlingError::InvalidMessage(_) = internal_error { } else { panic!("Expected an error to be InvalidMessage"); @@ -312,13 +220,13 @@ fn test_garbage_reply_error() { #[test] fn test_disconnect_error() { - let (ws, mut fm) = setup(); - let mut fut = Box::pin(fm.pow(8)); - assert!(poll_for_output(&mut fut).is_none()); // no reply - ws.borrow_mut().mock_connection_closed(); - assert!(poll_for_output(&mut fut).is_none()); // no reply - fm.process_events(); - let result = poll_for_output(&mut fut); + let (mut transport, mut client) = setup(); + let mut fut = Box::pin(client.pow(8)); + assert!(poll_future_output(&mut fut).is_none()); // no reply + transport.mock_connection_closed(); + assert!(poll_future_output(&mut fut).is_none()); // no reply + client.process_events(); + let result = poll_future_output(&mut fut); let result = result.expect("result should be present"); let result = result.expect_err("result should be a failure"); if let RpcError::LostConnection = result {} else { @@ -328,23 +236,23 @@ fn test_disconnect_error() { #[test] fn test_sending_while_disconnected() { - let (ws, mut fm) = setup(); - ws.borrow_mut().mock_connection_closed(); - let mut fut = Box::pin(fm.pow(8)); - let result = poll_for_output(&mut fut).unwrap(); + let (mut transport, mut client) = setup(); + transport.mock_connection_closed(); + let mut fut = Box::pin(client.pow(8)); + let result = poll_future_output(&mut fut).unwrap(); assert!(result.is_err()) } fn test_notification(mock_notif:MockNotification) { - let (ws, mut fm) = setup(); - let message = Message::new(mock_notif.clone()); - assert!(fm.notifications.borrow().is_empty()); - ws.borrow_mut().mock_peer_message(message.clone()); - assert!(fm.notifications.borrow().is_empty()); - fm.process_events(); - assert_eq!(fm.notifications.borrow().is_empty(), false); - let notif = fm.expect_notification(); - assert_eq!(notif, mock_notif); + let (mut transport, mut client) = setup(); + let message = Message::new(mock_notif.clone()); + assert!(client.try_get_notification().is_none()); + transport.mock_peer_message(message.clone()); + assert!(client.try_get_notification().is_none()); + client.process_events(); + let notification = client.try_get_notification(); + assert_eq!(notification.is_none(), false); + assert_eq!(notification, Some(mock_notif)); } #[test] @@ -364,18 +272,14 @@ fn test_handling_invalid_notification() { "params": [1,2,3,4,5] }"#; - let (ws, mut fm) = setup(); - assert!(fm.notifications.borrow().is_empty()); - ws.borrow_mut().mock_peer_message_text(other_notification); - assert!(fm.notifications.borrow().is_empty()); - fm.process_events(); - assert_eq!(fm.notifications.borrow().is_empty(), false); - assert_eq!(fm.try_get_notification(), None); - let internal_error = fm.expect_handling_error(); + let (mut transport, mut client) = setup(); + assert!(client.try_get_notification().is_none()); + transport.mock_peer_message_text(other_notification); + assert!(client.try_get_notification().is_none()); + client.process_events(); + let internal_error = client.expect_handling_error(); if let HandlingError::InvalidNotification(_) = internal_error {} else { panic!("expected InvalidNotification error"); } } - - diff --git a/common/rust/shapely/core/src/lib.rs b/common/rust/shapely/core/src/lib.rs index 395f0f4086..39ce9abb4a 100644 --- a/common/rust/shapely/core/src/lib.rs +++ b/common/rust/shapely/core/src/lib.rs @@ -41,6 +41,7 @@ where G: Generator + Unpin { pub struct EmptyIterator(PhantomData); impl EmptyIterator { + /// Create a new empty iterator. pub fn new() -> Self { Self(PhantomData) } diff --git a/common/rust/shapely/macros/src/lib.rs b/common/rust/shapely/macros/src/lib.rs index 0f23a8849a..c835c1ed62 100644 --- a/common/rust/shapely/macros/src/lib.rs +++ b/common/rust/shapely/macros/src/lib.rs @@ -6,6 +6,7 @@ extern crate proc_macro; mod derive_iterator; +#[allow(unused)] use prelude::*; use crate::derive_iterator::IsMut; diff --git a/common/rust/utils/Cargo.toml b/common/rust/utils/Cargo.toml new file mode 100644 index 0000000000..9957fe6f06 --- /dev/null +++ b/common/rust/utils/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "utils" +version = "0.1.0" +authors = ["Enso Team "] +edition = "2018" + +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies] +futures = "0.3.1" +prelude = { version = "0.1.0" , path = "../prelude" } diff --git a/common/rust/utils/src/lib.rs b/common/rust/utils/src/lib.rs new file mode 100644 index 0000000000..f90a6cf8f6 --- /dev/null +++ b/common/rust/utils/src/lib.rs @@ -0,0 +1,52 @@ +//! General purpose functions to be reused between components, not belonging to +//! any other crate and yet not worth of being split into their own creates. + +#![warn(missing_docs)] +#![warn(trivial_casts)] +#![warn(trivial_numeric_casts)] +#![warn(unused_import_braces)] +#![warn(unused_qualifications)] +#![warn(unsafe_code)] +#![warn(missing_copy_implementations)] +#![warn(missing_debug_implementations)] + +#[allow(unused)] +use prelude::*; + +use futures::Stream; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + + + +// ====================== +// === Task Execution === +// ====================== + +/// Polls the future, performing any available work. +/// +/// If future is complete, returns result. Otherwise, returns control when +/// stalled. +/// It is not legal to call this on future that already completed. +pub fn poll_future_output(f:&mut Pin>) -> Option { + let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); + match f.as_mut().poll(&mut ctx) { + Poll::Ready(result) => Some(result), + Poll::Pending => None, + } +} + +/// Polls the stream, performing any available work. If a new value is +/// ready, returns it. +/// +/// Note that this API hides the difference between value not being available +/// yet and stream being finished. +pub fn poll_stream_output(f:&mut Pin>) -> Option { + let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); + match f.as_mut().poll_next(&mut ctx) { + Poll::Ready(result) => result, + Poll::Pending => None, + } +}