mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-23 16:43:24 +03:00
vfs: add file cleanup
This commit is contained in:
parent
da90cc49c7
commit
44664807b0
@ -10,13 +10,34 @@ use std::{
|
||||
io::Read,
|
||||
path::{Component, Path, PathBuf},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::{
|
||||
fs,
|
||||
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
|
||||
sync::Mutex,
|
||||
time::interval,
|
||||
};
|
||||
|
||||
// Constants for file cleanup
|
||||
const FILE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
|
||||
const FILE_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
|
||||
|
||||
/// The main VFS service function.
|
||||
///
|
||||
/// This function sets up the VFS, handles incoming requests, and manages file operations.
|
||||
/// It also implements a file cleanup mechanism to close idle files.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `our_node` - The identifier for the current node
|
||||
/// * `send_to_loop` - Sender for kernel messages
|
||||
/// * `send_to_terminal` - Sender for print messages
|
||||
/// * `recv_from_loop` - Receiver for incoming messages
|
||||
/// * `send_to_caps_oracle` - Sender for capability messages
|
||||
/// * `home_directory_path` - Path to the home directory
|
||||
///
|
||||
/// # Returns
|
||||
/// * `anyhow::Result<()>` - Should never return Ok, but will return fatal errors.
|
||||
pub async fn vfs(
|
||||
our_node: Arc<String>,
|
||||
send_to_loop: MessageSender,
|
||||
@ -27,14 +48,27 @@ pub async fn vfs(
|
||||
) -> anyhow::Result<()> {
|
||||
let vfs_path = format!("{home_directory_path}/vfs");
|
||||
|
||||
if let Err(e) = fs::create_dir_all(&vfs_path).await {
|
||||
panic!("failed creating vfs dir! {e:?}");
|
||||
}
|
||||
fs::create_dir_all(&vfs_path)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("failed creating vfs dir! {e:?}"))?;
|
||||
let vfs_path = Arc::new(fs::canonicalize(&vfs_path).await?);
|
||||
|
||||
let open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>> = Arc::new(DashMap::new());
|
||||
let open_files: Arc<DashMap<PathBuf, (Arc<Mutex<fs::File>>, Instant)>> =
|
||||
Arc::new(DashMap::new());
|
||||
|
||||
let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> = HashMap::new();
|
||||
let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
|
||||
HashMap::default();
|
||||
|
||||
// Start the file cleanup task
|
||||
let cleanup_open_files = open_files.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut interval = interval(FILE_CLEANUP_INTERVAL);
|
||||
loop {
|
||||
interval.tick().await;
|
||||
cleanup_open_files
|
||||
.retain(|_, (_, last_accessed)| last_accessed.elapsed() < FILE_IDLE_TIMEOUT);
|
||||
}
|
||||
});
|
||||
|
||||
while let Some(km) = recv_from_loop.recv().await {
|
||||
if *our_node != km.source.node {
|
||||
@ -60,7 +94,7 @@ pub async fn vfs(
|
||||
queue_lock.push_back(km);
|
||||
}
|
||||
|
||||
// clone Arcs
|
||||
// Clone Arcs for the new task
|
||||
let our_node = our_node.clone();
|
||||
let send_to_loop = send_to_loop.clone();
|
||||
let send_to_terminal = send_to_terminal.clone();
|
||||
@ -111,10 +145,24 @@ pub async fn vfs(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handles individual VFS requests.
|
||||
///
|
||||
/// This function processes various VFS actions such as file operations, directory listings, etc.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `our_node` - The identifier for the current node
|
||||
/// * `km` - The incoming kernel message
|
||||
/// * `open_files` - A map of currently open files
|
||||
/// * `send_to_loop` - Sender for kernel messages
|
||||
/// * `send_to_caps_oracle` - Sender for capability messages
|
||||
/// * `vfs_path` - The base path for the VFS
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<(), VfsError>` - Result indicating success or a VFS-specific error
|
||||
async fn handle_request(
|
||||
our_node: &str,
|
||||
km: KernelMessage,
|
||||
open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>>,
|
||||
open_files: Arc<DashMap<PathBuf, (Arc<Mutex<fs::File>>, Instant)>>,
|
||||
send_to_loop: &MessageSender,
|
||||
send_to_caps_oracle: &CapMessageSender,
|
||||
vfs_path: &PathBuf,
|
||||
@ -131,14 +179,9 @@ async fn handle_request(
|
||||
});
|
||||
};
|
||||
|
||||
let request: VfsRequest = match serde_json::from_slice(&body) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
return Err(VfsError::BadJson {
|
||||
error: e.to_string(),
|
||||
});
|
||||
}
|
||||
};
|
||||
let request: VfsRequest = serde_json::from_slice(&body).map_err(|e| VfsError::BadJson {
|
||||
error: e.to_string(),
|
||||
})?;
|
||||
|
||||
// special case for root reading list of all drives.
|
||||
if request.action == VfsAction::ReadDir && request.path == "/" {
|
||||
@ -191,9 +234,9 @@ async fn handle_request(
|
||||
let (package_id, drive, rest) = parse_package_and_drive(&request.path, &vfs_path).await?;
|
||||
let drive = format!("/{package_id}/{drive}");
|
||||
let action = request.action;
|
||||
let path = PathBuf::from(request.path);
|
||||
let path = PathBuf::from(&request.path);
|
||||
|
||||
if &km.source.process != &*KERNEL_PROCESS_ID {
|
||||
if km.source.process != *KERNEL_PROCESS_ID {
|
||||
check_caps(
|
||||
our_node,
|
||||
&km.source,
|
||||
@ -202,7 +245,7 @@ async fn handle_request(
|
||||
&path,
|
||||
&drive,
|
||||
&package_id,
|
||||
&vfs_path,
|
||||
vfs_path,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@ -212,27 +255,26 @@ async fn handle_request(
|
||||
|
||||
let (response_body, bytes) = match action {
|
||||
VfsAction::CreateDrive => {
|
||||
let drive_path = join_paths_safely(&vfs_path, &drive);
|
||||
let drive_path = join_paths_safely(vfs_path, &drive);
|
||||
fs::create_dir_all(drive_path).await?;
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::CreateDir => {
|
||||
fs::create_dir(path).await?;
|
||||
fs::create_dir(&path).await?;
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::CreateDirAll => {
|
||||
fs::create_dir_all(path).await?;
|
||||
fs::create_dir_all(&path).await?;
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::CreateFile => {
|
||||
// create truncates any file that might've existed before
|
||||
open_files.remove(&path);
|
||||
let _file = open_file(open_files, path, true, true).await?;
|
||||
let _file = open_file(open_files, &path, true, true).await?;
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::OpenFile { create } => {
|
||||
// open file opens an existing file, or creates a new one if create is true
|
||||
let file = open_file(open_files, path, create, false).await?;
|
||||
let file = open_file(open_files, &path, create, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
(VfsResponse::Ok, None)
|
||||
@ -249,7 +291,7 @@ async fn handle_request(
|
||||
error: "blob needs to exist for WriteAll".into(),
|
||||
});
|
||||
};
|
||||
let file = open_file(open_files, path, false, false).await?;
|
||||
let file = open_file(open_files, &path, false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
file.write_all(&blob.bytes).await?;
|
||||
(VfsResponse::Ok, None)
|
||||
@ -260,7 +302,7 @@ async fn handle_request(
|
||||
error: "blob needs to exist for Write".into(),
|
||||
});
|
||||
};
|
||||
fs::write(path, &blob.bytes).await?;
|
||||
fs::write(&path, &blob.bytes).await?;
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::Append => {
|
||||
@ -269,14 +311,14 @@ async fn handle_request(
|
||||
error: "blob needs to exist for Append".into(),
|
||||
});
|
||||
};
|
||||
let file = open_file(open_files, path, false, false).await?;
|
||||
let file = open_file(open_files, &path, false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
file.seek(SeekFrom::End(0)).await?;
|
||||
file.write_all(&blob.bytes).await?;
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::SyncAll => {
|
||||
let file = open_file(open_files, path, false, false).await?;
|
||||
let file = open_file(open_files, &path, false, false).await?;
|
||||
let file = file.lock().await;
|
||||
file.sync_all().await?;
|
||||
(VfsResponse::Ok, None)
|
||||
@ -286,25 +328,25 @@ async fn handle_request(
|
||||
(VfsResponse::Read, Some(contents))
|
||||
}
|
||||
VfsAction::ReadToEnd => {
|
||||
let file = open_file(open_files, path.clone(), false, false).await?;
|
||||
let file = open_file(open_files, &path, false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
let mut contents = Vec::new();
|
||||
file.read_to_end(&mut contents).await?;
|
||||
(VfsResponse::Read, Some(contents))
|
||||
}
|
||||
VfsAction::ReadExact(length) => {
|
||||
let file = open_file(open_files, path, false, false).await?;
|
||||
let file = open_file(open_files, &path, false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
let mut contents = vec![0; length as usize];
|
||||
file.read_exact(&mut contents).await?;
|
||||
(VfsResponse::Read, Some(contents))
|
||||
}
|
||||
VfsAction::ReadDir => {
|
||||
let mut dir = fs::read_dir(path).await?;
|
||||
let mut dir = fs::read_dir(&path).await?;
|
||||
let mut entries = Vec::new();
|
||||
while let Some(entry) = dir.next_entry().await? {
|
||||
let entry_path = entry.path();
|
||||
let relative_path = entry_path.strip_prefix(&vfs_path).unwrap_or(&entry_path);
|
||||
let relative_path = entry_path.strip_prefix(vfs_path).unwrap_or(&entry_path);
|
||||
|
||||
let metadata = entry.metadata().await?;
|
||||
let file_type = get_file_type(&metadata);
|
||||
@ -317,14 +359,14 @@ async fn handle_request(
|
||||
(VfsResponse::ReadDir(entries), None)
|
||||
}
|
||||
VfsAction::ReadToString => {
|
||||
let file = open_file(open_files, path, false, false).await?;
|
||||
let file = open_file(open_files, &path, false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
let mut contents = String::new();
|
||||
file.read_to_string(&mut contents).await?;
|
||||
(VfsResponse::ReadToString(contents), None)
|
||||
}
|
||||
VfsAction::Seek { seek_from } => {
|
||||
let file = open_file(open_files, path, false, false).await?;
|
||||
let file = open_file(open_files, &path, false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
let seek_from = match seek_from {
|
||||
lib::types::core::SeekFrom::Start(offset) => std::io::SeekFrom::Start(offset),
|
||||
@ -340,21 +382,21 @@ async fn handle_request(
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::RemoveDir => {
|
||||
fs::remove_dir(path).await?;
|
||||
fs::remove_dir(&path).await?;
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::RemoveDirAll => {
|
||||
fs::remove_dir_all(path).await?;
|
||||
fs::remove_dir_all(&path).await?;
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::Rename { new_path } => {
|
||||
let new_path = join_paths_safely(&vfs_path, &new_path);
|
||||
fs::rename(path, new_path).await?;
|
||||
let new_path = join_paths_safely(vfs_path, &new_path);
|
||||
fs::rename(&path, new_path).await?;
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::CopyFile { new_path } => {
|
||||
let new_path = join_paths_safely(&vfs_path, &new_path);
|
||||
fs::copy(path, new_path).await?;
|
||||
let new_path = join_paths_safely(vfs_path, &new_path);
|
||||
fs::copy(&path, new_path).await?;
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::Metadata => {
|
||||
@ -367,7 +409,7 @@ async fn handle_request(
|
||||
(VfsResponse::Metadata(meta), None)
|
||||
}
|
||||
VfsAction::Len => {
|
||||
let file = open_file(open_files, path, false, false).await?;
|
||||
let file = open_file(open_files, &path, false, false).await?;
|
||||
let file = file.lock().await;
|
||||
let len = file.metadata().await?.len();
|
||||
(VfsResponse::Len(len), None)
|
||||
@ -527,14 +569,14 @@ async fn parse_package_and_drive(
|
||||
}
|
||||
|
||||
async fn open_file<P: AsRef<Path>>(
|
||||
open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>>,
|
||||
open_files: Arc<DashMap<PathBuf, (Arc<Mutex<fs::File>>, Instant)>>,
|
||||
path: P,
|
||||
create: bool,
|
||||
truncate: bool,
|
||||
) -> Result<Arc<Mutex<fs::File>>, VfsError> {
|
||||
let path = path.as_ref().to_path_buf();
|
||||
Ok(match open_files.get(&path) {
|
||||
Some(file) => file.value().clone(),
|
||||
Some(file) => file.value().0.clone(),
|
||||
None => {
|
||||
let file = Arc::new(Mutex::new(
|
||||
tokio::fs::OpenOptions::new()
|
||||
@ -549,7 +591,7 @@ async fn open_file<P: AsRef<Path>>(
|
||||
path: path.display().to_string(),
|
||||
})?,
|
||||
));
|
||||
open_files.insert(path, file.clone());
|
||||
open_files.insert(path, (file.clone(), Instant::now()));
|
||||
file
|
||||
}
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user