File Manager Client Libary for Rust (#454)

ref #423
This commit is contained in:
Michał Wawrzyniec Urbańczyk 2020-01-25 02:38:59 +01:00 committed by GitHub
parent 053df2ac2b
commit 175212bf4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1014 additions and 286 deletions

View File

@ -2,11 +2,13 @@
members = [
"common/rust/ast/core",
"common/rust/json-rpc",
"common/rust/ast/macros",
"common/rust/file-manager",
"common/rust/json-rpc",
"common/rust/macro-utils",
"common/rust/parser",
"common/rust/prelude",
"common/rust/shapely/core",
"common/rust/shapely/macros"
"common/rust/shapely/macros",
"common/rust/utils",
]

View File

@ -0,0 +1,21 @@
[package]
name = "file-manager-client"
version = "0.1.0"
authors = ["Enso Team <contact@luna-lang.org>"]
edition = "2018"
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies]
futures = "0.3.1"
paste = "0.1.6"
serde_json = "1.0"
chrono = { version = "0.4" , features = ["serde"] }
serde = { version = "1.0" , features = ["derive"] }
uuid = { version = "0.8" , features = ["serde", "v5"] }
json-rpc = { version = "0.1.0" , path = "../json-rpc" }
prelude = { version = "0.1.0" , path = "../prelude" }
utils = { version = "0.1.0" , path = "../utils" }

View File

