mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-21 07:31:34 +03:00
Format Rust code using rustfmt
This commit is contained in:
parent
035b394a6c
commit
4990e83dee
@ -550,7 +550,6 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
// gracefully abort all running processes in kernel
|
// gracefully abort all running processes in kernel
|
||||||
let _ = kernel_message_sender
|
let _ = kernel_message_sender
|
||||||
.send(KernelMessage {
|
.send(KernelMessage {
|
||||||
|
@ -1060,7 +1060,7 @@ pub enum VfsAction {
|
|||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub enum AddEntryType {
|
pub enum AddEntryType {
|
||||||
Dir,
|
Dir,
|
||||||
NewFile, // add a new file to fs and add name in vfs
|
NewFile, // add a new file to fs and add name in vfs
|
||||||
ZipArchive,
|
ZipArchive,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
110
src/vfs.rs
110
src/vfs.rs
@ -1,12 +1,12 @@
|
|||||||
|
use dashmap::DashMap;
|
||||||
use std::collections::{HashMap, VecDeque};
|
use std::collections::{HashMap, VecDeque};
|
||||||
use std::io::prelude::*;
|
use std::io::prelude::*;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::{Mutex, MutexGuard};
|
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::fs::OpenOptions;
|
use tokio::fs::OpenOptions;
|
||||||
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
|
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
|
||||||
use dashmap::DashMap;
|
use tokio::sync::{Mutex, MutexGuard};
|
||||||
|
|
||||||
use crate::types::*;
|
use crate::types::*;
|
||||||
|
|
||||||
@ -27,7 +27,7 @@ pub async fn vfs(
|
|||||||
let open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>> = Arc::new(DashMap::new());
|
let open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>> = Arc::new(DashMap::new());
|
||||||
let process_queues = Arc::new(Mutex::new(
|
let process_queues = Arc::new(Mutex::new(
|
||||||
HashMap::<ProcessId, VecDeque<KernelMessage>>::new(),
|
HashMap::<ProcessId, VecDeque<KernelMessage>>::new(),
|
||||||
));
|
));
|
||||||
// note: queues should be based on drive, not process
|
// note: queues should be based on drive, not process
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@ -40,7 +40,7 @@ pub async fn vfs(
|
|||||||
);
|
);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// clone Arcs for thread
|
// clone Arcs for thread
|
||||||
let our_node = our_node.clone();
|
let our_node = our_node.clone();
|
||||||
let send_to_caps_oracle = send_to_caps_oracle.clone();
|
let send_to_caps_oracle = send_to_caps_oracle.clone();
|
||||||
@ -48,9 +48,9 @@ pub async fn vfs(
|
|||||||
let send_to_loop = send_to_loop.clone();
|
let send_to_loop = send_to_loop.clone();
|
||||||
let open_files = open_files.clone();
|
let open_files = open_files.clone();
|
||||||
let vfs_path = vfs_path.clone();
|
let vfs_path = vfs_path.clone();
|
||||||
|
|
||||||
let mut process_lock = process_queues.lock().await;
|
let mut process_lock = process_queues.lock().await;
|
||||||
|
|
||||||
if let Some(queue) = process_lock.get_mut(&km.source.process) {
|
if let Some(queue) = process_lock.get_mut(&km.source.process) {
|
||||||
queue.push_back(km.clone());
|
queue.push_back(km.clone());
|
||||||
} else {
|
} else {
|
||||||
@ -58,10 +58,10 @@ pub async fn vfs(
|
|||||||
new_queue.push_back(km.clone());
|
new_queue.push_back(km.clone());
|
||||||
process_lock.insert(km.source.process.clone(), new_queue);
|
process_lock.insert(km.source.process.clone(), new_queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
// clone Arc for thread
|
// clone Arc for thread
|
||||||
let process_queues_clone = process_queues.clone();
|
let process_queues_clone = process_queues.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut process_lock = process_queues_clone.lock().await;
|
let mut process_lock = process_queues_clone.lock().await;
|
||||||
if let Some(km) = process_lock.get_mut(&km.source.process).and_then(|q| q.pop_front()) {
|
if let Some(km) = process_lock.get_mut(&km.source.process).and_then(|q| q.pop_front()) {
|
||||||
@ -126,8 +126,13 @@ async fn handle_request(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
check_caps(our_node.clone(), source.clone(), send_to_caps_oracle.clone(), &request)
|
check_caps(
|
||||||
.await?;
|
our_node.clone(),
|
||||||
|
source.clone(),
|
||||||
|
send_to_caps_oracle.clone(),
|
||||||
|
&request,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let (ipc, bytes) = match request.action {
|
let (ipc, bytes) = match request.action {
|
||||||
VfsAction::New => {
|
VfsAction::New => {
|
||||||
@ -140,13 +145,17 @@ async fn handle_request(
|
|||||||
} => {
|
} => {
|
||||||
match entry_type {
|
match entry_type {
|
||||||
AddEntryType::Dir => {
|
AddEntryType::Dir => {
|
||||||
let path = validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
let path =
|
||||||
|
validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone())
|
||||||
|
.await?;
|
||||||
|
|
||||||
// create dir.
|
// create dir.
|
||||||
fs::create_dir_all(path).await.unwrap();
|
fs::create_dir_all(path).await.unwrap();
|
||||||
}
|
}
|
||||||
AddEntryType::NewFile => {
|
AddEntryType::NewFile => {
|
||||||
let path = validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
let path =
|
||||||
|
validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone())
|
||||||
|
.await?;
|
||||||
// open and create file
|
// open and create file
|
||||||
let file = open_file(open_files.clone(), path).await?;
|
let file = open_file(open_files.clone(), path).await?;
|
||||||
let mut file = file.lock().await;
|
let mut file = file.lock().await;
|
||||||
@ -187,7 +196,8 @@ async fn handle_request(
|
|||||||
if is_file {
|
if is_file {
|
||||||
// create file
|
// create file
|
||||||
println!("writing a file!, orig filename {:?}", path);
|
println!("writing a file!, orig filename {:?}", path);
|
||||||
let path = validate_path(vfs_path.clone(), request.drive.clone(), path).await?;
|
let path = validate_path(vfs_path.clone(), request.drive.clone(), path)
|
||||||
|
.await?;
|
||||||
println!("with the path: {:?}", path);
|
println!("with the path: {:?}", path);
|
||||||
println!("and original path: {:?}", full_path);
|
println!("and original path: {:?}", full_path);
|
||||||
let file = open_file(open_files.clone(), path).await?;
|
let file = open_file(open_files.clone(), path).await?;
|
||||||
@ -195,9 +205,13 @@ async fn handle_request(
|
|||||||
let mut file = file.lock().await;
|
let mut file = file.lock().await;
|
||||||
file.write_all(&file_contents).await.unwrap();
|
file.write_all(&file_contents).await.unwrap();
|
||||||
println!("actually wrote file!");
|
println!("actually wrote file!");
|
||||||
|
|
||||||
} else if is_dir {
|
} else if is_dir {
|
||||||
let path = validate_path(vfs_path.clone(), request.drive.clone(), path.clone()).await?;
|
let path = validate_path(
|
||||||
|
vfs_path.clone(),
|
||||||
|
request.drive.clone(),
|
||||||
|
path.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
// If it's a directory, create it
|
// If it's a directory, create it
|
||||||
fs::create_dir_all(path).await.unwrap();
|
fs::create_dir_all(path).await.unwrap();
|
||||||
@ -211,7 +225,8 @@ async fn handle_request(
|
|||||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||||
}
|
}
|
||||||
VfsAction::Delete(mut full_path) => {
|
VfsAction::Delete(mut full_path) => {
|
||||||
let path = validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
let path =
|
||||||
|
validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
||||||
|
|
||||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||||
}
|
}
|
||||||
@ -219,7 +234,8 @@ async fn handle_request(
|
|||||||
mut full_path,
|
mut full_path,
|
||||||
offset,
|
offset,
|
||||||
} => {
|
} => {
|
||||||
let path = validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
let path =
|
||||||
|
validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
||||||
let file = open_file(open_files.clone(), path).await?;
|
let file = open_file(open_files.clone(), path).await?;
|
||||||
let mut file = file.lock().await;
|
let mut file = file.lock().await;
|
||||||
file.seek(SeekFrom::Start(offset)).await.unwrap();
|
file.seek(SeekFrom::Start(offset)).await.unwrap();
|
||||||
@ -227,7 +243,8 @@ async fn handle_request(
|
|||||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||||
}
|
}
|
||||||
VfsAction::Append(mut full_path) => {
|
VfsAction::Append(mut full_path) => {
|
||||||
let path = validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
let path =
|
||||||
|
validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
||||||
let file = open_file(open_files.clone(), path).await?;
|
let file = open_file(open_files.clone(), path).await?;
|
||||||
let mut file = file.lock().await;
|
let mut file = file.lock().await;
|
||||||
file.seek(SeekFrom::End(0)).await.unwrap();
|
file.seek(SeekFrom::End(0)).await.unwrap();
|
||||||
@ -239,7 +256,8 @@ async fn handle_request(
|
|||||||
mut full_path,
|
mut full_path,
|
||||||
size,
|
size,
|
||||||
} => {
|
} => {
|
||||||
let path = validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
let path =
|
||||||
|
validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
||||||
let file = open_file(open_files.clone(), path).await?;
|
let file = open_file(open_files.clone(), path).await?;
|
||||||
let mut file = file.lock().await;
|
let mut file = file.lock().await;
|
||||||
file.set_len(size).await.unwrap();
|
file.set_len(size).await.unwrap();
|
||||||
@ -248,18 +266,26 @@ async fn handle_request(
|
|||||||
}
|
}
|
||||||
VfsAction::GetEntry(mut full_path) => {
|
VfsAction::GetEntry(mut full_path) => {
|
||||||
println!("getting entry for path: {:?}", full_path);
|
println!("getting entry for path: {:?}", full_path);
|
||||||
let path = validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
let path =
|
||||||
|
validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
||||||
println!("getentry path resolved to: {:?}", path);
|
println!("getentry path resolved to: {:?}", path);
|
||||||
let metadata = fs::metadata(&path).await.unwrap();
|
let metadata = fs::metadata(&path).await.unwrap();
|
||||||
if metadata.is_dir() {
|
if metadata.is_dir() {
|
||||||
let mut children = Vec::new();
|
let mut children = Vec::new();
|
||||||
let mut entries = fs::read_dir(&path).await.unwrap();
|
let mut entries = fs::read_dir(&path).await.unwrap();
|
||||||
|
|
||||||
while let Some(entry) = entries.next_entry().await.unwrap() {
|
while let Some(entry) = entries.next_entry().await.unwrap() {
|
||||||
children.push(entry.path().display().to_string());
|
children.push(entry.path().display().to_string());
|
||||||
}
|
}
|
||||||
|
|
||||||
(serde_json::to_vec(&VfsResponse::GetEntry { is_file: false, children }).unwrap(), None)
|
(
|
||||||
|
serde_json::to_vec(&VfsResponse::GetEntry {
|
||||||
|
is_file: false,
|
||||||
|
children,
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
None,
|
||||||
|
)
|
||||||
} else if metadata.is_file() {
|
} else if metadata.is_file() {
|
||||||
println!("is file!");
|
println!("is file!");
|
||||||
let file = open_file(open_files.clone(), path).await?;
|
let file = open_file(open_files.clone(), path).await?;
|
||||||
@ -268,10 +294,16 @@ async fn handle_request(
|
|||||||
let mut contents = Vec::new();
|
let mut contents = Vec::new();
|
||||||
file.read_to_end(&mut contents).await.unwrap();
|
file.read_to_end(&mut contents).await.unwrap();
|
||||||
println!("read contents to last part");
|
println!("read contents to last part");
|
||||||
(serde_json::to_vec(&VfsResponse::GetEntry { is_file: true, children: Vec::new() }).unwrap(), Some(contents))
|
(
|
||||||
|
serde_json::to_vec(&VfsResponse::GetEntry {
|
||||||
|
is_file: true,
|
||||||
|
children: Vec::new(),
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
Some(contents),
|
||||||
|
)
|
||||||
} else {
|
} else {
|
||||||
return Err(VfsError::InternalError)
|
return Err(VfsError::InternalError);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
VfsAction::GetFileChunk {
|
VfsAction::GetFileChunk {
|
||||||
@ -279,31 +311,33 @@ async fn handle_request(
|
|||||||
offset,
|
offset,
|
||||||
length,
|
length,
|
||||||
} => {
|
} => {
|
||||||
let path = validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
let path =
|
||||||
|
validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
||||||
let file = open_file(open_files.clone(), path).await?;
|
let file = open_file(open_files.clone(), path).await?;
|
||||||
let mut file = file.lock().await;
|
let mut file = file.lock().await;
|
||||||
let mut contents = vec![0; length as usize];
|
let mut contents = vec![0; length as usize];
|
||||||
|
|
||||||
file.seek(SeekFrom::Start(offset)).await.unwrap();
|
file.seek(SeekFrom::Start(offset)).await.unwrap();
|
||||||
file.read_exact(&mut contents).await.unwrap();
|
file.read_exact(&mut contents).await.unwrap();
|
||||||
(
|
(
|
||||||
serde_json::to_vec(&VfsResponse::GetFileChunk).unwrap(),
|
serde_json::to_vec(&VfsResponse::GetFileChunk).unwrap(),
|
||||||
Some(contents),
|
Some(contents),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
VfsAction::GetEntryLength(mut full_path) => {
|
VfsAction::GetEntryLength(mut full_path) => {
|
||||||
let path = validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
let path =
|
||||||
|
validate_path(vfs_path.clone(), request.drive.clone(), full_path.clone()).await?;
|
||||||
let file = open_file(open_files.clone(), path).await?;
|
let file = open_file(open_files.clone(), path).await?;
|
||||||
let mut file = file.lock().await;
|
let mut file = file.lock().await;
|
||||||
|
|
||||||
let length = file.metadata().await.unwrap().len();
|
let length = file.metadata().await.unwrap().len();
|
||||||
|
|
||||||
(
|
(
|
||||||
serde_json::to_vec(&VfsResponse::GetEntryLength(Some(length))).unwrap(),
|
serde_json::to_vec(&VfsResponse::GetEntryLength(Some(length))).unwrap(),
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(target) = km.rsvp.or_else(|| {
|
if let Some(target) = km.rsvp.or_else(|| {
|
||||||
expects_response.map(|_| Address {
|
expects_response.map(|_| Address {
|
||||||
@ -352,7 +386,11 @@ async fn handle_request(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn validate_path(vfs_path: String, drive: String, request_path: String) -> Result<PathBuf, VfsError> {
|
async fn validate_path(
|
||||||
|
vfs_path: String,
|
||||||
|
drive: String,
|
||||||
|
request_path: String,
|
||||||
|
) -> Result<PathBuf, VfsError> {
|
||||||
let drive_base = Path::new(&vfs_path).join(&drive);
|
let drive_base = Path::new(&vfs_path).join(&drive);
|
||||||
if let Err(e) = fs::create_dir_all(&drive_base).await {
|
if let Err(e) = fs::create_dir_all(&drive_base).await {
|
||||||
println!("failed creating drive dir! {:?}", e);
|
println!("failed creating drive dir! {:?}", e);
|
||||||
@ -526,4 +564,4 @@ fn make_error_message(
|
|||||||
payload: None,
|
payload: None,
|
||||||
signed_capabilities: None,
|
signed_capabilities: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user