Merge branch 'develop' into hf/modularize-build

This commit is contained in:
hosted-fornet 2024-10-14 10:16:10 -07:00
commit 2f4104051d
29 changed files with 1511 additions and 471 deletions

7
Cargo.lock generated
View File

@ -3641,7 +3641,7 @@ dependencies = [
[[package]]
name = "kinode"
version = "0.9.5"
version = "0.9.7"
dependencies = [
"aes-gcm",
"alloy 0.2.1",
@ -3665,6 +3665,7 @@ dependencies = [
"jwt",
"lazy_static",
"lib",
"libc",
"nohash-hasher",
"open",
"public-ip",
@ -3697,7 +3698,7 @@ dependencies = [
[[package]]
name = "kinode_lib"
version = "0.9.5"
version = "0.9.7"
dependencies = [
"lib",
]
@ -3857,7 +3858,7 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67"
[[package]]
name = "lib"
version = "0.9.5"
version = "0.9.7"
dependencies = [
"alloy 0.2.1",
"kit 0.7.6",

View File

@ -1,7 +1,7 @@
[package]
name = "kinode_lib"
authors = ["KinodeDAO"]
version = "0.9.5"
version = "0.9.7"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"

View File

@ -1,7 +1,7 @@
[package]
name = "kinode"
authors = ["KinodeDAO"]
version = "0.9.5"
version = "0.9.7"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"
@ -57,6 +57,7 @@ indexmap = "2.4"
jwt = "0.16"
lib = { path = "../lib" }
lazy_static = "1.4.0"
libc = "0.2"
nohash-hasher = "0.2.0"
open = "5.1.4"
public-ip = "0.2.2"

View File

@ -184,6 +184,8 @@ interface downloads {
auto-update(auto-update-request),
/// Notify that a download is complete
download-complete(download-complete-request),
/// Auto-update-download complete
auto-download-complete(auto-download-complete-request),
/// Get files for a package
get-files(option<package-id>),
/// Remove a file
@ -243,6 +245,12 @@ interface downloads {
err: option<download-error>,
}
/// Request for an auto-download complete
record auto-download-complete-request {
download-info: download-complete-request,
manifest-hash: string,
}
/// Represents a hash mismatch error
record hash-mismatch {
desired: string,

View File

@ -474,7 +474,10 @@ fn serve_paths(
&our.node().to_string(),
) {
Ok(_) => {
println!("successfully installed package: {:?}", process_package_id);
println!(
"successfully installed {}:{}",
process_package_id.package_name, process_package_id.publisher_node
);
Ok((StatusCode::CREATED, None, vec![]))
}
Err(e) => Ok((

View File

@ -30,7 +30,7 @@
//! It delegates these responsibilities to the downloads and chain processes respectively.
//!
use crate::kinode::process::downloads::{
DownloadCompleteRequest, DownloadResponses, ProgressUpdate,
AutoDownloadCompleteRequest, DownloadCompleteRequest, DownloadResponses, ProgressUpdate,
};
use crate::kinode::process::main::{
ApisResponse, GetApiResponse, InstallPackageRequest, InstallResponse, LocalRequest,
@ -65,6 +65,7 @@ pub enum Req {
LocalRequest(LocalRequest),
Progress(ProgressUpdate),
DownloadComplete(DownloadCompleteRequest),
AutoDownloadComplete(AutoDownloadCompleteRequest),
Http(http::server::HttpServerRequest),
}
@ -161,6 +162,40 @@ fn handle_message(
},
);
}
Req::AutoDownloadComplete(req) => {
if !message.is_local(&our) {
return Err(anyhow::anyhow!(
"auto download complete from non-local node"
));
}
// auto_install case:
// the downloads process has given us the new package manifest's
// capability hashes, and the old package's capability hashes.
// we can use these to determine if the new package has the same
// capabilities as the old one, and if so, auto-install it.
let manifest_hash = req.manifest_hash;
let package_id = req.download_info.package_id;
let version_hash = req.download_info.version_hash;
if let Some(package) = state.packages.get(&package_id.clone().to_process_lib()) {
if package.manifest_hash == Some(manifest_hash) {
print_to_terminal(1, "auto_install:main, manifest_hash match");
if let Err(e) =
utils::install(&package_id, None, &version_hash, state, &our.node)
{
print_to_terminal(1, &format!("error auto_installing package: {e}"));
} else {
println!(
"auto_installed update for package: {:?}",
&package_id.to_process_lib()
);
}
} else {
print_to_terminal(1, "auto_install:main, manifest_hash do not match");
}
}
}
Req::DownloadComplete(req) => {
if !message.is_local(&our) {
return Err(anyhow::anyhow!("download complete from non-local node"));
@ -182,41 +217,6 @@ fn handle_message(
.unwrap(),
},
);
// auto_install case:
// the downloads process has given us the new package manifest's
// capability hashes, and the old package's capability hashes.
// we can use these to determine if the new package has the same
// capabilities as the old one, and if so, auto-install it.
if let Some(context) = message.context() {
let manifest_hash = String::from_utf8(context.to_vec())?;
if let Some(package) =
state.packages.get(&req.package_id.clone().to_process_lib())
{
if package.manifest_hash == Some(manifest_hash) {
print_to_terminal(1, "auto_install:main, manifest_hash match");
if let Err(e) = utils::install(
&req.package_id,
None,
&req.version_hash,
state,
&our.node,
) {
print_to_terminal(
1,
&format!("error auto_installing package: {e}"),
);
} else {
println!(
"auto_installed update for package: {:?}",
&req.package_id.to_process_lib()
);
}
} else {
print_to_terminal(1, "auto_install:main, manifest_hash do not match");
}
}
}
}
}
} else {
@ -261,8 +261,8 @@ fn handle_local_request(
match utils::install(&package_id, metadata, &version_hash, state, &our.node) {
Ok(()) => {
println!(
"successfully installed package: {:?}",
&package_id.to_process_lib()
"successfully installed {}:{}",
package_id.package_name, package_id.publisher_node
);
LocalResponse::InstallResponse(InstallResponse::Success)
}

View File

@ -42,9 +42,9 @@
//! mechanism is implemented in the FT worker for improved modularity and performance.
//!
use crate::kinode::process::downloads::{
AutoUpdateRequest, DirEntry, DownloadCompleteRequest, DownloadError, DownloadRequests,
DownloadResponses, Entry, FileEntry, HashMismatch, LocalDownloadRequest, RemoteDownloadRequest,
RemoveFileRequest,
AutoDownloadCompleteRequest, AutoUpdateRequest, DirEntry, DownloadCompleteRequest,
DownloadError, DownloadRequests, DownloadResponses, Entry, FileEntry, HashMismatch,
LocalDownloadRequest, RemoteDownloadRequest, RemoveFileRequest,
};
use std::{collections::HashSet, io::Read, str::FromStr};
@ -245,7 +245,7 @@ fn handle_message(
// if we have a pending auto_install, forward that context to the main process.
// it will check if the caps_hashes match (no change in capabilities), and auto_install if it does.
let context = if auto_updates.remove(&(
let manifest_hash = if auto_updates.remove(&(
req.package_id.clone().to_process_lib(),
req.version_hash.clone(),
)) {
@ -253,7 +253,7 @@ fn handle_message(
req.package_id.clone().to_process_lib(),
req.version_hash.clone(),
) {
Ok(manifest_hash) => Some(manifest_hash.as_bytes().to_vec()),
Ok(manifest_hash) => Some(manifest_hash),
Err(e) => {
print_to_terminal(
1,
@ -267,13 +267,26 @@ fn handle_message(
};
// pushed to UI via websockets
let mut request = Request::to(("our", "main", "app_store", "sys"))
.body(serde_json::to_vec(&req)?);
Request::to(("our", "main", "app_store", "sys"))
.body(serde_json::to_vec(&req)?)
.send()?;
if let Some(ctx) = context {
request = request.context(ctx);
// trigger auto-update install trigger to main:app_store:sys
if let Some(manifest_hash) = manifest_hash {
let auto_download_complete_req = AutoDownloadCompleteRequest {
download_info: req.clone(),
manifest_hash,
};
print_to_terminal(
1,
&format!(
"auto_update download complete: triggering install on main:app_store:sys"
),
);
Request::to(("our", "main", "app_store", "sys"))
.body(serde_json::to_vec(&auto_download_complete_req)?)
.send()?;
}
request.send()?;
}
DownloadRequests::GetFiles(maybe_id) => {
// if not local, throw to the boonies.

View File

@ -6,7 +6,7 @@ interface PackageSelectorProps {
}
const PackageSelector: React.FC<PackageSelectorProps> = ({ onPackageSelect }) => {
const { installed } = useAppsStore();
const { installed, fetchInstalled } = useAppsStore();
const [selectedPackage, setSelectedPackage] = useState<string>("");
const [customPackage, setCustomPackage] = useState<string>("");
const [isCustomPackageSelected, setIsCustomPackageSelected] = useState(false);
@ -18,6 +18,10 @@ const PackageSelector: React.FC<PackageSelectorProps> = ({ onPackageSelect }) =>
}
}, [selectedPackage, onPackageSelect]);
useEffect(() => {
fetchInstalled();
}, []);
const handlePackageChange = (e: React.ChangeEvent<HTMLSelectElement>) => {
const value = e.target.value;
if (value === "custom") {

View File

@ -1,11 +1,10 @@
import React, { useState, useEffect, useCallback, useMemo } from "react";
import { useParams, useNavigate } from "react-router-dom";
import { useParams } from "react-router-dom";
import { FaDownload, FaSpinner, FaChevronDown, FaChevronUp, FaRocket, FaTrash, FaPlay } from "react-icons/fa";
import useAppsStore from "../store";
import { MirrorSelector } from '../components';
export default function DownloadPage() {
const navigate = useNavigate();
const { id } = useParams<{ id: string }>();
const {
listings,
@ -28,6 +27,9 @@ export default function DownloadPage() {
const [isMirrorOnline, setIsMirrorOnline] = useState<boolean | null>(null);
const [showCapApproval, setShowCapApproval] = useState(false);
const [manifest, setManifest] = useState<any>(null);
const [isInstalling, setIsInstalling] = useState(false);
const [isCheckingLaunch, setIsCheckingLaunch] = useState(false);
const [launchPath, setLaunchPath] = useState<string | null>(null);
const app = useMemo(() => listings[id || ""], [listings, id]);
const appDownloads = useMemo(() => downloads[id || ""] || [], [downloads, id]);
@ -101,6 +103,36 @@ export default function DownloadPage() {
return versionData ? installedApp.our_version_hash === versionData.hash : false;
}, [app, selectedVersion, installedApp, sortedVersions]);
const checkLaunchPath = useCallback(() => {
if (!app) return;
setIsCheckingLaunch(true);
const appId = `${app.package_id.package_name}:${app.package_id.publisher_node}`;
fetchHomepageApps().then(() => {
const path = getLaunchUrl(appId);
setLaunchPath(path);
setIsCheckingLaunch(false);
if (path) {
setIsInstalling(false);
}
});
}, [app, fetchHomepageApps, getLaunchUrl]);
useEffect(() => {
if (isInstalling) {
const checkInterval = setInterval(checkLaunchPath, 500);
const timeout = setTimeout(() => {
clearInterval(checkInterval);
setIsInstalling(false);
setIsCheckingLaunch(false);
}, 5000);
return () => {
clearInterval(checkInterval);
clearTimeout(timeout);
};
}
}, [isInstalling, checkLaunchPath]);
const handleDownload = useCallback(() => {
if (!id || !selectedMirror || !app || !selectedVersion) return;
const versionData = sortedVersions.find(v => v.version === selectedVersion);
@ -130,36 +162,87 @@ export default function DownloadPage() {
}
}, [id, app, appDownloads]);
const canDownload = useMemo(() => {
return selectedMirror && (isMirrorOnline === true || selectedMirror.startsWith('http')) && !isDownloading && !isDownloaded;
}, [selectedMirror, isMirrorOnline, isDownloading, isDownloaded]);
const confirmInstall = useCallback(() => {
if (!id || !selectedVersion) return;
const versionData = sortedVersions.find(v => v.version === selectedVersion);
if (versionData) {
setIsInstalling(true);
setLaunchPath(null);
installApp(id, versionData.hash).then(() => {
fetchData(id);
setShowCapApproval(false);
setManifest(null);
fetchData(id);
});
}
}, [id, selectedVersion, sortedVersions, installApp, fetchData]);
const handleLaunch = useCallback(() => {
if (app) {
const launchUrl = getLaunchUrl(`${app.package_id.package_name}:${app.package_id.publisher_node}`);
if (launchUrl) {
window.location.href = launchUrl;
}
if (launchPath) {
window.location.href = launchPath;
}
}, [app, getLaunchUrl]);
}, [launchPath]);
const canLaunch = useMemo(() => {
if (!app) return false;
return !!getLaunchUrl(`${app.package_id.package_name}:${app.package_id.publisher_node}`);
}, [app, getLaunchUrl]);
const canDownload = useMemo(() => {
return selectedMirror && (isMirrorOnline === true || selectedMirror.startsWith('http')) && !isDownloading && !isDownloaded;
}, [selectedMirror, isMirrorOnline, isDownloading, isDownloaded]);
const renderActionButton = () => {
if (isCurrentVersionInstalled || launchPath) {
return (
<button className="action-button installed-button" disabled>
<FaRocket /> Installed
</button>
);
}
if (isInstalling || isCheckingLaunch) {
return (
<button className="action-button installing-button" disabled>
<FaSpinner className="fa-spin" /> Installing...
</button>
);
}
if (isDownloaded) {
return (
<button
onClick={() => {
const versionData = sortedVersions.find(v => v.version === selectedVersion);
if (versionData) {
handleInstall(versionData.version, versionData.hash);
}
}}
className="action-button install-button"
>
<FaRocket /> Install
</button>
);
}
return (
<button
onClick={handleDownload}
disabled={!canDownload}
className="action-button download-button"
>
{isDownloading ? (
<>
<FaSpinner className="fa-spin" /> Downloading... {downloadProgress}%
</>
) : (
<>
<FaDownload /> Download
</>
)}
</button>
);
};
if (!app) {
return <div className="downloads-page"><h4>Loading app details...</h4></div>;
}
@ -176,15 +259,22 @@ export default function DownloadPage() {
<p className="app-id">{`${app.package_id.package_name}.${app.package_id.publisher_node}`}</p>
</div>
</div>
{installedApp && (
{launchPath ? (
<button
onClick={handleLaunch}
className="launch-button"
disabled={!canLaunch}
>
<FaPlay /> {canLaunch ? 'Launch' : 'No UI found for app'}
<FaPlay /> Launch
</button>
)}
) : isInstalling || isCheckingLaunch ? (
<button className="launch-button" disabled>
<FaSpinner className="fa-spin" /> Checking...
</button>
) : installedApp ? (
<button className="launch-button" disabled>
No UI found for app
</button>
) : null}
</div>
<p className="app-description">{app.metadata?.description}</p>
@ -207,39 +297,7 @@ export default function DownloadPage() {
onMirrorSelect={handleMirrorSelect}
/>
{isCurrentVersionInstalled ? (
<button className="action-button installed-button" disabled>
<FaRocket /> Installed
</button>
) : isDownloaded ? (
<button
onClick={() => {
const versionData = sortedVersions.find(v => v.version === selectedVersion);
if (versionData) {
handleInstall(versionData.version, versionData.hash);
}
}}
className="action-button install-button"
>
<FaRocket /> Install
</button>
) : (
<button
onClick={handleDownload}
disabled={!canDownload}
className="action-button download-button"
>
{isDownloading ? (
<>
<FaSpinner className="fa-spin" /> Downloading... {downloadProgress}%
</>
) : (
<>
<FaDownload /> Download
</>
)}
</button>
)}
{renderActionButton()}
</div>
<div className="my-downloads">

View File

@ -12,7 +12,7 @@ const NAME_INVALID = "Package name must contain only valid characters (a-z, 0-9,
export default function PublishPage() {
const { openConnectModal } = useConnectModal();
const { ourApps, fetchOurApps, installed, downloads } = useAppsStore();
const { ourApps, fetchOurApps, downloads } = useAppsStore();
const publicClient = usePublicClient();
const { address, isConnected, isConnecting } = useAccount();

View File

@ -218,12 +218,6 @@ const useAppsStore = create<AppsStore>()((set, get) => ({
});
if (res.status === HTTP_STATUS.CREATED) {
await get().fetchInstalled();
// hacky: a small delay (500ms) before fetching homepage apps
// to give the app time to add itself to the homepage
// might make sense to add more state and do retry logic instead.
await new Promise(resolve => setTimeout(resolve, 500));
await get().fetchHomepageApps();
}
} catch (error) {

View File

@ -5,12 +5,7 @@
"on_exit": "Restart",
"request_networking": true,
"request_capabilities": [
"net:distro:sys",
"filesystem:distro:sys",
"http_server:distro:sys",
"http_client:distro:sys",
"kernel:distro:sys",
"vfs:distro:sys",
"chess:chess:sys",
"eth:distro:sys",
{
"process": "eth:distro:sys",
@ -18,10 +13,16 @@
"root": true
}
},
"sqlite:distro:sys",
"kv:distro:sys",
"chess:chess:sys",
"fd_manager:distro:sys",
"filesystem:distro:sys",
"http_server:distro:sys",
"http_client:distro:sys",
"kernel:distro:sys",
"kns_indexer:kns_indexer:sys",
"kv:distro:sys",
"net:distro:sys",
"sqlite:distro:sys",
"vfs:distro:sys",
{
"process": "vfs:distro:sys",
"params": {
@ -30,6 +31,6 @@
}
],
"grant_capabilities": [],
"public": true
"public": false
}
]

358
kinode/src/fd_manager.rs Normal file
View File

@ -0,0 +1,358 @@
use lib::types::core::{
Address, FdManagerError, FdManagerRequest, FdManagerResponse, FdsLimit, KernelMessage, Message,
MessageReceiver, MessageSender, PrintSender, Printout, ProcessId, Request,
FD_MANAGER_PROCESS_ID,
};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
const DEFAULT_MAX_OPEN_FDS: u64 = 180;
const DEFAULT_FDS_AS_FRACTION_OF_ULIMIT_PERCENTAGE: u64 = 90;
const SYS_RESERVED_FDS: u64 = 30;
const DEFAULT_UPDATE_ULIMIT_SECS: u64 = 3600;
const _DEFAULT_CULL_FRACTION_DENOMINATOR: u64 = 2;
#[derive(Debug, Serialize, Deserialize)]
struct State {
fds_limits: HashMap<ProcessId, FdsLimit>,
mode: Mode,
max_fds: u64,
}
#[derive(Debug, Serialize, Deserialize)]
enum Mode {
/// don't update the max_fds except by user input
StaticMax,
/// check the system's ulimit periodically and update max_fds accordingly
DynamicMax {
max_fds_as_fraction_of_ulimit_percentage: u64,
update_ulimit_secs: u64,
},
}
impl State {
fn new(static_max_fds: Option<u64>) -> Self {
Self::default(static_max_fds)
}
fn default(static_max_fds: Option<u64>) -> Self {
Self {
fds_limits: HashMap::new(),
mode: Mode::default(static_max_fds),
max_fds: match static_max_fds {
Some(max) => max,
None => DEFAULT_MAX_OPEN_FDS,
},
}
}
fn update_max_fds_from_ulimit(&mut self, ulimit_max_fds: u64) {
let Mode::DynamicMax {
ref max_fds_as_fraction_of_ulimit_percentage,
..
} = self.mode
else {
return;
};
let min_ulimit = SYS_RESERVED_FDS + 10;
if ulimit_max_fds <= min_ulimit {
panic!(
"fatal: ulimit from system ({ulimit_max_fds}) is too small to operate Kinode. Please run Kinode with a larger ulimit (at least {min_ulimit}).",
);
}
self.max_fds =
ulimit_max_fds * max_fds_as_fraction_of_ulimit_percentage / 100 - SYS_RESERVED_FDS;
}
async fn update_all_fds_limits(&mut self, our_node: &str, send_to_loop: &MessageSender) {
let weights = self
.fds_limits
.values()
.map(|limit| limit.hit_count)
.sum::<u64>();
let statically_allocated = self.max_fds as f64 / 2.0;
let per_process_unweighted =
statically_allocated / std::cmp::max(self.fds_limits.len() as u64, 1) as f64;
let per_process_weighted = statically_allocated / std::cmp::max(weights, 1) as f64;
for limit in self.fds_limits.values_mut() {
limit.limit = (per_process_unweighted + per_process_weighted * limit.hit_count as f64)
.floor() as u64;
}
send_all_fds_limits(our_node, send_to_loop, self).await;
}
}
impl Mode {
fn default(static_max_fds: Option<u64>) -> Self {
match static_max_fds {
Some(_) => Self::StaticMax,
None => Self::DynamicMax {
max_fds_as_fraction_of_ulimit_percentage:
DEFAULT_FDS_AS_FRACTION_OF_ULIMIT_PERCENTAGE,
update_ulimit_secs: DEFAULT_UPDATE_ULIMIT_SECS,
},
}
}
}
/// The fd_manager entrypoint.
pub async fn fd_manager(
our_node: Arc<String>,
send_to_loop: MessageSender,
send_to_terminal: PrintSender,
mut recv_from_loop: MessageReceiver,
static_max_fds: Option<u64>,
) -> anyhow::Result<()> {
let mut state = State::new(static_max_fds);
let mut interval = {
// in code block to release the reference into state
let Mode::DynamicMax {
ref update_ulimit_secs,
..
} = state.mode
else {
return Ok(());
};
tokio::time::interval(tokio::time::Duration::from_secs(*update_ulimit_secs))
};
loop {
tokio::select! {
Some(message) = recv_from_loop.recv() => {
match handle_message(
&our_node,
message,
&mut interval,
&mut state,
&send_to_loop,
).await {
Ok(Some(to_print)) => {
Printout::new(2, to_print).send(&send_to_terminal).await;
}
Err(e) => {
Printout::new(1, &format!("handle_message error: {e:?}"))
.send(&send_to_terminal)
.await;
}
_ => {}
}
}
_ = interval.tick() => {
let old_max_fds = state.max_fds;
match update_max_fds(&mut state).await {
Ok(new) => {
if new != old_max_fds {
state.update_all_fds_limits(our_node.as_str(), &send_to_loop).await;
}
}
Err(e) => Printout::new(1, &format!("update_max_fds error: {e:?}"))
.send(&send_to_terminal)
.await,
}
}
}
}
}
async fn handle_message(
our_node: &str,
km: KernelMessage,
_interval: &mut tokio::time::Interval,
state: &mut State,
send_to_loop: &MessageSender,
) -> anyhow::Result<Option<String>> {
let Message::Request(Request {
body,
expects_response,
..
}) = km.message
else {
return Err(FdManagerError::NotARequest.into());
};
let request: FdManagerRequest =
serde_json::from_slice(&body).map_err(|_e| FdManagerError::BadRequest)?;
let return_value = match request {
FdManagerRequest::RequestFdsLimit => {
// divide max_fds by number of processes requesting fds limits,
// then send each process its new limit
// TODO can weight different processes differently
state.fds_limits.insert(
km.source.process,
FdsLimit {
limit: 0,
hit_count: 1, // starts with 1 to give initial weight
},
);
state.update_all_fds_limits(our_node, &send_to_loop).await;
None
}
FdManagerRequest::FdsLimitHit => {
// sender process hit its fd limit
// react to this by incrementing hit count and
// re-weighting all processes' limits
state.fds_limits.get_mut(&km.source.process).map(|limit| {
limit.hit_count += 1;
});
state.update_all_fds_limits(our_node, &send_to_loop).await;
Some(format!("{} hit its fd limit", km.source.process))
}
FdManagerRequest::FdsLimit(_) => {
// should only send this, never receive it
return Err(FdManagerError::FdManagerWasSentLimit.into());
}
FdManagerRequest::UpdateMaxFdsAsFractionOfUlimitPercentage(new) => {
match state.mode {
Mode::DynamicMax {
ref mut max_fds_as_fraction_of_ulimit_percentage,
..
} => *max_fds_as_fraction_of_ulimit_percentage = new,
_ => return Err(FdManagerError::BadRequest.into()),
}
None
}
FdManagerRequest::UpdateUpdateUlimitSecs(new) => {
match state.mode {
Mode::DynamicMax {
ref mut update_ulimit_secs,
..
} => *update_ulimit_secs = new,
_ => return Err(FdManagerError::BadRequest.into()),
}
None
}
FdManagerRequest::UpdateCullFractionDenominator(_new) => {
// state.cull_fraction_denominator = new;
None
}
FdManagerRequest::GetState => {
if expects_response.is_some() {
KernelMessage::builder()
.id(km.id)
.source(km.target)
.target(km.rsvp.unwrap_or(km.source))
.message(Message::Response((
lib::core::Response {
body: serde_json::to_vec(&FdManagerResponse::GetState(
state.fds_limits.clone(),
))
.unwrap(),
inherit: false,
metadata: None,
capabilities: vec![],
},
None,
)))
.build()
.unwrap()
.send(send_to_loop)
.await;
}
Some(format!("fd_manager: {:?}", state))
}
FdManagerRequest::GetProcessFdLimit(process) => {
if expects_response.is_some() {
KernelMessage::builder()
.id(km.id)
.source(km.target)
.target(km.rsvp.unwrap_or(km.source))
.message(Message::Response((
lib::core::Response {
body: serde_json::to_vec(&FdManagerResponse::GetProcessFdLimit(
state
.fds_limits
.get(&process)
.map(|limit| limit.limit)
.unwrap_or(0),
))
.unwrap(),
inherit: false,
metadata: None,
capabilities: vec![],
},
None,
)))
.build()
.unwrap()
.send(send_to_loop)
.await;
}
None
}
};
Ok(return_value)
}
async fn update_max_fds(state: &mut State) -> anyhow::Result<u64> {
let ulimit_max_fds = get_max_fd_limit()
.map_err(|_| anyhow::anyhow!("Couldn't update max fd limit: ulimit failed"))?;
state.update_max_fds_from_ulimit(ulimit_max_fds);
Ok(ulimit_max_fds)
}
async fn send_all_fds_limits(our_node: &str, send_to_loop: &MessageSender, state: &State) {
for (process_id, limit) in &state.fds_limits {
KernelMessage::builder()
.id(rand::random())
.source((our_node, FD_MANAGER_PROCESS_ID.clone()))
.target((our_node, process_id.clone()))
.message(Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&FdManagerRequest::FdsLimit(limit.limit)).unwrap(),
metadata: None,
capabilities: vec![],
}))
.build()
.unwrap()
.send(send_to_loop)
.await;
}
}
fn get_max_fd_limit() -> anyhow::Result<u64> {
let mut rlim = libc::rlimit {
rlim_cur: 0, // Current limit
rlim_max: 0, // Maximum limit value
};
// RLIMIT_NOFILE is the resource indicating the maximum file descriptor number.
if unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut rlim) } == 0 {
Ok(rlim.rlim_cur as u64)
} else {
Err(anyhow::anyhow!("Failed to get the resource limit."))
}
}
pub async fn send_fd_manager_request_fds_limit(our: &Address, send_to_loop: &MessageSender) {
let message = Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&FdManagerRequest::RequestFdsLimit).unwrap(),
metadata: None,
capabilities: vec![],
});
send_to_fd_manager(our, message, send_to_loop).await
}
pub async fn send_fd_manager_hit_fds_limit(our: &Address, send_to_loop: &MessageSender) {
let message = Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&FdManagerRequest::FdsLimitHit).unwrap(),
metadata: None,
capabilities: vec![],
});
send_to_fd_manager(our, message, send_to_loop).await
}
async fn send_to_fd_manager(our: &Address, message: Message, send_to_loop: &MessageSender) {
KernelMessage::builder()
.id(rand::random())
.source(our.clone())
.target((our.node.clone(), FD_MANAGER_PROCESS_ID.clone()))
.message(message)
.build()
.unwrap()
.send(send_to_loop)
.await
}

View File

@ -1,8 +1,10 @@
use crate::vfs::UniqueQueue;
use dashmap::DashMap;
use lib::types::core::{
Address, CapMessage, CapMessageSender, Capability, KernelMessage, KvAction, KvError, KvRequest,
KvResponse, LazyLoadBlob, Message, MessageReceiver, MessageSender, PackageId, PrintSender,
Printout, ProcessId, Request, Response, KV_PROCESS_ID,
Address, CapMessage, CapMessageSender, Capability, FdManagerRequest, KernelMessage, KvAction,
KvError, KvRequest, KvResponse, LazyLoadBlob, Message, MessageReceiver, MessageSender,
PackageId, PrintSender, Printout, ProcessId, Request, Response, FD_MANAGER_PROCESS_ID,
KV_PROCESS_ID,
};
use rocksdb::OptimisticTransactionDB;
use std::{
@ -11,6 +13,81 @@ use std::{
};
use tokio::{fs, sync::Mutex};
#[derive(Clone)]
struct KvState {
our: Arc<Address>,
kv_path: Arc<String>,
send_to_loop: MessageSender,
send_to_terminal: PrintSender,
open_kvs: Arc<DashMap<(PackageId, String), OptimisticTransactionDB>>,
/// access order of dbs, used to cull if we hit the fds limit
access_order: Arc<Mutex<UniqueQueue<(PackageId, String)>>>,
txs: Arc<DashMap<u64, Vec<(KvAction, Option<Vec<u8>>)>>>,
fds_limit: u64,
}
impl KvState {
pub fn new(
our: Address,
send_to_terminal: PrintSender,
send_to_loop: MessageSender,
home_directory_path: String,
) -> Self {
Self {
our: Arc::new(our),
kv_path: Arc::new(format!("{home_directory_path}/kv")),
send_to_loop,
send_to_terminal,
open_kvs: Arc::new(DashMap::new()),
access_order: Arc::new(Mutex::new(UniqueQueue::new())),
txs: Arc::new(DashMap::new()),
fds_limit: 10,
}
}
pub async fn open_db(&mut self, package_id: PackageId, db: String) -> Result<(), KvError> {
let key = (package_id.clone(), db.clone());
if self.open_kvs.contains_key(&key) {
let mut access_order = self.access_order.lock().await;
access_order.remove(&key);
access_order.push_back(key);
return Ok(());
}
if self.open_kvs.len() as u64 >= self.fds_limit {
// close least recently used db
let key = self.access_order.lock().await.pop_front().unwrap();
self.remove_db(key.0, key.1).await;
}
let db_path = format!("{}/{}/{}", self.kv_path.as_str(), package_id, db);
fs::create_dir_all(&db_path).await?;
self.open_kvs.insert(
key,
OptimisticTransactionDB::open_default(&db_path).map_err(rocks_to_kv_err)?,
);
let mut access_order = self.access_order.lock().await;
access_order.push_back((package_id, db));
Ok(())
}
pub async fn remove_db(&mut self, package_id: PackageId, db: String) {
self.open_kvs.remove(&(package_id.clone(), db.to_string()));
let mut access_order = self.access_order.lock().await;
access_order.remove(&(package_id, db));
}
pub async fn remove_least_recently_used_dbs(&mut self, n: u64) {
for _ in 0..n {
let mut lock = self.access_order.lock().await;
let key = lock.pop_front().unwrap();
drop(lock);
self.remove_db(key.0, key.1).await;
}
}
}
pub async fn kv(
our_node: Arc<String>,
send_to_loop: MessageSender,
@ -19,31 +96,44 @@ pub async fn kv(
send_to_caps_oracle: CapMessageSender,
home_directory_path: String,
) -> anyhow::Result<()> {
let kv_path = Arc::new(format!("{home_directory_path}/kv"));
if let Err(e) = fs::create_dir_all(&*kv_path).await {
let our = Address::new(our_node.as_str(), KV_PROCESS_ID.clone());
crate::fd_manager::send_fd_manager_request_fds_limit(&our, &send_to_loop).await;
let mut state = KvState::new(our, send_to_terminal, send_to_loop, home_directory_path);
if let Err(e) = fs::create_dir_all(state.kv_path.as_str()).await {
panic!("failed creating kv dir! {e:?}");
}
let open_kvs: Arc<DashMap<(PackageId, String), OptimisticTransactionDB>> =
Arc::new(DashMap::new());
let txs: Arc<DashMap<u64, Vec<(KvAction, Option<Vec<u8>>)>>> = Arc::new(DashMap::new());
let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> = HashMap::new();
while let Some(km) = recv_from_loop.recv().await {
if *our_node != km.source.node {
if state.our.node != km.source.node {
Printout::new(
1,
format!(
"kv: got request from {}, but requests must come from our node {our_node}",
km.source.node
"kv: got request from {}, but requests must come from our node {}",
km.source.node, state.our.node,
),
)
.send(&send_to_terminal)
.send(&state.send_to_terminal)
.await;
continue;
}
if km.source.process == *FD_MANAGER_PROCESS_ID {
if let Err(e) = handle_fd_request(km, &mut state).await {
Printout::new(
1,
format!("kv: got request from fd_manager that errored: {e:?}"),
)
.send(&state.send_to_terminal)
.await;
};
continue;
}
let queue = process_queues
.get(&km.source.process)
.cloned()
@ -55,13 +145,8 @@ pub async fn kv(
}
// clone Arcs
let our_node = our_node.clone();
let send_to_loop = send_to_loop.clone();
let send_to_terminal = send_to_terminal.clone();
let mut state = state.clone();
let send_to_caps_oracle = send_to_caps_oracle.clone();
let open_kvs = open_kvs.clone();
let txs = txs.clone();
let kv_path = kv_path.clone();
tokio::spawn(async move {
let mut queue_lock = queue.lock().await;
@ -69,23 +154,13 @@ pub async fn kv(
let (km_id, km_rsvp) =
(km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone()));
if let Err(e) = handle_request(
&our_node,
km,
open_kvs,
txs,
&send_to_loop,
&send_to_caps_oracle,
&kv_path,
)
.await
{
if let Err(e) = handle_request(km, &mut state, &send_to_caps_oracle).await {
Printout::new(1, format!("kv: {e}"))
.send(&send_to_terminal)
.send(&state.send_to_terminal)
.await;
KernelMessage::builder()
.id(km_id)
.source((our_node.as_str(), KV_PROCESS_ID.clone()))
.source(state.our.as_ref().clone())
.target(km_rsvp)
.message(Message::Response((
Response {
@ -98,7 +173,7 @@ pub async fn kv(
)))
.build()
.unwrap()
.send(&send_to_loop)
.send(&state.send_to_loop)
.await;
}
}
@ -108,13 +183,9 @@ pub async fn kv(
}
async fn handle_request(
our_node: &str,
km: KernelMessage,
open_kvs: Arc<DashMap<(PackageId, String), OptimisticTransactionDB>>,
txs: Arc<DashMap<u64, Vec<(KvAction, Option<Vec<u8>>)>>>,
send_to_loop: &MessageSender,
state: &mut KvState,
send_to_caps_oracle: &CapMessageSender,
kv_path: &str,
) -> Result<(), KvError> {
let KernelMessage {
id,
@ -145,15 +216,12 @@ async fn handle_request(
}
};
check_caps(
our_node,
&source,
&open_kvs,
send_to_caps_oracle,
&request,
kv_path,
)
.await?;
check_caps(&source, state, send_to_caps_oracle, &request).await?;
// always open to ensure db exists
state
.open_db(request.package_id.clone(), request.db.clone())
.await?;
let (body, bytes) = match &request.action {
KvAction::Open => {
@ -165,7 +233,7 @@ async fn handle_request(
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
}
KvAction::Get { key } => {
let db = match open_kvs.get(&(request.package_id, request.db)) {
let db = match state.open_kvs.get(&(request.package_id, request.db)) {
None => {
return Err(KvError::NoDb);
}
@ -190,14 +258,14 @@ async fn handle_request(
}
KvAction::BeginTx => {
let tx_id = rand::random::<u64>();
txs.insert(tx_id, Vec::new());
state.txs.insert(tx_id, Vec::new());
(
serde_json::to_vec(&KvResponse::BeginTx { tx_id }).unwrap(),
None,
)
}
KvAction::Set { key, tx_id } => {
let db = match open_kvs.get(&(request.package_id, request.db)) {
let db = match state.open_kvs.get(&(request.package_id, request.db)) {
None => {
return Err(KvError::NoDb);
}
@ -214,7 +282,7 @@ async fn handle_request(
db.put(key, blob.bytes).map_err(rocks_to_kv_err)?;
}
Some(tx_id) => {
let mut tx = match txs.get_mut(tx_id) {
let mut tx = match state.txs.get_mut(tx_id) {
None => {
return Err(KvError::NoTx);
}
@ -227,7 +295,7 @@ async fn handle_request(
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
}
KvAction::Delete { key, tx_id } => {
let db = match open_kvs.get(&(request.package_id, request.db)) {
let db = match state.open_kvs.get(&(request.package_id, request.db)) {
None => {
return Err(KvError::NoDb);
}
@ -238,7 +306,7 @@ async fn handle_request(
db.delete(key).map_err(rocks_to_kv_err)?;
}
Some(tx_id) => {
let mut tx = match txs.get_mut(tx_id) {
let mut tx = match state.txs.get_mut(tx_id) {
None => {
return Err(KvError::NoTx);
}
@ -250,14 +318,14 @@ async fn handle_request(
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
}
KvAction::Commit { tx_id } => {
let db = match open_kvs.get(&(request.package_id, request.db)) {
let db = match state.open_kvs.get(&(request.package_id, request.db)) {
None => {
return Err(KvError::NoDb);
}
Some(db) => db,
};
let txs = match txs.remove(tx_id).map(|(_, tx)| tx) {
let txs = match state.txs.remove(tx_id).map(|(_, tx)| tx) {
None => {
return Err(KvError::NoTx);
}
@ -291,7 +359,7 @@ async fn handle_request(
}
KvAction::Backup => {
// looping through open dbs and flushing their memtables
for db_ref in open_kvs.iter() {
for db_ref in state.open_kvs.iter() {
let db = db_ref.value();
db.flush().map_err(rocks_to_kv_err)?;
}
@ -302,7 +370,7 @@ async fn handle_request(
if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| source)) {
KernelMessage::builder()
.id(id)
.source((our_node, KV_PROCESS_ID.clone()))
.source(state.our.as_ref().clone())
.target(target)
.message(Message::Response((
Response {
@ -319,7 +387,7 @@ async fn handle_request(
}))
.build()
.unwrap()
.send(send_to_loop)
.send(&state.send_to_loop)
.await;
}
@ -327,12 +395,10 @@ async fn handle_request(
}
async fn check_caps(
our_node: &str,
source: &Address,
open_kvs: &Arc<DashMap<(PackageId, String), OptimisticTransactionDB>>,
state: &mut KvState,
send_to_caps_oracle: &CapMessageSender,
request: &KvRequest,
kv_path: &str,
) -> Result<(), KvError> {
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
let src_package_id = PackageId::new(source.process.package(), source.process.publisher());
@ -345,17 +411,14 @@ async fn check_caps(
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.to_string(),
process: KV_PROCESS_ID.clone(),
},
params: serde_json::json!({
cap: Capability::new(
state.our.as_ref().clone(),
serde_json::json!({
"kind": "write",
"db": request.db.to_string(),
})
.to_string(),
},
),
responder: send_cap_bool,
})
.await?;
@ -371,17 +434,14 @@ async fn check_caps(
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.to_string(),
process: KV_PROCESS_ID.clone(),
},
params: serde_json::json!({
cap: Capability::new(
state.our.as_ref().clone(),
serde_json::json!({
"kind": "read",
"db": request.db.to_string(),
})
.to_string(),
},
),
responder: send_cap_bool,
})
.await?;
@ -403,7 +463,7 @@ async fn check_caps(
add_capability(
"read",
&request.db.to_string(),
&our_node,
&state.our,
&source,
send_to_caps_oracle,
)
@ -411,22 +471,22 @@ async fn check_caps(
add_capability(
"write",
&request.db.to_string(),
&our_node,
&state.our,
&source,
send_to_caps_oracle,
)
.await?;
if open_kvs.contains_key(&(request.package_id.clone(), request.db.clone())) {
if state
.open_kvs
.contains_key(&(request.package_id.clone(), request.db.clone()))
{
return Ok(());
}
let db_path = format!("{}/{}/{}", kv_path, request.package_id, request.db);
fs::create_dir_all(&db_path).await?;
let db = OptimisticTransactionDB::open_default(&db_path).map_err(rocks_to_kv_err)?;
open_kvs.insert((request.package_id.clone(), request.db.clone()), db);
state
.open_db(request.package_id.clone(), request.db.clone())
.await?;
Ok(())
}
KvAction::RemoveDb { .. } => {
@ -436,28 +496,57 @@ async fn check_caps(
});
}
let db_path = format!("{}/{}/{}", kv_path, request.package_id, request.db);
open_kvs.remove(&(request.package_id.clone(), request.db.clone()));
state
.remove_db(request.package_id.clone(), request.db.clone())
.await;
fs::remove_dir_all(format!(
"{}/{}/{}",
state.kv_path, request.package_id, request.db
))
.await?;
fs::remove_dir_all(&db_path).await?;
Ok(())
}
KvAction::Backup { .. } => Ok(()),
}
}
async fn handle_fd_request(km: KernelMessage, state: &mut KvState) -> anyhow::Result<()> {
let Message::Request(Request { body, .. }) = km.message else {
return Err(anyhow::anyhow!("not a request"));
};
let request: FdManagerRequest = serde_json::from_slice(&body)?;
match request {
FdManagerRequest::FdsLimit(new_fds_limit) => {
state.fds_limit = new_fds_limit;
if state.open_kvs.len() as u64 >= state.fds_limit {
crate::fd_manager::send_fd_manager_hit_fds_limit(&state.our, &state.send_to_loop)
.await;
state
.remove_least_recently_used_dbs(state.open_kvs.len() as u64 - state.fds_limit)
.await;
}
}
_ => {
return Err(anyhow::anyhow!("non-Cull FdManagerRequest"));
}
}
Ok(())
}
async fn add_capability(
kind: &str,
db: &str,
our_node: &str,
our: &Address,
source: &Address,
send_to_caps_oracle: &CapMessageSender,
) -> Result<(), KvError> {
let cap = Capability {
issuer: Address {
node: our_node.to_string(),
process: KV_PROCESS_ID.clone(),
},
issuer: our.clone(),
params: serde_json::json!({ "kind": kind, "db": db }).to_string(),
};
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();

View File

@ -15,6 +15,7 @@ use tokio::sync::mpsc;
mod eth;
#[cfg(feature = "simulation-mode")]
mod fakenet;
pub mod fd_manager;
mod http;
mod kernel;
mod keygen;
@ -40,10 +41,15 @@ const VFS_CHANNEL_CAPACITY: usize = 1_000;
const CAP_CHANNEL_CAPACITY: usize = 1_000;
const KV_CHANNEL_CAPACITY: usize = 1_000;
const SQLITE_CHANNEL_CAPACITY: usize = 1_000;
const FD_MANAGER_CHANNEL_CAPACITY: usize = 1_000;
const VERSION: &str = env!("CARGO_PKG_VERSION");
const WS_MIN_PORT: u16 = 9_000;
const TCP_MIN_PORT: u16 = 10_000;
const MAX_PORT: u16 = 65_535;
const DEFAULT_MAX_PEERS: u64 = 32;
const DEFAULT_MAX_PASSTHROUGHS: u64 = 0;
/// default routers as a eth-provider fallback
const DEFAULT_ETH_PROVIDERS: &str = include_str!("eth/default_providers_mainnet.json");
#[cfg(not(feature = "simulation-mode"))]
@ -184,6 +190,9 @@ async fn main() {
// vfs maintains metadata about files in fs for processes
let (vfs_message_sender, vfs_message_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(VFS_CHANNEL_CAPACITY);
// fd_manager makes sure we don't overrun the `ulimit -n`: max number of file descriptors
let (fd_manager_sender, fd_manager_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(FD_MANAGER_CHANNEL_CAPACITY);
// terminal receives prints via this channel, all other modules send prints
let (print_sender, print_receiver): (PrintSender, PrintReceiver) =
mpsc::channel(TERMINAL_CHANNEL_CAPACITY);
@ -291,6 +300,12 @@ async fn main() {
None,
false,
),
(
ProcessId::new(Some("fd_manager"), "distro", "sys"),
fd_manager_sender,
None,
false,
),
];
/*
@ -351,6 +366,12 @@ async fn main() {
print_sender.clone(),
net_message_receiver,
*matches.get_one::<bool>("reveal-ip").unwrap_or(&true),
*matches
.get_one::<u64>("max-peers")
.unwrap_or(&DEFAULT_MAX_PEERS),
*matches
.get_one::<u64>("max-passthroughs")
.unwrap_or(&DEFAULT_MAX_PASSTHROUGHS),
));
tasks.spawn(state::state_sender(
our_name_arc.clone(),
@ -360,6 +381,13 @@ async fn main() {
db,
home_directory_path.clone(),
));
tasks.spawn(fd_manager::fd_manager(
our_name_arc.clone(),
kernel_message_sender.clone(),
print_sender.clone(),
fd_manager_receiver,
matches.get_one::<u64>("soft-ulimit").copied(),
));
tasks.spawn(kv::kv(
our_name_arc.clone(),
kernel_message_sender.clone(),
@ -687,6 +715,18 @@ fn build_command() -> Command {
.arg(
arg!(--"number-log-files" <NUMBER_LOG_FILES> "Number of logs to rotate (default 4)")
.value_parser(value_parser!(u64)),
)
.arg(
arg!(--"max-peers" <MAX_PEERS> "Maximum number of peers to hold active connections with (default 32)")
.value_parser(value_parser!(u64)),
)
.arg(
arg!(--"max-passthroughs" <MAX_PASSTHROUGHS> "Maximum number of passthroughs serve as a router (default 0)")
.value_parser(value_parser!(u64)),
)
.arg(
arg!(--"soft-ulimit" <SOFT_ULIMIT> "Enforce a static maximum number of file descriptors (default fetched from system)")
.value_parser(value_parser!(u64)),
);
#[cfg(feature = "simulation-mode")]

View File

@ -6,31 +6,40 @@ use tokio::sync::mpsc;
/// if target is a peer, queue to be routed
/// otherwise, create peer and initiate routing
pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, km: KernelMessage) {
if let Some(peer) = data.peers.get_mut(&km.target.node) {
peer.sender.send(km).expect("net: peer sender was dropped");
} else {
let Some(peer_id) = data.pki.get(&km.target.node) else {
return utils::error_offline(km, &ext.network_error_tx).await;
};
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
// send message to be routed
peer_tx.send(km).unwrap();
data.peers.insert(
peer_id.name.clone(),
Peer {
identity: peer_id.clone(),
routing_for: false,
sender: peer_tx.clone(),
},
);
tokio::spawn(connect_to_peer(
ext.clone(),
data.clone(),
peer_id.clone(),
peer_rx,
));
pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, mut km: KernelMessage) {
if let Some(mut peer) = data.peers.get_mut(&km.target.node) {
match peer.send(km) {
Ok(()) => {
peer.set_last_message();
return;
}
Err(e_km) => {
// peer connection was closed, remove it and try to reconnect
data.peers.remove(&peer.identity.name).await;
km = e_km.0;
}
}
}
let Some(peer_id) = data.pki.get(&km.target.node) else {
return utils::error_offline(km, &ext.network_error_tx).await;
};
let (mut peer, peer_rx) = Peer::new(peer_id.clone(), false);
// send message to be routed
match peer.send(km) {
Ok(()) => {
peer.set_last_message();
}
Err(e_km) => {
return utils::error_offline(e_km.0, &ext.network_error_tx).await;
}
};
data.peers.insert(peer_id.name.clone(), peer).await;
tokio::spawn(connect_to_peer(
ext.clone(),
data.clone(),
peer_id.clone(),
peer_rx,
));
}
/// based on peer's identity, either use one of their
@ -157,7 +166,7 @@ pub async fn handle_failed_connection(
&format!("net: failed to connect to {}", peer_id.name),
)
.await;
drop(data.peers.remove(&peer_id.name));
data.peers.remove(&peer_id.name).await;
peer_rx.close();
while let Some(km) = peer_rx.recv().await {
utils::error_offline(km, &ext.network_error_tx).await;

View File

@ -1,7 +1,7 @@
use crate::net::types::{IdentityExt, NetData, Peer};
use crate::net::{connect, tcp, utils, ws};
use lib::types::core::{Identity, NodeRouting};
use tokio::{sync::mpsc, time};
use tokio::time;
pub async fn maintain_routers(ext: IdentityExt, data: NetData) -> anyhow::Result<()> {
let NodeRouting::Routers(ref routers) = ext.our.routing else {
@ -29,15 +29,8 @@ pub async fn connect_to_router(router_id: &Identity, ext: &IdentityExt, data: &N
&format!("net: attempting to connect to router {}", router_id.name),
)
.await;
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
data.peers.insert(
router_id.name.clone(),
Peer {
identity: router_id.clone(),
routing_for: false,
sender: peer_tx.clone(),
},
);
let (peer, peer_rx) = Peer::new(router_id.clone(), false);
data.peers.insert(router_id.name.clone(), peer).await;
if let Some((_ip, port)) = router_id.tcp_routing() {
match tcp::init_direct(ext, data, &router_id, *port, true, peer_rx).await {
Ok(()) => {

View File

@ -1,9 +1,13 @@
use lib::types::core::{
Identity, KernelMessage, MessageReceiver, MessageSender, NetAction, NetResponse,
NetworkErrorSender, NodeRouting, PrintSender,
use lib::{
core::Address,
types::core::{
Identity, KernelMessage, MessageReceiver, MessageSender, NetAction, NetResponse,
NetworkErrorSender, NodeRouting, PrintSender, NET_PROCESS_ID,
},
};
use types::{
IdentityExt, NetData, OnchainPKI, Peers, PendingPassthroughs, TCP_PROTOCOL, WS_PROTOCOL,
ActivePassthroughs, IdentityExt, NetData, OnchainPKI, Peers, PendingPassthroughs, TCP_PROTOCOL,
WS_PROTOCOL,
};
use {dashmap::DashMap, ring::signature::Ed25519KeyPair, std::sync::Arc, tokio::task::JoinSet};
@ -31,8 +35,18 @@ pub async fn networking(
network_error_tx: NetworkErrorSender,
print_tx: PrintSender,
kernel_message_rx: MessageReceiver,
_reveal_ip: bool, // only used if indirect
// only used if indirect -- TODO use
_reveal_ip: bool,
max_peers: u64,
// only used by routers
max_passthroughs: u64,
) -> anyhow::Result<()> {
crate::fd_manager::send_fd_manager_request_fds_limit(
&Address::new(&our.name, NET_PROCESS_ID.clone()),
&kernel_message_tx,
)
.await;
let ext = IdentityExt {
our: Arc::new(our),
our_ip: Arc::new(our_ip),
@ -45,14 +59,18 @@ pub async fn networking(
// start by initializing the structs where we'll store PKI in memory
// and store a mapping of peers we have an active route for
let pki: OnchainPKI = Arc::new(DashMap::new());
let peers: Peers = Arc::new(DashMap::new());
let peers: Peers = Peers::new(max_peers, ext.kernel_message_tx.clone());
// only used by routers
let pending_passthroughs: PendingPassthroughs = Arc::new(DashMap::new());
let active_passthroughs: ActivePassthroughs = Arc::new(DashMap::new());
let net_data = NetData {
pki,
peers,
pending_passthroughs,
active_passthroughs,
max_passthroughs,
fds_limit: 10, // small hardcoded limit that gets replaced by fd_manager soon after boot
};
let mut tasks = JoinSet::<anyhow::Result<()>>::new();
@ -107,12 +125,12 @@ pub async fn networking(
async fn local_recv(
ext: IdentityExt,
mut kernel_message_rx: MessageReceiver,
data: NetData,
mut data: NetData,
) -> anyhow::Result<()> {
while let Some(km) = kernel_message_rx.recv().await {
if km.target.node == ext.our.name {
// handle messages sent to us
handle_message(&ext, km, &data).await;
handle_message(&ext, km, &mut data).await;
} else {
connect::send_to_peer(&ext, &data, km).await;
}
@ -120,7 +138,7 @@ async fn local_recv(
Err(anyhow::anyhow!("net: kernel message channel was dropped"))
}
async fn handle_message(ext: &IdentityExt, km: KernelMessage, data: &NetData) {
async fn handle_message(ext: &IdentityExt, km: KernelMessage, data: &mut NetData) {
match &km.message {
lib::core::Message::Request(request) => handle_request(ext, &km, &request.body, data).await,
lib::core::Message::Response((response, _context)) => {
@ -133,7 +151,7 @@ async fn handle_request(
ext: &IdentityExt,
km: &KernelMessage,
request_body: &[u8],
data: &NetData,
data: &mut NetData,
) {
if km.source.node == ext.our.name {
handle_local_request(ext, km, request_body, data).await;
@ -149,11 +167,12 @@ async fn handle_local_request(
ext: &IdentityExt,
km: &KernelMessage,
request_body: &[u8],
data: &NetData,
data: &mut NetData,
) {
match rmp_serde::from_slice::<NetAction>(request_body) {
Err(_e) => {
// ignore
// only other possible message is from fd_manager -- handle here
handle_fdman(km, request_body, data).await;
}
Ok(NetAction::ConnectionRequest(_)) => {
// we shouldn't get these locally, ignore
@ -171,6 +190,7 @@ async fn handle_local_request(
NetAction::GetPeers => (
NetResponse::Peers(
data.peers
.peers()
.iter()
.map(|p| p.identity.clone())
.collect::<Vec<Identity>>(),
@ -189,19 +209,31 @@ async fn handle_local_request(
));
printout.push_str(&format!("our Identity: {:#?}\r\n", ext.our));
printout.push_str(&format!(
"we have connections with {} peers:\r\n",
data.peers.len()
"we have connections with {} peers ({} max):\r\n",
data.peers.peers().len(),
data.peers.max_peers(),
));
for peer in data.peers.iter() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
for peer in data.peers.peers().iter() {
printout.push_str(&format!(
" {}, routing_for={}\r\n",
peer.identity.name, peer.routing_for,
" {},{} last message {}s ago\r\n",
peer.identity.name,
if peer.routing_for { " (routing)" } else { "" },
now.saturating_sub(peer.last_message)
));
}
printout.push_str(&format!(
"we have {} entries in the PKI\r\n",
data.pki.len()
));
if data.max_passthroughs > 0 {
printout.push_str(&format!(
"we allow {} max passthroughs\r\n",
data.max_passthroughs
));
}
if !data.pending_passthroughs.is_empty() {
printout.push_str(&format!(
"we have {} pending passthroughs:\r\n",
@ -212,6 +244,21 @@ async fn handle_local_request(
}
}
if !data.active_passthroughs.is_empty() {
printout.push_str(&format!(
"we have {} active passthroughs:\r\n",
data.active_passthroughs.len()
));
for p in data.active_passthroughs.iter() {
printout.push_str(&format!(" {} -> {}\r\n", p.key().0, p.key().1));
}
}
printout.push_str(&format!(
"we have {} entries in the PKI\r\n",
data.pki.len()
));
(NetResponse::Diagnostics(printout), None)
}
NetAction::Sign => (
@ -284,6 +331,34 @@ async fn handle_local_request(
}
}
async fn handle_fdman(km: &KernelMessage, request_body: &[u8], data: &mut NetData) {
if km.source.process != *lib::core::FD_MANAGER_PROCESS_ID {
return;
}
let Ok(req) = serde_json::from_slice::<lib::core::FdManagerRequest>(request_body) else {
return;
};
match req {
lib::core::FdManagerRequest::FdsLimit(fds_limit) => {
data.fds_limit = fds_limit;
data.peers.set_max_peers(fds_limit);
// TODO combine with max_peers check
// only update passthrough limit if it's higher than the new fds limit
// most nodes have passthroughs disabled, meaning this will keep it at 0
if data.max_passthroughs > fds_limit {
data.max_passthroughs = fds_limit;
}
// TODO cull passthroughs too
if data.peers.peers().len() >= data.fds_limit as usize {
let diff = data.peers.peers().len() - data.fds_limit as usize;
println!("net: culling {diff} peer(s)\r\n");
data.peers.cull(diff).await;
}
}
_ => return,
}
}
async fn handle_remote_request(
ext: &IdentityExt,
km: &KernelMessage,

View File

@ -170,16 +170,8 @@ async fn recv_connection(
if len != 32 {
let (from_id, target_id) =
validate_routing_request(&ext.our.name, &first_message, &data.pki)?;
return create_passthrough(
&ext.our,
&ext.our_ip,
from_id,
target_id,
&data.peers,
&data.pending_passthroughs,
PendingStream::Tcp(stream),
)
.await;
return create_passthrough(&ext, from_id, target_id, &data, PendingStream::Tcp(stream))
.await;
}
let mut buf = [0u8; 65535];
@ -215,15 +207,9 @@ async fn recv_connection(
&their_id,
)?;
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
data.peers.insert(
their_id.name.clone(),
Peer {
identity: their_id.clone(),
routing_for: their_handshake.proxy_request,
sender: peer_tx,
},
);
let (peer, peer_rx) = Peer::new(their_id.clone(), their_handshake.proxy_request);
data.peers.insert(their_id.name.clone(), peer).await;
tokio::spawn(utils::maintain_connection(
their_handshake.name,
data.peers,
@ -336,15 +322,8 @@ pub async fn recv_via_router(
};
match connect_with_handshake_via_router(&ext, &peer_id, &router_id, stream).await {
Ok(connection) => {
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
data.peers.insert(
peer_id.name.clone(),
Peer {
identity: peer_id.clone(),
routing_for: false,
sender: peer_tx,
},
);
let (peer, peer_rx) = Peer::new(peer_id.clone(), false);
data.peers.insert(peer_id.name.clone(), peer).await;
// maintain direct connection
tokio::spawn(utils::maintain_connection(
peer_id.name,

View File

@ -1,7 +1,7 @@
use crate::net::{
tcp::PeerConnection,
types::{HandshakePayload, IdentityExt, Peers},
utils::{print_debug, print_loud, MESSAGE_MAX_SIZE},
utils::{print_debug, print_loud, IDLE_TIMEOUT, MESSAGE_MAX_SIZE},
};
use lib::types::core::{KernelMessage, MessageSender, NodeId, PrintSender};
use {
@ -82,13 +82,18 @@ pub async fn maintain_connection(
}
};
let timeout = tokio::time::sleep(IDLE_TIMEOUT);
tokio::select! {
_ = write => (),
_ = read => (),
_ = timeout => {
print_debug(&print_tx, &format!("net: closing idle connection with {peer_name}")).await;
}
}
print_debug(&print_tx, &format!("net: connection lost with {peer_name}")).await;
peers.remove(&peer_name);
peers.remove(&peer_name).await;
}
async fn send_protocol_message(

View File

@ -1,13 +1,15 @@
use lib::types::core::{
Identity, KernelMessage, MessageSender, NetworkErrorSender, NodeId, PrintSender,
Address, Identity, KernelMessage, MessageSender, NetworkErrorSender, NodeId, PrintSender,
NET_PROCESS_ID,
};
use {
dashmap::DashMap,
ring::signature::Ed25519KeyPair,
serde::{Deserialize, Serialize},
std::sync::atomic::AtomicU64,
std::sync::Arc,
tokio::net::TcpStream,
tokio::sync::mpsc::UnboundedSender,
tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender},
tokio_tungstenite::{MaybeTlsStream, WebSocketStream},
};
@ -54,16 +56,108 @@ pub struct RoutingRequest {
pub target: NodeId,
}
pub type Peers = Arc<DashMap<String, Peer>>;
#[derive(Clone)]
pub struct Peers {
max_peers: Arc<AtomicU64>,
send_to_loop: MessageSender,
peers: Arc<DashMap<String, Peer>>,
}
impl Peers {
pub fn new(max_peers: u64, send_to_loop: MessageSender) -> Self {
Self {
max_peers: Arc::new(max_peers.into()),
send_to_loop,
peers: Arc::new(DashMap::new()),
}
}
pub fn peers(&self) -> &DashMap<String, Peer> {
&self.peers
}
pub fn max_peers(&self) -> u64 {
self.max_peers.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn set_max_peers(&self, max_peers: u64) {
self.max_peers
.store(max_peers, std::sync::atomic::Ordering::Relaxed);
}
pub fn get(&self, name: &str) -> Option<dashmap::mapref::one::Ref<'_, String, Peer>> {
self.peers.get(name)
}
pub fn get_mut(
&self,
name: &str,
) -> std::option::Option<dashmap::mapref::one::RefMut<'_, String, Peer>> {
self.peers.get_mut(name)
}
pub fn contains_key(&self, name: &str) -> bool {
self.peers.contains_key(name)
}
/// when a peer is inserted, if the total number of peers exceeds the limit,
/// remove the one with the oldest last_message.
pub async fn insert(&self, name: String, peer: Peer) {
self.peers.insert(name, peer);
if self.peers.len() as u64 > self.max_peers.load(std::sync::atomic::Ordering::Relaxed) {
let oldest = self
.peers
.iter()
.min_by_key(|p| p.last_message)
.unwrap()
.key()
.clone();
self.remove(&oldest).await;
crate::fd_manager::send_fd_manager_hit_fds_limit(
&Address::new("our", NET_PROCESS_ID.clone()),
&self.send_to_loop,
)
.await;
}
}
pub async fn remove(&self, name: &str) -> Option<(String, Peer)> {
self.peers.remove(name)
}
/// close the n oldest connections
pub async fn cull(&self, n: usize) {
let mut to_remove = Vec::with_capacity(n);
let mut sorted_peers: Vec<_> = self.peers.iter().collect();
sorted_peers.sort_by_key(|p| p.last_message);
to_remove.extend(sorted_peers.iter().take(n));
for peer in to_remove {
self.remove(&peer.identity.name).await;
}
crate::fd_manager::send_fd_manager_hit_fds_limit(
&Address::new("our", NET_PROCESS_ID.clone()),
&self.send_to_loop,
)
.await;
}
}
pub type OnchainPKI = Arc<DashMap<String, Identity>>;
/// (from, target) -> from's socket
pub type PendingPassthroughs = Arc<DashMap<(NodeId, NodeId), PendingStream>>;
///
/// only used by routers
pub type PendingPassthroughs = Arc<DashMap<(NodeId, NodeId), (PendingStream, u64)>>;
pub enum PendingStream {
WebSocket(WebSocketStream<MaybeTlsStream<TcpStream>>),
Tcp(TcpStream),
}
/// (from, target)
///
/// only used by routers
pub type ActivePassthroughs = Arc<DashMap<(NodeId, NodeId), (u64, KillSender)>>;
impl PendingStream {
pub fn is_ws(&self) -> bool {
matches!(self, PendingStream::WebSocket(_))
@ -73,15 +167,55 @@ impl PendingStream {
}
}
#[derive(Clone)]
type KillSender = tokio::sync::mpsc::Sender<()>;
pub struct Peer {
pub identity: Identity,
/// If true, we are routing for them and have a RoutingClientConnection
/// associated with them. We can send them prompts to establish Passthroughs.
pub routing_for: bool,
pub sender: UnboundedSender<KernelMessage>,
/// unix timestamp of last message sent *or* received
pub last_message: u64,
}
impl Peer {
/// Create a new Peer.
/// If `routing_for` is true, we are routing for them.
pub fn new(identity: Identity, routing_for: bool) -> (Self, UnboundedReceiver<KernelMessage>) {
let (peer_tx, peer_rx) = tokio::sync::mpsc::unbounded_channel();
(
Self {
identity,
routing_for,
sender: peer_tx,
last_message: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
},
peer_rx,
)
}
/// Send a message to the peer.
pub fn send(
&mut self,
km: KernelMessage,
) -> Result<(), tokio::sync::mpsc::error::SendError<KernelMessage>> {
self.sender.send(km)?;
self.set_last_message();
Ok(())
}
/// Update the last message time to now.
pub fn set_last_message(&mut self) {
self.last_message = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
}
}
/// [`Identity`], with additional fields for networking.
#[derive(Clone)]
pub struct IdentityExt {
@ -98,5 +232,10 @@ pub struct IdentityExt {
pub struct NetData {
pub pki: OnchainPKI,
pub peers: Peers,
/// only used by routers
pub pending_passthroughs: PendingPassthroughs,
/// only used by routers
pub active_passthroughs: ActivePassthroughs,
pub max_passthroughs: u64,
pub fds_limit: u64,
}

View File

@ -1,10 +1,10 @@
use crate::net::types::{
HandshakePayload, OnchainPKI, Peers, PendingPassthroughs, PendingStream, RoutingRequest,
TCP_PROTOCOL, WS_PROTOCOL,
ActivePassthroughs, HandshakePayload, IdentityExt, NetData, OnchainPKI, PendingStream,
RoutingRequest, TCP_PROTOCOL, WS_PROTOCOL,
};
use lib::types::core::{
Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetAction, NetworkErrorSender,
NodeRouting, PrintSender, Printout, Request, Response, SendError, SendErrorKind,
NodeId, NodeRouting, PrintSender, Printout, Request, Response, SendError, SendErrorKind,
WrappedSendError,
};
use {
@ -27,27 +27,82 @@ pub const MESSAGE_MAX_SIZE: u32 = 10_485_800;
pub const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
/// 30 minute idle timeout for connections
pub const IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1800);
pub async fn create_passthrough(
our: &Identity,
our_ip: &str,
ext: &IdentityExt,
from_id: Identity,
target_id: Identity,
peers: &Peers,
pending_passthroughs: &PendingPassthroughs,
data: &NetData,
socket_1: PendingStream,
) -> anyhow::Result<()> {
// if we already are at the max number of passthroughs, reject
if data.max_passthroughs == 0 {
return Err(anyhow::anyhow!(
"passthrough denied: this node has disallowed passthroughs. Start node with `--max-passthroughs <VAL>` to allow passthroughs"
));
}
// remove pending before checking bound because otherwise we stop
// ourselves from matching pending if this connection will be
// the max_passthroughs passthrough
let maybe_pending = data
.pending_passthroughs
.remove(&(target_id.name.clone(), from_id.name.clone()));
if data.active_passthroughs.len() + data.pending_passthroughs.len()
>= data.max_passthroughs as usize
{
let oldest_active = data.active_passthroughs.iter().min_by_key(|p| p.0);
let (oldest_active_key, oldest_active_time, oldest_active_kill_sender) = match oldest_active
{
None => (None, get_now(), None),
Some(oldest_active) => {
let (oldest_active_key, oldest_active_val) = oldest_active.pair();
let oldest_active_key = oldest_active_key.clone();
let (oldest_active_time, oldest_active_kill_sender) = oldest_active_val.clone();
(
Some(oldest_active_key),
oldest_active_time,
Some(oldest_active_kill_sender),
)
}
};
let oldest_pending = data.pending_passthroughs.iter().min_by_key(|p| p.1);
let (oldest_pending_key, oldest_pending_time) = match oldest_pending {
None => (None, get_now()),
Some(oldest_pending) => {
let (oldest_pending_key, oldest_pending_val) = oldest_pending.pair();
let oldest_pending_key = oldest_pending_key.clone();
let (_, oldest_pending_time) = oldest_pending_val;
(Some(oldest_pending_key), oldest_pending_time.clone())
}
};
if oldest_active_time < oldest_pending_time {
// active key is oldest
oldest_active_kill_sender.unwrap().send(()).await.unwrap();
data.active_passthroughs.remove(&oldest_active_key.unwrap());
} else {
// pending key is oldest
data.pending_passthroughs
.remove(&oldest_pending_key.unwrap());
}
}
// if the target has already generated a pending passthrough for this source,
// immediately match them
if let Some(((_target, _from), pending_stream)) =
pending_passthroughs.remove(&(target_id.name.clone(), from_id.name.clone()))
{
tokio::spawn(maintain_passthrough(socket_1, pending_stream));
if let Some(((from, target), (pending_stream, _))) = maybe_pending {
tokio::spawn(maintain_passthrough(
from,
target,
socket_1,
pending_stream,
data.active_passthroughs.clone(),
));
return Ok(());
}
if socket_1.is_tcp() {
if let Some((ip, tcp_port)) = target_id.tcp_routing() {
// create passthrough to direct node over tcp
let tcp_url = make_conn_url(our_ip, ip, tcp_port, TCP_PROTOCOL)?;
let tcp_url = make_conn_url(&ext.our_ip, ip, tcp_port, TCP_PROTOCOL)?;
let Ok(Ok(stream_2)) =
time::timeout(TIMEOUT, tokio::net::TcpStream::connect(tcp_url.to_string())).await
else {
@ -57,13 +112,19 @@ pub async fn create_passthrough(
from_id.name
));
};
tokio::spawn(maintain_passthrough(socket_1, PendingStream::Tcp(stream_2)));
tokio::spawn(maintain_passthrough(
from_id.name,
target_id.name,
socket_1,
PendingStream::Tcp(stream_2),
data.active_passthroughs.clone(),
));
return Ok(());
}
} else if socket_1.is_ws() {
if let Some((ip, ws_port)) = target_id.ws_routing() {
// create passthrough to direct node over websocket
let ws_url = make_conn_url(our_ip, ip, ws_port, WS_PROTOCOL)?;
let ws_url = make_conn_url(&ext.our_ip, ip, ws_port, WS_PROTOCOL)?;
let Ok(Ok((socket_2, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await
else {
return Err(anyhow::anyhow!(
@ -73,14 +134,17 @@ pub async fn create_passthrough(
));
};
tokio::spawn(maintain_passthrough(
from_id.name,
target_id.name,
socket_1,
PendingStream::WebSocket(socket_2),
data.active_passthroughs.clone(),
));
return Ok(());
}
}
// create passthrough to indirect node that we do routing for
let target_peer = peers.get(&target_id.name).ok_or(anyhow::anyhow!(
let target_peer = data.peers.get(&target_id.name).ok_or(anyhow::anyhow!(
"can't route to {}, not a peer, for passthrough requested by {}",
target_id.name,
from_id.name
@ -97,7 +161,7 @@ pub async fn create_passthrough(
target_peer.sender.send(
KernelMessage::builder()
.id(rand::random())
.source((our.name.as_str(), "net", "distro", "sys"))
.source((ext.our.name.as_str(), "net", "distro", "sys"))
.target((target_id.name.as_str(), "net", "distro", "sys"))
.message(Message::Request(Request {
inherit: false,
@ -113,12 +177,23 @@ pub async fn create_passthrough(
// or if the target node connects to us with a matching passthrough.
// TODO it is currently possible to have dangling passthroughs in the map
// if the target is "connected" to us but nonresponsive.
pending_passthroughs.insert((from_id.name, target_id.name), socket_1);
let now = get_now();
data.pending_passthroughs
.insert((from_id.name, target_id.name), (socket_1, now));
Ok(())
}
/// cross the streams -- spawn on own task
pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStream) {
pub async fn maintain_passthrough(
from: NodeId,
target: NodeId,
socket_1: PendingStream,
socket_2: PendingStream,
active_passthroughs: ActivePassthroughs,
) {
let now = get_now();
let (kill_sender, mut kill_receiver) = tokio::sync::mpsc::channel(1);
active_passthroughs.insert((from.clone(), target.clone()), (now, kill_sender));
match (socket_1, socket_2) {
(PendingStream::Tcp(socket_1), PendingStream::Tcp(socket_2)) => {
// do not use bidirectional because if one side closes,
@ -129,6 +204,7 @@ pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStre
tokio::select! {
_ = copy(&mut r1, &mut w2) => {},
_ = copy(&mut r2, &mut w1) => {},
_ = kill_receiver.recv() => {},
}
}
(PendingStream::WebSocket(mut socket_1), PendingStream::WebSocket(mut socket_2)) => {
@ -163,6 +239,7 @@ pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStre
break
}
}
_ = kill_receiver.recv() => break,
}
}
let _ = socket_1.close(None).await;
@ -170,9 +247,9 @@ pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStre
}
_ => {
// these foolish combinations must never occur
return;
}
}
active_passthroughs.remove(&(from, target));
}
pub fn ingest_log(log: KnsUpdate, pki: &OnchainPKI) {
@ -359,3 +436,11 @@ pub async fn print_loud(print_tx: &PrintSender, content: &str) {
pub async fn print_debug(print_tx: &PrintSender, content: &str) {
Printout::new(2, content).send(print_tx).await;
}
pub fn get_now() -> u64 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
now
}

View File

@ -187,15 +187,8 @@ pub async fn recv_via_router(
};
match connect_with_handshake_via_router(&ext, &peer_id, &router_id, socket).await {
Ok(connection) => {
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
data.peers.insert(
peer_id.name.clone(),
Peer {
identity: peer_id.clone(),
routing_for: false,
sender: peer_tx,
},
);
let (peer, peer_rx) = Peer::new(peer_id.clone(), false);
data.peers.insert(peer_id.name.clone(), peer).await;
// maintain direct connection
tokio::spawn(utils::maintain_connection(
peer_id.name,
@ -228,12 +221,10 @@ async fn recv_connection(
let (from_id, target_id) =
validate_routing_request(&ext.our.name, first_message, &data.pki)?;
return create_passthrough(
&ext.our,
&ext.our_ip,
&ext,
from_id,
target_id,
&data.peers,
&data.pending_passthroughs,
&data,
PendingStream::WebSocket(socket),
)
.await;
@ -272,15 +263,9 @@ async fn recv_connection(
&their_id,
)?;
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
data.peers.insert(
their_id.name.clone(),
Peer {
identity: their_id.clone(),
routing_for: their_handshake.proxy_request,
sender: peer_tx,
},
);
let (peer, peer_rx) = Peer::new(their_id.clone(), their_handshake.proxy_request);
data.peers.insert(their_id.name.clone(), peer).await;
tokio::spawn(utils::maintain_connection(
their_handshake.name,
data.peers,

View File

@ -1,6 +1,6 @@
use crate::net::{
types::{HandshakePayload, IdentityExt, Peers},
utils::{print_debug, print_loud, MESSAGE_MAX_SIZE},
utils::{print_debug, print_loud, IDLE_TIMEOUT, MESSAGE_MAX_SIZE},
ws::{PeerConnection, WebSocket},
};
use lib::core::{KernelMessage, MessageSender, NodeId, PrintSender};
@ -103,13 +103,18 @@ pub async fn maintain_connection(
}
};
let timeout = tokio::time::sleep(IDLE_TIMEOUT);
tokio::select! {
_ = write => (),
_ = read => (),
_ = timeout => {
print_debug(&print_tx, &format!("net: closing idle connection with {peer_name}")).await;
}
}
print_debug(&print_tx, &format!("net: connection lost with {peer_name}")).await;
peers.remove(&peer_name);
peers.remove(&peer_name).await;
}
async fn send_protocol_message(

View File

@ -1,9 +1,11 @@
use crate::vfs::UniqueQueue;
use base64::{engine::general_purpose::STANDARD as base64_standard, Engine};
use dashmap::DashMap;
use lib::types::core::{
Address, CapMessage, CapMessageSender, Capability, KernelMessage, LazyLoadBlob, Message,
MessageReceiver, MessageSender, PackageId, PrintSender, Printout, ProcessId, Request, Response,
SqlValue, SqliteAction, SqliteError, SqliteRequest, SqliteResponse, SQLITE_PROCESS_ID,
Address, CapMessage, CapMessageSender, Capability, FdManagerRequest, KernelMessage,
LazyLoadBlob, Message, MessageReceiver, MessageSender, PackageId, PrintSender, Printout,
ProcessId, Request, Response, SqlValue, SqliteAction, SqliteError, SqliteRequest,
SqliteResponse, FD_MANAGER_PROCESS_ID, SQLITE_PROCESS_ID,
};
use rusqlite::Connection;
use std::{
@ -20,6 +22,83 @@ lazy_static::lazy_static! {
HashSet::from(["ALTER", "ANALYZE", "COMMIT", "CREATE", "DELETE", "DETACH", "DROP", "END", "INSERT", "REINDEX", "RELEASE", "RENAME", "REPLACE", "ROLLBACK", "SAVEPOINT", "UPDATE", "VACUUM"]);
}
#[derive(Clone)]
struct SqliteState {
our: Arc<Address>,
sqlite_path: Arc<String>,
send_to_loop: MessageSender,
send_to_terminal: PrintSender,
open_dbs: Arc<DashMap<(PackageId, String), Mutex<Connection>>>,
access_order: Arc<Mutex<UniqueQueue<(PackageId, String)>>>,
txs: Arc<DashMap<u64, Vec<(String, Vec<SqlValue>)>>>,
fds_limit: u64,
}
impl SqliteState {
pub fn new(
our: Address,
send_to_terminal: PrintSender,
send_to_loop: MessageSender,
home_directory_path: String,
) -> Self {
Self {
our: Arc::new(our),
sqlite_path: Arc::new(format!("{home_directory_path}/sqlite")),
send_to_loop,
send_to_terminal,
open_dbs: Arc::new(DashMap::new()),
access_order: Arc::new(Mutex::new(UniqueQueue::new())),
txs: Arc::new(DashMap::new()),
fds_limit: 10,
}
}
pub async fn open_db(&mut self, package_id: PackageId, db: String) -> Result<(), SqliteError> {
let key = (package_id.clone(), db.clone());
if self.open_dbs.contains_key(&key) {
let mut access_order = self.access_order.lock().await;
access_order.remove(&key);
access_order.push_back(key);
return Ok(());
}
if self.open_dbs.len() as u64 >= self.fds_limit {
// close least recently used db
let key = self.access_order.lock().await.pop_front().unwrap();
self.remove_db(key.0, key.1).await;
}
let db_path = format!("{}/{}/{}", self.sqlite_path.as_str(), package_id, db);
fs::create_dir_all(&db_path).await?;
let db_file_path = format!("{}/{}.db", db_path, db);
let db_conn = Connection::open(db_file_path)?;
let _ = db_conn.execute("PRAGMA journal_mode=WAL", []);
self.open_dbs.insert(key, Mutex::new(db_conn));
let mut access_order = self.access_order.lock().await;
access_order.push_back((package_id, db));
Ok(())
}
pub async fn remove_db(&mut self, package_id: PackageId, db: String) {
self.open_dbs.remove(&(package_id.clone(), db.to_string()));
let mut access_order = self.access_order.lock().await;
access_order.remove(&(package_id, db));
}
pub async fn remove_least_recently_used_dbs(&mut self, n: u64) {
for _ in 0..n {
let mut lock = self.access_order.lock().await;
let key = lock.pop_front().unwrap();
drop(lock);
self.remove_db(key.0, key.1).await;
}
}
}
pub async fn sqlite(
our_node: Arc<String>,
send_to_loop: MessageSender,
@ -28,30 +107,44 @@ pub async fn sqlite(
send_to_caps_oracle: CapMessageSender,
home_directory_path: String,
) -> anyhow::Result<()> {
let sqlite_path = Arc::new(format!("{home_directory_path}/sqlite"));
if let Err(e) = fs::create_dir_all(&*sqlite_path).await {
let our = Address::new(our_node.as_str(), SQLITE_PROCESS_ID.clone());
crate::fd_manager::send_fd_manager_request_fds_limit(&our, &send_to_loop).await;
let mut state = SqliteState::new(our, send_to_terminal, send_to_loop, home_directory_path);
if let Err(e) = fs::create_dir_all(state.sqlite_path.as_str()).await {
panic!("failed creating sqlite dir! {e:?}");
}
let open_dbs: Arc<DashMap<(PackageId, String), Mutex<Connection>>> = Arc::new(DashMap::new());
let txs: Arc<DashMap<u64, Vec<(String, Vec<SqlValue>)>>> = Arc::new(DashMap::new());
let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> = HashMap::new();
while let Some(km) = recv_from_loop.recv().await {
if *our_node != km.source.node {
if state.our.node != km.source.node {
Printout::new(
1,
format!(
"sqlite: got request from {}, but requests must come from our node {our_node}",
km.source.node
"sqlite: got request from {}, but requests must come from our node {}",
km.source.node, state.our.node
),
)
.send(&send_to_terminal)
.send(&state.send_to_terminal)
.await;
continue;
}
if km.source.process == *FD_MANAGER_PROCESS_ID {
if let Err(e) = handle_fd_request(km, &mut state).await {
Printout::new(
1,
format!("sqlite: got request from fd_manager that errored: {e:?}"),
)
.send(&state.send_to_terminal)
.await;
};
continue;
}
let queue = process_queues
.get(&km.source.process)
.cloned()
@ -63,13 +156,8 @@ pub async fn sqlite(
}
// clone Arcs
let our_node = our_node.clone();
let send_to_loop = send_to_loop.clone();
let send_to_terminal = send_to_terminal.clone();
let mut state = state.clone();
let send_to_caps_oracle = send_to_caps_oracle.clone();
let open_dbs = open_dbs.clone();
let txs = txs.clone();
let sqlite_path = sqlite_path.clone();
tokio::spawn(async move {
let mut queue_lock = queue.lock().await;
@ -77,23 +165,13 @@ pub async fn sqlite(
let (km_id, km_rsvp) =
(km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone()));
if let Err(e) = handle_request(
&our_node,
km,
open_dbs,
txs,
&send_to_loop,
&send_to_caps_oracle,
&sqlite_path,
)
.await
{
if let Err(e) = handle_request(km, &mut state, &send_to_caps_oracle).await {
Printout::new(1, format!("sqlite: {e}"))
.send(&send_to_terminal)
.send(&state.send_to_terminal)
.await;
KernelMessage::builder()
.id(km_id)
.source((our_node.as_str(), SQLITE_PROCESS_ID.clone()))
.source(state.our.as_ref().clone())
.target(km_rsvp)
.message(Message::Response((
Response {
@ -107,7 +185,7 @@ pub async fn sqlite(
)))
.build()
.unwrap()
.send(&send_to_loop)
.send(&state.send_to_loop)
.await;
}
}
@ -117,13 +195,9 @@ pub async fn sqlite(
}
async fn handle_request(
our_node: &str,
km: KernelMessage,
open_dbs: Arc<DashMap<(PackageId, String), Mutex<Connection>>>,
txs: Arc<DashMap<u64, Vec<(String, Vec<SqlValue>)>>>,
send_to_loop: &MessageSender,
state: &mut SqliteState,
send_to_caps_oracle: &CapMessageSender,
sqlite_path: &str,
) -> Result<(), SqliteError> {
let KernelMessage {
id,
@ -154,15 +228,12 @@ async fn handle_request(
}
};
check_caps(
our_node,
&source,
&open_dbs,
send_to_caps_oracle,
&request,
sqlite_path,
)
.await?;
check_caps(&source, state, send_to_caps_oracle, &request).await?;
// always open to ensure db exists
state
.open_db(request.package_id.clone(), request.db.clone())
.await?;
let (body, bytes) = match request.action {
SqliteAction::Open => {
@ -174,7 +245,7 @@ async fn handle_request(
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
}
SqliteAction::Read { query } => {
let db = match open_dbs.get(&(request.package_id, request.db)) {
let db = match state.open_dbs.get(&(request.package_id, request.db)) {
Some(db) => db,
None => {
return Err(SqliteError::NoDb);
@ -230,7 +301,7 @@ async fn handle_request(
)
}
SqliteAction::Write { statement, tx_id } => {
let db = match open_dbs.get(&(request.package_id, request.db)) {
let db = match state.open_dbs.get(&(request.package_id, request.db)) {
Some(db) => db,
None => {
return Err(SqliteError::NoDb);
@ -252,7 +323,9 @@ async fn handle_request(
match tx_id {
Some(tx_id) => {
txs.entry(tx_id)
state
.txs
.entry(tx_id)
.or_default()
.push((statement.clone(), parameters));
}
@ -265,7 +338,7 @@ async fn handle_request(
}
SqliteAction::BeginTx => {
let tx_id = rand::random::<u64>();
txs.insert(tx_id, Vec::new());
state.txs.insert(tx_id, Vec::new());
(
serde_json::to_vec(&SqliteResponse::BeginTx { tx_id }).unwrap(),
@ -273,7 +346,7 @@ async fn handle_request(
)
}
SqliteAction::Commit { tx_id } => {
let db = match open_dbs.get(&(request.package_id, request.db)) {
let db = match state.open_dbs.get(&(request.package_id, request.db)) {
Some(db) => db,
None => {
return Err(SqliteError::NoDb);
@ -281,7 +354,7 @@ async fn handle_request(
};
let mut db = db.lock().await;
let txs = match txs.remove(&tx_id).map(|(_, tx)| tx) {
let txs = match state.txs.remove(&tx_id).map(|(_, tx)| tx) {
None => {
return Err(SqliteError::NoTx);
}
@ -297,7 +370,7 @@ async fn handle_request(
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
}
SqliteAction::Backup => {
for db_ref in open_dbs.iter() {
for db_ref in state.open_dbs.iter() {
let db = db_ref.value().lock().await;
let result: rusqlite::Result<()> = db
.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |_| Ok(()))
@ -315,7 +388,7 @@ async fn handle_request(
if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| source)) {
KernelMessage::builder()
.id(id)
.source((our_node, SQLITE_PROCESS_ID.clone()))
.source(state.our.as_ref().clone())
.target(target)
.message(Message::Response((
Response {
@ -332,7 +405,7 @@ async fn handle_request(
}))
.build()
.unwrap()
.send(send_to_loop)
.send(&state.send_to_loop)
.await;
}
@ -340,12 +413,10 @@ async fn handle_request(
}
async fn check_caps(
our_node: &str,
source: &Address,
open_dbs: &Arc<DashMap<(PackageId, String), Mutex<Connection>>>,
state: &mut SqliteState,
send_to_caps_oracle: &CapMessageSender,
request: &SqliteRequest,
sqlite_path: &str,
) -> Result<(), SqliteError> {
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
let src_package_id = PackageId::new(source.process.package(), source.process.publisher());
@ -356,7 +427,7 @@ async fn check_caps(
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability::new(
(our_node, SQLITE_PROCESS_ID.clone()),
state.our.as_ref().clone(),
serde_json::json!({
"kind": "write",
"db": request.db.to_string(),
@ -379,7 +450,7 @@ async fn check_caps(
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability::new(
(our_node, SQLITE_PROCESS_ID.clone()),
state.our.as_ref().clone(),
serde_json::json!({
"kind": "read",
"db": request.db.to_string(),
@ -407,7 +478,7 @@ async fn check_caps(
add_capability(
"read",
&request.db.to_string(),
&our_node,
&state.our,
&source,
send_to_caps_oracle,
)
@ -415,28 +486,22 @@ async fn check_caps(
add_capability(
"write",
&request.db.to_string(),
&our_node,
&state.our,
&source,
send_to_caps_oracle,
)
.await?;
if open_dbs.contains_key(&(request.package_id.clone(), request.db.clone())) {
if state
.open_dbs
.contains_key(&(request.package_id.clone(), request.db.clone()))
{
return Ok(());
}
let db_path = format!("{}/{}/{}", sqlite_path, request.package_id, request.db);
fs::create_dir_all(&db_path).await?;
let db_file_path = format!("{}/{}.db", db_path, request.db);
let db = Connection::open(db_file_path)?;
let _ = db.execute("PRAGMA journal_mode=WAL", []);
open_dbs.insert(
(request.package_id.clone(), request.db.clone()),
Mutex::new(db),
);
state
.open_db(request.package_id.clone(), request.db.clone())
.await?;
Ok(())
}
SqliteAction::RemoveDb => {
@ -446,10 +511,16 @@ async fn check_caps(
});
}
let db_path = format!("{}/{}/{}", sqlite_path, request.package_id, request.db);
open_dbs.remove(&(request.package_id.clone(), request.db.clone()));
state
.remove_db(request.package_id.clone(), request.db.clone())
.await;
fs::remove_dir_all(format!(
"{}/{}/{}",
state.sqlite_path, request.package_id, request.db
))
.await?;
fs::remove_dir_all(&db_path).await?;
Ok(())
}
SqliteAction::Backup => {
@ -459,18 +530,41 @@ async fn check_caps(
}
}
async fn handle_fd_request(km: KernelMessage, state: &mut SqliteState) -> anyhow::Result<()> {
let Message::Request(Request { body, .. }) = km.message else {
return Err(anyhow::anyhow!("not a request"));
};
let request: FdManagerRequest = serde_json::from_slice(&body)?;
match request {
FdManagerRequest::FdsLimit(new_fds_limit) => {
state.fds_limit = new_fds_limit;
if state.open_dbs.len() as u64 >= state.fds_limit {
crate::fd_manager::send_fd_manager_hit_fds_limit(&state.our, &state.send_to_loop)
.await;
state
.remove_least_recently_used_dbs(state.open_dbs.len() as u64 - state.fds_limit)
.await;
}
}
_ => {
return Err(anyhow::anyhow!("non-Cull FdManagerRequest"));
}
}
Ok(())
}
async fn add_capability(
kind: &str,
db: &str,
our_node: &str,
our: &Address,
source: &Address,
send_to_caps_oracle: &CapMessageSender,
) -> Result<(), SqliteError> {
let cap = Capability {
issuer: Address {
node: our_node.to_string(),
process: SQLITE_PROCESS_ID.clone(),
},
issuer: our.clone(),
params: serde_json::json!({ "kind": kind, "db": db }).to_string(),
};
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();

View File

@ -392,7 +392,9 @@ async fn handle_event(
cursor::MoveTo(0, height),
terminal::Clear(ClearType::CurrentLine)
)?;
*win_cols = width - 1;
// since we subtract prompt_len from win_cols, win_cols must always
// be >= prompt_len
*win_cols = std::cmp::max(width - 1, current_line.prompt_len as u16);
*win_rows = height;
if current_line.cursor_col + current_line.prompt_len as u16 > *win_cols {
current_line.cursor_col = *win_cols - current_line.prompt_len as u16;

View File

@ -1,9 +1,9 @@
use dashmap::DashMap;
use lib::types::core::{
Address, CapMessage, CapMessageSender, Capability, DirEntry, FileMetadata, FileType,
KernelMessage, LazyLoadBlob, Message, MessageReceiver, MessageSender, PackageId, PrintSender,
Printout, ProcessId, Request, Response, VfsAction, VfsError, VfsRequest, VfsResponse,
KERNEL_PROCESS_ID, VFS_PROCESS_ID,
Address, CapMessage, CapMessageSender, Capability, DirEntry, FdManagerRequest, FileMetadata,
FileType, KernelMessage, LazyLoadBlob, Message, MessageReceiver, MessageSender, PackageId,
PrintSender, Printout, ProcessId, Request, Response, VfsAction, VfsError, VfsRequest,
VfsResponse, FD_MANAGER_PROCESS_ID, KERNEL_PROCESS_ID, VFS_PROCESS_ID,
};
use std::{
collections::{HashMap, HashSet, VecDeque},
@ -19,9 +19,6 @@ use tokio::{
sync::Mutex,
};
// Constants for file cleanup
const MAX_OPEN_FILES: usize = 180;
/// The main VFS service function.
///
/// This function sets up the VFS, handles incoming requests, and manages file operations.
@ -52,11 +49,16 @@ pub async fn vfs(
.map_err(|e| anyhow::anyhow!("failed creating vfs dir! {e:?}"))?;
let vfs_path = Arc::new(fs::canonicalize(&vfs_path).await?);
let files = Files::new();
let mut files = Files::new(
Address::new(our_node.as_str(), VFS_PROCESS_ID.clone()),
send_to_loop,
);
let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
HashMap::default();
crate::fd_manager::send_fd_manager_request_fds_limit(&files.our, &files.send_to_loop).await;
while let Some(km) = recv_from_loop.recv().await {
if *our_node != km.source.node {
Printout::new(
@ -71,6 +73,18 @@ pub async fn vfs(
continue;
}
if km.source.process == *FD_MANAGER_PROCESS_ID {
if let Err(e) = handle_fd_request(km, &mut files).await {
Printout::new(
1,
format!("vfs: got request from fd_manager that errored: {e:?}"),
)
.send(&send_to_terminal)
.await;
};
continue;
}
let queue = process_queues
.get(&km.source.process)
.cloned()
@ -83,9 +97,8 @@ pub async fn vfs(
// Clone Arcs for the new task
let our_node = our_node.clone();
let send_to_loop = send_to_loop.clone();
let send_to_caps_oracle = send_to_caps_oracle.clone();
let files = files.clone();
let mut files = files.clone();
let vfs_path = vfs_path.clone();
tokio::spawn(async move {
@ -94,15 +107,8 @@ pub async fn vfs(
let (km_id, km_rsvp) =
(km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone()));
if let Err(e) = handle_request(
&our_node,
km,
files,
&send_to_loop,
&send_to_caps_oracle,
&vfs_path,
)
.await
if let Err(e) =
handle_request(&our_node, km, &mut files, &send_to_caps_oracle, &vfs_path).await
{
KernelMessage::builder()
.id(km_id)
@ -119,7 +125,7 @@ pub async fn vfs(
)))
.build()
.unwrap()
.send(&send_to_loop)
.send(&files.send_to_loop)
.await;
}
}
@ -137,6 +143,9 @@ struct Files {
cursor_positions: Arc<DashMap<PathBuf, u64>>,
/// access order of files
access_order: Arc<Mutex<UniqueQueue<PathBuf>>>,
pub our: Address,
pub send_to_loop: MessageSender,
pub fds_limit: u64,
}
struct FileEntry {
@ -145,11 +154,14 @@ struct FileEntry {
}
impl Files {
pub fn new() -> Self {
pub fn new(our: Address, send_to_loop: MessageSender) -> Self {
Self {
open_files: Arc::new(DashMap::new()),
cursor_positions: Arc::new(DashMap::new()),
access_order: Arc::new(Mutex::new(UniqueQueue::new())),
our,
send_to_loop,
fds_limit: 10, // small hardcoded limit that gets replaced by fd_manager soon after boot
}
}
@ -167,10 +179,6 @@ impl Files {
return Ok(entry.value().file.clone());
}
if self.open_files.len() >= MAX_OPEN_FILES {
self.close_least_recently_used_files().await?;
}
let mut file = self.try_open_file(&path, create, truncate).await?;
if let Some(position) = self.cursor_positions.get(&path) {
file.seek(SeekFrom::Start(*position)).await?;
@ -184,18 +192,30 @@ impl Files {
},
);
self.update_access_order(&path).await;
// if open files >= fds_limit, close the (limit/2) least recently used files
if self.open_files.len() as u64 >= self.fds_limit {
crate::fd_manager::send_fd_manager_hit_fds_limit(&self.our, &self.send_to_loop).await;
self.close_least_recently_used_files(self.fds_limit / 2)
.await?;
}
Ok(file)
}
async fn remove_file(&self, path: &Path) -> Result<(), VfsError> {
self.open_files.remove(path);
Ok(())
}
async fn update_access_order(&self, path: &Path) {
let mut access_order = self.access_order.lock().await;
access_order.push_back(path.to_path_buf());
}
async fn close_least_recently_used_files(&self) -> Result<(), VfsError> {
async fn close_least_recently_used_files(&self, to_close: u64) -> Result<(), VfsError> {
let mut access_order = self.access_order.lock().await;
let mut closed = 0;
let to_close = MAX_OPEN_FILES / 3; // close 33% of max open files
while closed < to_close {
if let Some(path) = access_order.pop_front() {
@ -254,8 +274,7 @@ impl Files {
async fn handle_request(
our_node: &str,
km: KernelMessage,
files: Files,
send_to_loop: &MessageSender,
files: &mut Files,
send_to_caps_oracle: &CapMessageSender,
vfs_path: &PathBuf,
) -> Result<(), VfsError> {
@ -311,7 +330,7 @@ async fn handle_request(
)))
.build()
.unwrap()
.send(send_to_loop)
.send(&files.send_to_loop)
.await;
return Ok(());
} else {
@ -361,7 +380,7 @@ async fn handle_request(
}
VfsAction::CreateFile => {
// create truncates any file that might've existed before
files.open_files.remove(&path);
files.remove_file(&path).await?;
let _file = files.open_file(&path, true, true).await?;
(VfsResponse::Ok, None)
}
@ -373,7 +392,7 @@ async fn handle_request(
}
VfsAction::CloseFile => {
// removes file from scope, resets file_handle and cursor.
files.open_files.remove(&path);
files.remove_file(&path).await?;
(VfsResponse::Ok, None)
}
VfsAction::WriteAll => {
@ -470,7 +489,7 @@ async fn handle_request(
}
VfsAction::RemoveFile => {
fs::remove_file(&path).await?;
files.open_files.remove(&path);
files.remove_file(&path).await?;
(VfsResponse::Ok, None)
}
VfsAction::RemoveDir => {
@ -625,7 +644,7 @@ async fn handle_request(
}))
.build()
.unwrap()
.send(send_to_loop)
.send(&files.send_to_loop)
.await;
}
@ -902,7 +921,6 @@ fn get_file_type(metadata: &std::fs::Metadata) -> FileType {
}
/// helper cache for most recently used paths
pub struct UniqueQueue<T>
where
T: Eq + Hash,
@ -993,3 +1011,29 @@ fn join_paths_safely(base: &PathBuf, extension: &str) -> PathBuf {
let extension_path = Path::new(extension_str);
base.join(extension_path)
}
async fn handle_fd_request(km: KernelMessage, files: &mut Files) -> anyhow::Result<()> {
let Message::Request(Request { body, .. }) = km.message else {
return Err(anyhow::anyhow!("not a request"));
};
let request: FdManagerRequest = serde_json::from_slice(&body)?;
match request {
FdManagerRequest::FdsLimit(fds_limit) => {
files.fds_limit = fds_limit;
if files.open_files.len() as u64 >= fds_limit {
crate::fd_manager::send_fd_manager_hit_fds_limit(&files.our, &files.send_to_loop)
.await;
files
.close_least_recently_used_files(files.open_files.len() as u64 - fds_limit)
.await?;
}
}
_ => {
return Err(anyhow::anyhow!("non-Cull FdManagerRequest"));
}
}
Ok(())
}

View File

@ -1,7 +1,7 @@
[package]
name = "lib"
authors = ["KinodeDAO"]
version = "0.9.5"
version = "0.9.7"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"

View File

@ -8,15 +8,17 @@ use thiserror::Error;
lazy_static::lazy_static! {
pub static ref ETH_PROCESS_ID: ProcessId = ProcessId::new(Some("eth"), "distro", "sys");
pub static ref FD_MANAGER_PROCESS_ID: ProcessId = ProcessId::new(Some("fd_manager"), "distro", "sys");
pub static ref HTTP_CLIENT_PROCESS_ID: ProcessId = ProcessId::new(Some("http_client"), "distro", "sys");
pub static ref HTTP_SERVER_PROCESS_ID: ProcessId = ProcessId::new(Some("http_server"), "distro", "sys");
pub static ref KERNEL_PROCESS_ID: ProcessId = ProcessId::new(Some("kernel"), "distro", "sys");
pub static ref KV_PROCESS_ID: ProcessId = ProcessId::new(Some("kv"), "distro", "sys");
pub static ref NET_PROCESS_ID: ProcessId = ProcessId::new(Some("net"), "distro", "sys");
pub static ref STATE_PROCESS_ID: ProcessId = ProcessId::new(Some("state"), "distro", "sys");
pub static ref SQLITE_PROCESS_ID: ProcessId = ProcessId::new(Some("sqlite"), "distro", "sys");
pub static ref TERMINAL_PROCESS_ID: ProcessId = ProcessId::new(Some("terminal"), "terminal", "sys");
pub static ref TIMER_PROCESS_ID: ProcessId = ProcessId::new(Some("timer"), "distro", "sys");
pub static ref VFS_PROCESS_ID: ProcessId = ProcessId::new(Some("vfs"), "distro", "sys");
pub static ref STATE_PROCESS_ID: ProcessId = ProcessId::new(Some("state"), "distro", "sys");
pub static ref KV_PROCESS_ID: ProcessId = ProcessId::new(Some("kv"), "distro", "sys");
pub static ref SQLITE_PROCESS_ID: ProcessId = ProcessId::new(Some("sqlite"), "distro", "sys");
}
//
@ -1723,6 +1725,8 @@ pub enum VfsError {
NotFound { path: String },
#[error("Creating directory failed at path: {path}: {error}")]
CreateDirError { path: String, error: String },
#[error("Other error: {error}")]
Other { error: String },
}
impl VfsError {
@ -1737,6 +1741,7 @@ impl VfsError {
VfsError::BadJson { .. } => "NoJson",
VfsError::NotFound { .. } => "NotFound",
VfsError::CreateDirError { .. } => "CreateDirError",
VfsError::Other { .. } => "Other",
}
}
}
@ -2072,3 +2077,53 @@ impl KnsUpdate {
self.ports.get(protocol)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum FdManagerRequest {
/// other process -> fd_manager
/// must send this to fd_manager to get an initial fds_limit
RequestFdsLimit,
/// other process -> fd_manager
/// send this to notify fd_manager that limit was hit,
/// which may or may not be reacted to
FdsLimitHit,
/// fd_manager -> other process
FdsLimit(u64),
/// administrative
UpdateMaxFdsAsFractionOfUlimitPercentage(u64),
/// administrative
UpdateUpdateUlimitSecs(u64),
/// administrative
UpdateCullFractionDenominator(u64),
/// get a `HashMap` of all `ProcessId`s to their number of allocated file descriptors.
GetState,
/// get the `u64` number of file descriptors allocated to `ProcessId`.
GetProcessFdLimit(ProcessId),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum FdManagerResponse {
/// response to [`FdManagerRequest::GetState`]
GetState(HashMap<ProcessId, FdsLimit>),
/// response to [`FdManagerRequest::GetProcessFdLimit`]
GetProcessFdLimit(u64),
}
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct FdsLimit {
pub limit: u64,
pub hit_count: u64,
}
#[derive(Debug, Error)]
pub enum FdManagerError {
#[error("fd_manager: received a non-Request message")]
NotARequest,
#[error("fd_manager: received a non-FdManangerRequest")]
BadRequest,
#[error("fd_manager: received a FdManagerRequest::FdsLimit, but I am the one who sets limits")]
FdManagerWasSentLimit,
}