@ -0,0 +1,136 @@
File Manager consists of pair server and client. They communicate by exchaning
messages, following JSON-RPC 2.0.
# Setup
Establish websocket connection.
In future it is expected that some kind of authorization will be required by the
server. As of now, its details remain unspecified.
# General protocol
Remote calls made between File Manager client and server follow [JSON-RPC 2.0
protocol](https://www.jsonrpc.org/specification).
There are two primary cases:
* RPC calls from client to [server methods](#Methods);
* [Notifications](#Notifications) sent from server to client.
All messages are text with JSON-encoded values.
File Manager accepts only method calls (request objects).
File Manager responds with call results and may send notifications.
# Methods
| Method | Input | Result |
|---------------|------------------------------|------------|
| copyDirectory | {from:Path, to:Path} | () |
| copyFile | {from:Path, to:Path} | () |
| deleteFile | {path:Path} | () |
| exists | {path:Path} | Boolean |
| list | {path:Path} | [Path] |
| moveDirectory | {from:Path, to:Path} | () |
| moveFile | {from:Path, to:Path} | () |
| read | {path:Path} | String |
| status | {path:Path} | Attributes |
| touch | {path:Path} | () |
| write | {path:Path, contents:String} | () |
| createWatch | {path:Path} | UUID |
| deleteWatch | {watchId:UUID} | () |
Where `()` is a unit value.
# Notifications
Notifications are emitted by the server.
| Method | Input | Result |
|-----------------|-----------------------------|--------|
| filesystemEvent | {path:Path, kind:EventKind} | N/A |
`filesystemEvent` notification is emitted for paths that are tracked by watch
(i.e. subtree of a location passed to `createWatch` method).
It should be noted that watch notifications are not reliable and significantly
os-dependent.
# Types
```
Attributes = struct {
creationTime : FileTime,
lastAccess_time : FileTime,
lastModified_time : FileTime,
fileKind : FileKind,
byteSize : u64,
}
EventKind = enum { Created, Deleted, Modified, Overflow }
FileKind = enum { Directory, RegularFile, SymbolicLink, Other }
```
# JSON Encoding
Struct values are serialized as map, e.g. `{ "field_name" : field_value }`.
Enum values are serialized as map `{ "variant_name" : variant_value }` or just
`"variant_name"` if there variants have no inner value.
Transitive enums (i.e. enums of enums) are flattened, no intermediate variant
names shall appear.
`()` (unit value) is serialized as `null`.
`FileTime` value is serialized as a string compliant with RFC3339 / ISO8601 text
format, e.g. `"2020-01-07T21:25:26Z"`.
`Path` is serialized as JSON string value, e.g. `"./Main.luna"`.
`UUID` is serialzied as string using 8-4-4-4-12 format, e.g.
`"02723954-fbb0-4641-af53-cec0883f260a"`.
`u64` is an unsigned 64-bit integer value.
## Examples
### Call to `exists` method
#### Request (call)
```json
{
"jsonrpc" : "2.0",
"id" : 0,
"method" : "exists",
"input" : { "path" : "./Main.luna" }
}
```
#### Response
```json
{
"jsonrpc" : "2.0",
"id" : 0,
"result" : true
}
```
### Filesystem Event Notification
#### Request (notification)
```json
{
"jsonrpc" : "2.0",
"method" : "filesystemEvent",
"params" : { "path" : "./Main.luna", "kind" : "Modified" }
}
```
Notification requests gets no response.
### `Attributes` structure
`Attributes` value may be serialized to a following JSON:
```json
{
"creationTime" : "2020-01-07T21:25:26Z",
"lastAccessTime" : "2020-01-21T22:16:51.123994500+00:00",
"lastModifiedTime" : "2020-01-07T21:25:26Z",
"fileKind" : "RegularFile",
"sizeInBytes" : 125125
}
```

View File

@ -0,0 +1,442 @@
//! Client library for the JSON-RPC-based File Manager service.
#![warn(missing_docs)]
#![warn(trivial_casts)]
#![warn(trivial_numeric_casts)]
#![warn(unused_import_braces)]
#![warn(unused_qualifications)]
#![warn(unsafe_code)]
#![warn(missing_copy_implementations)]
#![warn(missing_debug_implementations)]
use prelude::*;
use json_rpc::api::Result;
use json_rpc::Handler;
use futures::Stream;
use serde::Serialize;
use serde::Deserialize;
use std::future::Future;
use uuid::Uuid;
// =============
// === Event ===
// =============
/// Event emitted by the File Manager `Client`.
pub type Event = json_rpc::handler::Event<Notification>;
// ============
// === Path ===
// ============
/// Path to a file.
#[derive(Clone,Debug,Display,Eq,Hash,PartialEq,PartialOrd,Ord)]
#[derive(Serialize, Deserialize)]
#[derive(Shrinkwrap)]
pub struct Path(pub String);
impl Path {
/// Wraps a `String`-like entity into a new `Path`.
pub fn new<S>(s:S) -> Path where S:Into<String> {
Path(s.into())
}
}
// ====================
// === Notification ===
// ====================
/// Notification generated by the File Manager.
#[derive(Clone,Debug,PartialEq)]
#[derive(Serialize, Deserialize)]
#[serde(tag="method", content="params")]
pub enum Notification {
/// Filesystem event occurred for a watched path.
#[serde(rename = "filesystemEvent")]
FilesystemEvent(FilesystemEvent),
}
// =======================
// === FilesystemEvent ===
// =======================
/// Filesystem event notification, generated by an active file watch.
#[derive(Clone,Debug,PartialEq)]
#[derive(Serialize, Deserialize)]
pub struct FilesystemEvent {
/// Path of the file that the event is about.
pub path : Path,
/// What kind of event is it.
pub kind : FilesystemEventKind
}
/// Describes kind of filesystem event (was the file created or deleted, etc.)
#[derive(Clone,Copy,Debug,PartialEq)]
#[derive(Serialize, Deserialize)]
pub enum FilesystemEventKind {
/// A new file under path was created.
Created,
/// Existing file under path was deleted.
Deleted,
/// File under path was modified.
Modified,
/// An overflow occurred and some events were lost,
Overflow
}
// ==================
// === Attributes ===
// ==================
/// Attributes of the file in the filesystem.
#[derive(Clone,Copy,Debug,PartialEq)]
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Attributes{
/// When the file was created.
pub creation_time : FileTime,
/// When the file was last accessed.
pub last_access_time : FileTime,
/// When the file was last modified.
pub last_modified_time : FileTime,
/// What kind of file is this.
pub file_kind : FileKind,
/// Size of the file in bytes.
/// (size of files not being `RegularFile`s is unspecified).
pub byte_size : u64
}
/// A filesystem's timestamp.
pub type FileTime = chrono::DateTime<chrono::FixedOffset>;
/// What kind of file (regular, directory, symlink) is this.
#[derive(Clone,Copy,Debug,PartialEq)]
#[derive(Serialize, Deserialize)]
pub enum FileKind {
/// File being a directory.
Directory,
/// File being a symbolic link.
SymbolicLink,
/// File being a regular file with opaque content.
RegularFile,
/// File being none of the above, e.g. a physical device or a pipe.
Other
}
// ==============
// === Client ===
// ==============
/// File Manager client. Contains numerous asynchronous methods for remote calls
/// on File Manager server. Also, allows obtaining events stream by calling
/// `events`.
#[derive(Debug)]
pub struct Client {
/// JSON-RPC protocol handler.
handler : Handler<Notification>,
}
impl Client {
/// Create a new File Manager client that will use given transport.
pub fn new(transport:impl json_rpc::Transport + 'static) -> Client {
let handler = Handler::new(transport);
Client { handler }
}
/// Asynchronous event stream with notification and errors.
///
/// On a repeated call, previous stream is closed.
pub fn events(&mut self) -> impl Stream<Item = Event> {
self.handler.events()
}
/// Method that should be called on each frame.
///
/// Processes incoming transport events, generating File Manager events and
/// driving asynchronous calls to completion.
pub fn process_events(&mut self) {
self.handler.process_events()
}
}
// ===================
// === RPC Methods ===
// ===================
// === Helper macro ===
/// Macro that generates a asynchronous method making relevant RPC call to the
/// server. First three args is the name appropriately in CamelCase,
/// snake_case, camelCase. Then goes the function signature, in form of
/// `(arg:Arg) -> Ret`.
///
/// Macro generates:
/// * a method in Client named `snake_case` that takes `(arg:Arg)` and returns
/// `Future<Ret>`.
/// * a structure named `CamelCase` that stores function arguments as fields and
/// its JSON serialization conforms to JSON-RPC (yielding `method` and
/// `params` fields).
/// * `snakeCase` is the name of the remote method.
macro_rules! make_rpc_method {
( $name_typename:ident
$name:ident
$name_ext:ident
($($arg:ident : $type:ty),* $(,)?) -> $out:ty ) => {
paste::item! {
impl Client {
/// Remote call to the method on the File Manager Server.
pub fn $name
(&mut self, $($arg:$type),*) -> impl Future<Output=Result<$out>> {
let input = [<$name_typename Input>] { $($arg:$arg),* };
self.handler.open_request(input)
}
}
/// Structure transporting method arguments.
#[derive(Serialize,Deserialize,Debug,PartialEq)]
#[serde(rename_all = "camelCase")]
struct [<$name_typename Input>] {
$($arg : $type),*
}
impl json_rpc::RemoteMethodCall for [<$name_typename Input>] {
const NAME:&'static str = stringify!($name_ext);
type Returned = $out;
}
}}
}
// === Remote API definition ===
make_rpc_method!(CopyDirectory copy_directory copyDirectory (from:Path, to:Path) -> () );
make_rpc_method!(CopyFile copy_file copyFile (from:Path, to:Path) -> () );
make_rpc_method!(DeleteFile delete_file deleteFile (path:Path) -> () );
make_rpc_method!(Exists exists exists (path:Path) -> bool );
make_rpc_method!(List list list (path:Path) -> Vec<Path> );
make_rpc_method!(MoveDirectory move_directory moveDirectory (from:Path, to:Path) -> () );
make_rpc_method!(MoveFile move_file moveFile (from:Path, to:Path) -> () );
make_rpc_method!(Read read read (path:Path) -> String );
make_rpc_method!(Status status status (path:Path) -> Attributes);
make_rpc_method!(Touch touch touch (path:Path) -> () );
make_rpc_method!(Write write write (path:Path, contents:String) -> () );
make_rpc_method!(CreateWatch create_watch createWatch (path:Path) -> Uuid );
make_rpc_method!(DeleteWatch delete_watch deleteWatch (watch_id:Uuid) -> () );
// =============
// === Tests ===
// =============
#[cfg(test)]
mod tests {
use super::*;
use super::FileKind::RegularFile;
use json_rpc::messages::Message;
use json_rpc::messages::RequestMessage;
use json_rpc::test_util::transport::mock::MockTransport;
use serde_json::json;
use serde_json::Value;
use std::future::Future;
use utils::poll_future_output;
use utils::poll_stream_output;
fn setup_fm() -> (MockTransport, Client) {
let transport = MockTransport::new();
let client = Client::new(transport.clone());
(transport,client)
}
#[test]
fn test_notification() {
let (mut transport, mut client) = setup_fm();
let mut events = Box::pin(client.events());
assert!(poll_stream_output(&mut events).is_none());
let expected_notification = FilesystemEvent {
path : Path::new("./Main.luna"),
kind : FilesystemEventKind::Modified,
};
let notification_text = r#"{
"jsonrpc": "2.0",
"method": "filesystemEvent",
"params": {"path" : "./Main.luna", "kind" : "Modified"}
}"#;
transport.mock_peer_message_text(notification_text);
assert!(poll_stream_output(&mut events).is_none());
client.process_events();
let event = poll_stream_output(&mut events);
if let Some(Event::Notification(n)) = event {
assert_eq!(n, Notification::FilesystemEvent(expected_notification));
} else {
panic!("expected notification event");
}
}
/// Tests making a request using file manager:
/// * creates FM client and uses `make_request` to make a request
/// * checks that request is made for `expected_method`
/// * checks that request input is `expected_input`
/// * mocks receiving a response from server with `result`
/// * checks that FM-returned Future yields `expected_output`
fn test_request<Fun, Fut, T>
( make_request:Fun
, expected_method:&str
, expected_input:Value
, result:Value
, expected_output:T )
where Fun : FnOnce(&mut Client) -> Fut,
Fut : Future<Output = Result<T>>,
T : Debug + PartialEq {
let (mut transport, mut client) = setup_fm();
let mut fut = Box::pin(make_request(&mut client));
let request = transport.expect_message::<RequestMessage<Value>>();
assert_eq!(request.method, expected_method);
assert_eq!(request.input, expected_input);
let response = Message::new_success(request.id, result);
transport.mock_peer_message(response);
client.process_events();
let output = poll_future_output(&mut fut).unwrap().unwrap();
assert_eq!(output, expected_output);
}
#[test]
fn test_requests() {
let main = Path::new("./Main.luna");
let target = Path::new("./Target.luna");
let path_main = json!({"path" : "./Main.luna"});
let from_main_to_target = json!({
"from" : "./Main.luna",
"to" : "./Target.luna"
});
let true_json = json!(true);
let unit_json = json!(null);
test_request(
|client| client.copy_directory(main.clone(), target.clone()),
"copyDirectory",
from_main_to_target.clone(),
unit_json.clone(),
());
test_request(
|client| client.copy_file(main.clone(), target.clone()),
"copyFile",
from_main_to_target.clone(),
unit_json.clone(),
());
test_request(
|client| client.delete_file(main.clone()),
"deleteFile",
path_main.clone(),
unit_json.clone(),
());
test_request(
|client| client.exists(main.clone()),
"exists",
path_main.clone(),
true_json,
true);
let list_response_json = json!([ "Bar.luna", "Foo.luna" ]);
let list_response_value = vec! [Path::new("Bar.luna"),Path::new("Foo.luna")];
test_request(
|client| client.list(main.clone()),
"list",
path_main.clone(),
list_response_json,
list_response_value);
test_request(
|client| client.move_directory(main.clone(), target.clone()),
"moveDirectory",
from_main_to_target.clone(),
unit_json.clone(),
());
test_request(
|client| client.move_file(main.clone(), target.clone()),
"moveFile",
from_main_to_target.clone(),
unit_json.clone(),
());
test_request(
|client| client.read(main.clone()),
"read",
path_main.clone(),
json!("Hello world!"),
"Hello world!".into());
let parse_rfc3339 = |s| {
chrono::DateTime::parse_from_rfc3339(s).unwrap()
};
let expected_attributes = Attributes {
creation_time : parse_rfc3339("2020-01-07T21:25:26Z"),
last_access_time : parse_rfc3339("2020-01-21T22:16:51.123994500+00:00"),
last_modified_time : parse_rfc3339("2020-01-07T21:25:26Z"),
file_kind : RegularFile,
byte_size : 125125,
};
let sample_attributes_json = json!({
"creationTime" : "2020-01-07T21:25:26Z",
"lastAccessTime" : "2020-01-21T22:16:51.123994500+00:00",
"lastModifiedTime" : "2020-01-07T21:25:26Z",
"fileKind" : "RegularFile",
"byteSize" : 125125
});
test_request(
|client| client.status(main.clone()),
"status",
path_main.clone(),
sample_attributes_json,
expected_attributes);
test_request(
|client| client.touch(main.clone()),
"touch",
path_main.clone(),
unit_json.clone(),
());
test_request(
|client| client.write(main.clone(), "Hello world!".into()),
"write",
json!({"path" : "./Main.luna", "contents" : "Hello world!"}),
unit_json.clone(),
());
let uuid_value = uuid::Uuid::parse_str("02723954-fbb0-4641-af53-cec0883f260a").unwrap();
let uuid_json = json!("02723954-fbb0-4641-af53-cec0883f260a");
test_request(
|client| client.create_watch(main.clone()),
"createWatch",
path_main.clone(),
uuid_json.clone(),
uuid_value);
let watch_id = json!({
"watchId" : "02723954-fbb0-4641-af53-cec0883f260a"
});
test_request(
|client| client.delete_watch(uuid_value.clone()),
"deleteWatch",
watch_id.clone(),
unit_json.clone(),
());
}
}

View File

@ -9,8 +9,9 @@ crate-type = ["cdylib", "rlib"]
[dependencies]
prelude = { version = "0.1.0" , path = "../prelude" }
utils = { version = "0.1.0" , path = "../utils" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
shrinkwraprs = "0.3.0"
futures = "0.3.1"
failure = "0.1.6"
serde_json = "1.0"
shrinkwraprs = "0.3.0"

View File

@ -8,12 +8,13 @@ use crate::error::HandlingError;
use crate::error::RpcError;
use crate::messages;
use crate::messages::Id;
use crate::messages::IncomingMessage;
use crate::messages::Message;
use crate::transport::Transport;
use crate::transport::TransportEvent;
use futures::FutureExt;
use futures::Stream;
use futures::channel::mpsc::unbounded;
use futures::channel::mpsc::UnboundedSender;
use futures::channel::oneshot;
use serde::de::DeserializeOwned;
use std::future::Future;
@ -117,47 +118,19 @@ impl SharedBuffer {
// ================
// === Callback ===
// ================
// =============
// === Event ===
// =============
/// An optional callback procedure taking `T`.
pub struct Callback<T> {
cb: Option<Box<dyn Fn(T) -> ()>>
}
impl<T> Debug for Callback<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{}", match self.cb {
Some(_) => "Some(<function>)",
None => "None",
})
}
}
impl<T> Callback<T> {
/// Create a new, empty callaback.
pub fn new() -> Callback<T> {
Callback { cb:None }
}
/// Sets callback to the given callable.
pub fn set<F : Fn(T) -> () + 'static>(&mut self, cb:F) {
self.cb = Some(Box::new(cb));
}
/// Clears the previously set callback (if any).
pub fn unset(&mut self) {
self.cb = None;
}
/// Calls the provided callback with `t`. Does nothing, if no callback was
/// provided.
pub fn try_call(&mut self, t:T) {
if let Some(cb) = &self.cb {
cb(t);
}
}
/// Event emitted by the `Handler<N>`.
#[derive(Debug)]
pub enum Event<N> {
/// Transport has been closed.
Closed,
/// Error occurred.
Error(HandlingError),
/// Notification received.
Notification(N),
}
@ -177,10 +150,13 @@ pub type OngoingCalls = HashMap<Id,oneshot::Sender<ReplyMessage>>;
/// It allows making request, where method calls are described by values
/// implementing `RemoteMethodCall`. The response is returned as a `Future`.
///
/// Notifications and internal messages are emitted using an optionally set
/// callbacks.
/// Notifications and internal messages are emitted using the `events` stream.
///
/// `Notification` is a type for notifications. It should implement
/// `DeserializeOwned` and deserialize from JSON maps with `method` and `params`
/// fields.
#[derive(Debug)]
pub struct Handler {
pub struct Handler<Notification> {
/// Contains handles to calls that were made but no response has came.
pub ongoing_calls : OngoingCalls,
/// Provides identifiers for requests.
@ -189,25 +165,22 @@ pub struct Handler {
pub transport : Box<dyn Transport>,
/// Allows receiving events from the `Transport`.
pub incoming_events : std::sync::mpsc::Receiver<TransportEvent>,
/// Callback called when internal error happens.
pub on_error : Callback<HandlingError>,
/// Callback called when notification from server is received.
pub on_notification : Callback<messages::Notification<serde_json::Value>>,
/// Handle to send outgoing events.
pub outgoing_events : Option<UnboundedSender<Event<Notification>>>,
}
impl Handler {
impl<Notification> Handler<Notification> {
/// Creates a new handler working on a given `Transport`.
///
/// `Transport` must be functional (e.g. not in the process of opening).
pub fn new(transport:impl Transport + 'static) -> Handler {
pub fn new(transport:impl Transport + 'static) -> Handler<Notification> {
let (event_tx, event_rx) = std::sync::mpsc::channel();
let mut ret = Handler {
ongoing_calls : OngoingCalls::new(),
id_generator : IdGenerator::new(),
transport : Box::new(transport),
incoming_events : event_rx,
on_error : Callback::new(),
on_notification : Callback::new(),
outgoing_events : None,
};
ret.transport.set_event_tx(event_tx);
ret
@ -243,7 +216,7 @@ impl Handler {
if let Some(sender) = self.ongoing_calls.remove(&message.id) {
// Disregard any error. We do not care if RPC caller already
// dropped the future.
let _ = sender.send(message.result);
sender.send(message.result).ok();
} else {
self.error_occurred(HandlingError::UnexpectedResponse(message));
}
@ -251,32 +224,30 @@ impl Handler {
/// Deal with `Notification` message from the peer.
///
/// It shall be announced using `on_notification` callback, allowing a
/// specific API client to properly deal with message details.
/// If possible, emits a message with notification. In case of failure,
/// emits relevant error.
pub fn process_notification
(&mut self, message:messages::Notification<serde_json::Value>) {
self.on_notification.try_call(message);
(&mut self, message:messages::Notification<serde_json::Value>)
where Notification: DeserializeOwned {
match serde_json::from_value(message.0) {
Ok(notification) => {
let event = Event::Notification(notification);
self.send_event(event);
},
Err(e) => {
let err = HandlingError::InvalidNotification(e);
self.error_occurred(err);
}
}
/// Partially decodes incoming message.
///
/// This checks if has `jsonrpc` version string, and whetehr it is a
/// response or a notification.
pub fn decode_incoming_message(&mut self, message:String)
-> serde_json::Result<IncomingMessage> {
use serde_json::Value;
use serde_json::from_str;
use serde_json::from_value;
let message = from_str::<Message<Value>>(&message)?;
from_value::<IncomingMessage>(message.payload)
}
/// Deal with incoming text message from the peer.
///
/// The message must conform either to the `Response` or to the
/// `Notification` JSON-serialized format. Otherwise, an error is raised.
pub fn process_incoming_message(&mut self, message:String) {
match self.decode_incoming_message(message) {
pub fn process_incoming_message(&mut self, message:String)
where Notification: DeserializeOwned {
match messages::decode_incoming_message(message) {
Ok(messages::IncomingMessage::Response(response)) =>
self.process_response(response),
Ok(messages::IncomingMessage::Notification(notification)) =>
@ -289,11 +260,14 @@ impl Handler {
/// With with a handling error. Uses `on_error` callback to notify the
/// owner.
pub fn error_occurred(&mut self, error: HandlingError) {
self.on_error.try_call(error);
self.send_event(Event::Error(error))
}
/// Processes a single transport event.
pub fn process_event(&mut self, event:TransportEvent) {
///
/// Each event either completes a requests or is translated into `Event`.
pub fn process_event(&mut self, event:TransportEvent)
where Notification: DeserializeOwned {
match event {
TransportEvent::TextMessage(msg) =>
self.process_incoming_message(msg),
@ -301,6 +275,7 @@ impl Handler {
// Dropping all ongoing calls will mark their futures as
// cancelled.
self.ongoing_calls.clear();
self.send_event(Event::Closed);
}
}
}
@ -311,7 +286,8 @@ impl Handler {
/// This will decode the incoming messages, providing input to the futures
/// returned from RPC calls.
/// Also this cancels any ongoing calls if the connection was lost.
pub fn process_events(&mut self) {
pub fn process_events(&mut self)
where Notification: DeserializeOwned {
loop {
match self.incoming_events.try_recv() {
Ok(event) => self.process_event(event),
@ -322,19 +298,32 @@ impl Handler {
}
}
/// Decode expected notification type from JSON.
/// Sends a handler event to the event stream.
pub fn send_event(&mut self, event:Event<Notification>) {
if let Some(tx) = self.outgoing_events.as_mut() {
match tx.unbounded_send(event) {
Ok(()) => {},
Err(e) =>
if e.is_full() {
// Impossible, as per `futures` library docs.
panic!("unbounded channel should never be full")
} else if e.is_disconnected() {
// It is ok for receiver to disconnect and ignore events.
} else {
// Never happens unless `futures` library changes API.
panic!("unknown unexpected error")
}
}
}
}
/// Creates a new stream with events from this handler.
///
/// Returns Some on success and None on failure. Addittionaly, a handling
/// error is raised in case of failure.
pub fn decode_notification<N:DeserializeOwned>
(&mut self, json:serde_json::Value) -> Option<N> {
match serde_json::from_value(json) {
Ok(ret) => ret,
Err(e) => {
let err = HandlingError::InvalidNotification(e);
self.error_occurred(err);
None
}
}
/// If such stream was already existing, it will be finished (and
/// continuations should be able to process any remaining events).
pub fn events(&mut self) -> impl Stream<Item = Event<Notification>> {
let (tx,rx) = unbounded();
self.outgoing_events = Some(tx);
rx
}
}

View File

@ -15,8 +15,12 @@ pub mod api;
pub mod error;
pub mod handler;
pub mod messages;
pub mod test_util;
pub mod transport;
pub use api::RemoteMethodCall;
pub use api::Result;
pub use transport::Transport;
pub use transport::TransportEvent;
pub use handler::Event;
pub use handler::Handler;

View File

@ -186,6 +186,19 @@ pub enum IncomingMessage {
Notification(Notification<serde_json::Value>),
}
/// Partially decodes incoming message.
///
/// This checks if has `jsonrpc` version string, and whether it is a
/// response or a notification.
pub fn decode_incoming_message
(message:String) -> serde_json::Result<IncomingMessage> {
use serde_json::Value;
use serde_json::from_str;
use serde_json::from_value;
let message = from_str::<Message<Value>>(&message)?;
from_value::<IncomingMessage>(message.payload)
}
/// Message from server to client.
///
/// `In` is any serializable (or already serialized) representation of the

View File

@ -0,0 +1,5 @@
//! Test utilities. Should not be used in production code.
//!
//! Reusable code for other crates that want to test usage of this crate.
pub mod transport;

View File

@ -0,0 +1,151 @@
//! Module provides a `MockTransport` that implements `Transport`.
//!
//! It is meant to be used in tests.
use prelude::*;
use crate::transport::Transport;
use crate::transport::TransportEvent;
use std::collections::VecDeque;
use failure::Error;
use serde::de::DeserializeOwned;
use serde::Serialize;
// ====================
// === SendingError ===
// ====================
/// Errors emitted by the `MockTransport`.
#[derive(Clone,Copy,Debug,Fail)]
pub enum SendError {
/// Cannot send message while the connection is closed.
#[fail(display = "Cannot send message when socket is closed.")]
TransportClosed,
}
// ========================
// === Transport Status ===
// ========================
/// Status of the `MockTransport`.
#[derive(Clone,Copy,Debug)]
pub enum Status {
/// Transport is functional, can send messages.
Open,
/// Transport is not functional at the moment, cannot send messages.
Closed
}
// ======================
// === Transport Data ===
// ======================
/// Mock transport shared data. Collects all the messages sent by the owner.
///
/// Allows mocking messages from the peer.
#[derive(Debug,Default)]
pub struct MockTransportData {
/// Events sink.
pub event_tx : Option<std::sync::mpsc::Sender<TransportEvent>>,
/// Messages sent by the user.
pub sent_msgs : VecDeque<String>,
/// Transport status.
pub is_closed : bool,
}
// ======================
// === Mock Transport ===
// ======================
/// Shareable wrapper over `MockTransportData`.
#[derive(Clone,Debug,Default)]
pub struct MockTransport(Rc<RefCell<MockTransportData>>);
impl Transport for MockTransport {
fn send_text(&mut self, text:String) -> Result<(), Error> {
self.with_mut_data(|data| {
if data.is_closed {
Err(SendError::TransportClosed)?
} else {
data.sent_msgs.push_back(text.clone());
Ok(())
}
})
}
fn set_event_tx(&mut self, tx:std::sync::mpsc::Sender<TransportEvent>) {
self.with_mut_data(|data| {
data.event_tx = Some(tx);
})
}
}
impl MockTransport {
/// Create a new `MockTransport`.
pub fn new() -> MockTransport {
MockTransport::default()
}
/// Executes given function with access to borrowed mutable data reference.
pub fn with_mut_data<R,F>(&mut self, f:F) -> R
where F: FnOnce(&mut MockTransportData) -> R {
let mut data = self.0.borrow_mut();
f(&mut data)
}
/// Generates event that mocks receiving a text message from a peer.
pub fn mock_peer_message_text<S:Into<String>>(&mut self, message:S) {
let message = message.into();
if let Some(ref tx) = self.0.borrow_mut().event_tx {
let _ = tx.send(TransportEvent::TextMessage(message));
}
}
/// Generates event that mocks receiving a text message from a peer with
/// serialized JSON contents.
pub fn mock_peer_message<T:Serialize>(&mut self, message:T) {
let text = serde_json::to_string(&message);
let text = text.expect("failed to serialize mock message");
self.mock_peer_message_text(text)
}
/// Mocks event generated when peer closes the socket (or connection is lost
/// for any other reason).
pub fn mock_connection_closed(&mut self) {
self.with_mut_data(|data| {
if let Some(ref tx) = data.event_tx {
data.is_closed = true;
let _ = tx.send(TransportEvent::Closed);
}
})
}
/// Takes the message sent by the client and returns its texts.
///
/// If the client did not sent any messages, panics.
/// If the client sent multiple messages, the first one is returned.
/// Further messages can be obtained by subsequent calls.
pub fn expect_message_text(&mut self) -> String {
self.with_mut_data(|data| {
data.sent_msgs.pop_front().expect("client should have sent request")
})
}
/// Similar to `expect_message_text` but deserializes the message into
/// given type `T` from JSON.
pub fn expect_message<T:DeserializeOwned>(&mut self) -> T {
let text = self.expect_message_text();
let res = serde_json::from_str(&text);
res.expect("failed to deserialize client's message")
}
}

View File

@ -0,0 +1,3 @@
//! Test utilities for the Transport.
pub mod mock;

View File

@ -19,15 +19,6 @@ pub trait Transport : Debug {
fn set_event_tx(&mut self, tx:std::sync::mpsc::Sender<TransportEvent>);
}
impl<T: Transport> Transport for Rc<RefCell<T>> {
fn send_text(&mut self, message:String) -> Result<(), Error> {
self.borrow_mut().send_text(message)
}
fn set_event_tx(&mut self, tx:std::sync::mpsc::Sender<TransportEvent>) {
self.borrow_mut().set_event_tx(tx)
}
}
/// An event generated by the `Transport`.
#[derive(Debug)]
pub enum TransportEvent {

View File

@ -1,5 +1,7 @@
use prelude::*;
use futures::FutureExt;
use futures::Stream;
use json_rpc::*;
use json_rpc::api::RemoteMethodCall;
use json_rpc::api::Result;
@ -7,20 +9,17 @@ use json_rpc::error::RpcError;
use json_rpc::error::HandlingError;
use json_rpc::messages::Id;
use json_rpc::messages::Message;
use json_rpc::messages::Notification;
use json_rpc::messages::Version;
use json_rpc::transport::TransportEvent;
use failure::Error;
use futures::FutureExt;
use futures::task::Context;
use serde::de::DeserializeOwned;
use serde::Serialize;
use json_rpc::test_util::transport::mock::MockTransport;
use serde::Deserialize;
use serde::Serialize;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use std::sync::mpsc::TryRecvError;
use utils::poll_future_output;
use utils::poll_stream_output;
type MockEvent = json_rpc::handler::Event<MockNotification>;
@ -66,126 +65,23 @@ type MockResponseMessage = messages::ResponseMessage<MockResponse>;
// ======================
// === Mock Transport ===
// ======================
#[derive(Debug, Fail)]
enum MockError {
#[fail(display = "Cannot send message when socket is closed.")]
TransportClosed,
}
#[derive(Debug)]
struct MockTransport {
pub event_tx : Option<std::sync::mpsc::Sender<TransportEvent>>,
pub sent_msgs : Vec<String>,
pub is_closed : bool,
}
impl Transport for MockTransport {
fn set_event_tx(&mut self, tx:std::sync::mpsc::Sender<TransportEvent>) {
self.event_tx = Some(tx);
}
fn send_text(&mut self, text:String) -> std::result::Result<(), Error> {
if self.is_closed {
Err(MockError::TransportClosed)?
} else {
self.sent_msgs.push(text.clone());
Ok(())
}
}
}
impl MockTransport {
pub fn new() -> MockTransport {
MockTransport {
event_tx : None,
sent_msgs : Vec::new(),
is_closed : false,
}
}
pub fn mock_peer_message_text<S:Into<String>>(&mut self, message:S) {
let message = message.into();
if let Some(ref tx) = self.event_tx {
let _ = tx.send(TransportEvent::TextMessage(message));
}
}
pub fn mock_peer_message<T:Serialize>(&mut self, message:T) {
let text = serde_json::to_string(&message).expect("failed to serialize");
self.mock_peer_message_text(text)
}
pub fn mock_connection_closed(&mut self) {
if let Some(ref tx) = self.event_tx {
self.is_closed = true;
let _ = tx.send(TransportEvent::Closed);
}
}
pub fn expect_message_text(&mut self) -> String {
self.sent_msgs.pop().expect("client should have sent request")
}
pub fn expect_message<T:DeserializeOwned>(&mut self) -> T {
let text = self.expect_message_text();
let res = serde_json::from_str(&text);
res.expect("failed to deserialize client's message")
}
}
// ================
// === Executor ===
// ================
/// Polls the future, performing any available work. If future is complete,
/// returns result. Otherwise, returns control when stalled.
fn poll_for_output<F : Future>(f:&mut Pin<Box<F>>) -> Option<F::Output> {
let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
match f.as_mut().poll(&mut ctx) {
Poll::Ready(result) => Some(result),
Poll::Pending => None,
}
}
// ===================
// === Mock Client ===
// ===================
#[derive(Debug)]
pub struct Client {
pub handler :Handler,
pub errors :Rc<RefCell<Vec<HandlingError>>>,
pub notifications:Rc<RefCell<Vec<Notification<serde_json::Value>>>>,
pub handler : Handler<MockNotification>,
pub events_stream : Pin<Box<dyn Stream<Item = MockEvent>>>,
}
impl Client {
pub fn new(transport:impl Transport + 'static) -> Client {
let mut handler = Handler::new(transport);
let errors = Rc::new(RefCell::new(Vec::new()));
let notifications = Rc::new(RefCell::new(Vec::new()));
let errors2 = errors.clone();
handler.on_error.set(move |e| {
errors2.borrow_mut().push(e);
});
let notifications2 = notifications.clone();
handler.on_notification.set(move |e| {
notifications2.borrow_mut().push(e);
});
let events_stream = Box::pin(handler.events());
Client {
handler,
errors,
notifications,
events_stream,
}
}
@ -198,18 +94,30 @@ impl Client {
self.handler.process_events()
}
pub fn try_get_event(&mut self) -> Option<MockEvent> {
poll_stream_output(&mut self.events_stream)
}
pub fn try_get_notification(&mut self) -> Option<MockNotification> {
let n = self.notifications.borrow_mut().pop()?;
self.handler.decode_notification::<MockNotification>(n.0)
let event = self.try_get_event()?;
if let MockEvent::Notification(n) = event {
Some(n)
} else {
None
}
}
pub fn expect_notification(&mut self) -> MockNotification {
self.try_get_notification().unwrap()
self.try_get_notification().expect("expected notification event")
}
pub fn expect_handling_error(&mut self) -> HandlingError {
let handling_error = self.errors.borrow_mut().pop();
handling_error.expect("there should be an error")
let event = self.try_get_event().expect("no events, while expected error event");
if let json_rpc::handler::Event::Error(err) = event {
err
} else {
panic!("expected error event, encountered: {:?}", event)
}
}
}
@ -219,44 +127,44 @@ impl Client {
// === Test ===
// ============
fn setup() -> (Rc<RefCell<MockTransport>>, Client) {
let ws = Rc::new(RefCell::new(MockTransport::new()));
let fm = Client::new(ws.clone());
(ws,fm)
fn setup() -> (MockTransport, Client) {
let transport = MockTransport::new();
let client = Client::new(transport.clone());
(transport,client)
}
#[test]
fn test_success_call() {
let (ws, mut fm) = setup();
let (mut transport, mut client) = setup();
let call_input = 8;
let mut fut = Box::pin(fm.pow(8));
let mut fut = Box::pin(client.pow(8));
let expected_first_request_id = Id(0);
// validate request sent
let req_msg = ws.borrow_mut().expect_message::<MockRequestMessage>();
let req_msg = transport.expect_message::<MockRequestMessage>();
assert_eq!(req_msg.id, expected_first_request_id);
assert_eq!(req_msg.method, MockRequest::NAME);
assert_eq!(req_msg.i, call_input);
assert_eq!(req_msg.jsonrpc, Version::V2);
assert!(poll_for_output(&mut fut).is_none()); // no reply
assert!(poll_future_output(&mut fut).is_none()); // no reply
// let's reply
let reply = pow_impl(req_msg);
ws.borrow_mut().mock_peer_message(reply);
transport.mock_peer_message(reply);
// before tick message should be in buffer and callbacks should not
// complete
assert!(poll_for_output(&mut fut).is_none()); // not ticked
assert!(poll_future_output(&mut fut).is_none()); // not ticked
// now tick
fm.process_events();
if let Err(TryRecvError::Empty) = fm.handler.incoming_events.try_recv() {
client.process_events();
if let Err(TryRecvError::Empty) = client.handler.incoming_events.try_recv() {
// ok
} else {
panic!("All messages from the buffer should be already processed");
}
let result = poll_for_output(&mut fut);
let result = poll_future_output(&mut fut);
let result = result.expect("result should be present");
let result = result.expect("result should be a success");
assert_eq!(result, 8*8);
@ -264,12 +172,12 @@ fn test_success_call() {
#[test]
fn test_error_call() {
let (ws, mut fm) = setup();
let mut fut = Box::pin(fm.pow(8));
assert!(poll_for_output(&mut fut).is_none()); // no reply
let (mut transport, mut client) = setup();
let mut fut = Box::pin(client.pow(8));
assert!(poll_future_output(&mut fut).is_none()); // no reply
// reply with error
let req_msg = ws.borrow_mut().expect_message::<MockRequestMessage>();
let req_msg = transport.expect_message::<MockRequestMessage>();
let error_code = 5;
let error_description = "wrong!";
let error_data = None;
@ -279,11 +187,11 @@ fn test_error_call() {
error_description.into(),
error_data.clone(),
);
ws.borrow_mut().mock_peer_message(error_msg);
transport.mock_peer_message(error_msg);
// receive error
fm.process_events();
let result = poll_for_output(&mut fut);
client.process_events();
let result = poll_future_output(&mut fut);
let result = result.expect("result should be present");
let result = result.expect_err("result should be a failure");
if let RpcError::RemoteError(e) = result {
@ -297,13 +205,13 @@ fn test_error_call() {
#[test]
fn test_garbage_reply_error() {
let (ws, mut fm) = setup();
let mut fut = Box::pin(fm.pow(8));
assert!(poll_for_output(&mut fut).is_none()); // no reply
ws.borrow_mut().mock_peer_message_text("hello, nice to meet you");
fm.process_events();
assert!(poll_for_output(&mut fut).is_none()); // no valid reply
let internal_error = fm.expect_handling_error();
let (mut transport, mut client) = setup();
let mut fut = Box::pin(client.pow(8));
assert!(poll_future_output(&mut fut).is_none()); // no reply
transport.mock_peer_message_text("hello, nice to meet you");
client.process_events();
assert!(poll_future_output(&mut fut).is_none()); // no valid reply
let internal_error = client.expect_handling_error();
if let HandlingError::InvalidMessage(_) = internal_error {
} else {
panic!("Expected an error to be InvalidMessage");
@ -312,13 +220,13 @@ fn test_garbage_reply_error() {
#[test]
fn test_disconnect_error() {
let (ws, mut fm) = setup();
let mut fut = Box::pin(fm.pow(8));
assert!(poll_for_output(&mut fut).is_none()); // no reply
ws.borrow_mut().mock_connection_closed();
assert!(poll_for_output(&mut fut).is_none()); // no reply
fm.process_events();
let result = poll_for_output(&mut fut);
let (mut transport, mut client) = setup();
let mut fut = Box::pin(client.pow(8));
assert!(poll_future_output(&mut fut).is_none()); // no reply
transport.mock_connection_closed();
assert!(poll_future_output(&mut fut).is_none()); // no reply
client.process_events();
let result = poll_future_output(&mut fut);
let result = result.expect("result should be present");
let result = result.expect_err("result should be a failure");
if let RpcError::LostConnection = result {} else {
@ -328,23 +236,23 @@ fn test_disconnect_error() {
#[test]
fn test_sending_while_disconnected() {
let (ws, mut fm) = setup();
ws.borrow_mut().mock_connection_closed();
let mut fut = Box::pin(fm.pow(8));
let result = poll_for_output(&mut fut).unwrap();
let (mut transport, mut client) = setup();
transport.mock_connection_closed();
let mut fut = Box::pin(client.pow(8));
let result = poll_future_output(&mut fut).unwrap();
assert!(result.is_err())
}
fn test_notification(mock_notif:MockNotification) {
let (ws, mut fm) = setup();
let (mut transport, mut client) = setup();
let message = Message::new(mock_notif.clone());
assert!(fm.notifications.borrow().is_empty());
ws.borrow_mut().mock_peer_message(message.clone());
assert!(fm.notifications.borrow().is_empty());
fm.process_events();
assert_eq!(fm.notifications.borrow().is_empty(), false);
let notif = fm.expect_notification();
assert_eq!(notif, mock_notif);
assert!(client.try_get_notification().is_none());
transport.mock_peer_message(message.clone());
assert!(client.try_get_notification().is_none());
client.process_events();
let notification = client.try_get_notification();
assert_eq!(notification.is_none(), false);
assert_eq!(notification, Some(mock_notif));
}
#[test]
@ -364,18 +272,14 @@ fn test_handling_invalid_notification() {
"params": [1,2,3,4,5]
}"#;
let (ws, mut fm) = setup();
assert!(fm.notifications.borrow().is_empty());
ws.borrow_mut().mock_peer_message_text(other_notification);
assert!(fm.notifications.borrow().is_empty());
fm.process_events();
assert_eq!(fm.notifications.borrow().is_empty(), false);
assert_eq!(fm.try_get_notification(), None);
let internal_error = fm.expect_handling_error();
let (mut transport, mut client) = setup();
assert!(client.try_get_notification().is_none());
transport.mock_peer_message_text(other_notification);
assert!(client.try_get_notification().is_none());
client.process_events();
let internal_error = client.expect_handling_error();
if let HandlingError::InvalidNotification(_) = internal_error {}
else {
panic!("expected InvalidNotification error");
}
}

View File

@ -41,6 +41,7 @@ where G: Generator<Return = ()> + Unpin {
pub struct EmptyIterator<T>(PhantomData<T>);
impl<T> EmptyIterator<T> {
/// Create a new empty iterator.
pub fn new() -> Self {
Self(PhantomData)
}

View File

@ -6,6 +6,7 @@ extern crate proc_macro;
mod derive_iterator;
#[allow(unused)]
use prelude::*;
use crate::derive_iterator::IsMut;

View File

@ -0,0 +1,12 @@
[package]
name = "utils"
version = "0.1.0"
authors = ["Enso Team <contact@luna-lang.org>"]
edition = "2018"
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies]
futures = "0.3.1"
prelude = { version = "0.1.0" , path = "../prelude" }

View File

@ -0,0 +1,52 @@
//! General purpose functions to be reused between components, not belonging to
//! any other crate and yet not worth of being split into their own creates.
#![warn(missing_docs)]
#![warn(trivial_casts)]
#![warn(trivial_numeric_casts)]
#![warn(unused_import_braces)]
#![warn(unused_qualifications)]
#![warn(unsafe_code)]
#![warn(missing_copy_implementations)]
#![warn(missing_debug_implementations)]
#[allow(unused)]
use prelude::*;
use futures::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
// ======================
// === Task Execution ===
// ======================
/// Polls the future, performing any available work.
///
/// If future is complete, returns result. Otherwise, returns control when
/// stalled.
/// It is not legal to call this on future that already completed.
pub fn poll_future_output<F : Future>(f:&mut Pin<Box<F>>) -> Option<F::Output> {
let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
match f.as_mut().poll(&mut ctx) {
Poll::Ready(result) => Some(result),
Poll::Pending => None,
}
}
/// Polls the stream, performing any available work. If a new value is
/// ready, returns it.
///
/// Note that this API hides the difference between value not being available
/// yet and stream being finished.
pub fn poll_stream_output<S : Stream + ?Sized>(f:&mut Pin<Box<S>>) -> Option<S::Item> {
let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
match f.as_mut().poll_next(&mut ctx) {
Poll::Ready(result) => result,
Poll::Pending => None,
}
}