Merge pull request #567 from kinode-dao/release-candidate

Release candidate 0.9.5
This commit is contained in:
doria 2024-10-11 05:29:37 +09:00 committed by GitHub
commit 76c312acd6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 1545 additions and 459 deletions

7
Cargo.lock generated
View File

@ -3639,7 +3639,7 @@ dependencies = [
[[package]] [[package]]
name = "kinode" name = "kinode"
version = "0.9.4" version = "0.9.5"
dependencies = [ dependencies = [
"aes-gcm", "aes-gcm",
"alloy 0.2.1", "alloy 0.2.1",
@ -3665,6 +3665,7 @@ dependencies = [
"kit", "kit",
"lazy_static", "lazy_static",
"lib", "lib",
"libc",
"nohash-hasher", "nohash-hasher",
"open", "open",
"public-ip", "public-ip",
@ -3700,7 +3701,7 @@ dependencies = [
[[package]] [[package]]
name = "kinode_lib" name = "kinode_lib"
version = "0.9.4" version = "0.9.5"
dependencies = [ dependencies = [
"lib", "lib",
] ]
@ -3823,7 +3824,7 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67"
[[package]] [[package]]
name = "lib" name = "lib"
version = "0.9.4" version = "0.9.5"
dependencies = [ dependencies = [
"alloy 0.2.1", "alloy 0.2.1",
"kit", "kit",

View File

@ -1,7 +1,7 @@
[package] [package]
name = "kinode_lib" name = "kinode_lib"
authors = ["KinodeDAO"] authors = ["KinodeDAO"]
version = "0.9.4" version = "0.9.5"
edition = "2021" edition = "2021"
description = "A general-purpose sovereign cloud computing platform" description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org" homepage = "https://kinode.org"

47
Dockerfile.buildbase Normal file
View File

@ -0,0 +1,47 @@
FROM ubuntu:22.04
# Set environment variables to avoid interactive dialog from APT
ENV DEBIAN_FRONTEND=noninteractive
ENV NVM_DIR=/root/.nvm
# Install all necessary packages in one layer and clean up in the same layer
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
pkg-config \
ca-certificates \
libssl-dev \
cmake \
llvm-dev \
libclang-dev \
clang \
curl \
git \
python3 \
&& update-ca-certificates \
&& rm -rf /var/lib/apt/lists/*
# Install Rust and wasm tools
RUN curl https://sh.rustup.rs -sSf | sh -s -- -y \
&& . $HOME/.cargo/env \
&& rustup install nightly \
&& rustup target add wasm32-wasi \
&& rustup target add wasm32-wasi --toolchain nightly \
&& rustup target add wasm32-wasip1 \
&& rustup target add wasm32-wasip1 --toolchain nightly \
&& cargo install wasm-tools \
&& cargo install cargo-wasi \
&& rm -rf ~/.cargo/git \
&& rm -rf ~/.cargo/registry
# Install NVM, Node.js
RUN curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.0/install.sh | bash \
&& . "$NVM_DIR/nvm.sh" \
&& nvm install node \
&& nvm use node
# Set up environment variables
ENV DEBIAN_FRONTEND=dialog \
PATH="/root/.nvm/versions/node/$(node -v)/bin:${PATH}"
# Set the default command to bash
CMD ["bash"]

14
Dockerfile.buildruntime Normal file
View File

@ -0,0 +1,14 @@
FROM nick1udwig/buildbase:latest
ARG DOCKER_BUILD_IMAGE_VERSION=latest
ENV NVM_DIR=/root/.nvm \
PATH="/root/.nvm/versions/node/$(node -v)/bin:${PATH}" \
DOCKER_BUILD_IMAGE_VERSION=$DOCKER_BUILD_IMAGE_VERSION
# Bind readonly & copy files in to avoid modifying host files
WORKDIR /input
# Set the default command to run the build script
# TODO: once build is idempotent, remove the `rm -rf` line
CMD ["/bin/bash", "-c", ". ~/.bashrc && . ~/.cargo/env && . $NVM_DIR/nvm.sh && rm -rf target/ kinode/packages/*/pkg/*wasm kinode/packages/*/*/target/ kinode/packages/*/pkg/api.zip kinode/packages/*/*/wit kinode/packages/app_store/pkg/ui kinode/packages/homepage/pkg/ui kinode/src/register-ui/build && ./scripts/build-release.py && cp -r /tmp/kinode-release/* /output && chmod 664 /output/* && rm -rf target/ kinode/packages/*/pkg/*wasm kinode/packages/*/*/target/ kinode/packages/*/pkg/api.zip kinode/packages/*/*/wit kinode/packages/app_store/pkg/ui kinode/packages/homepage/pkg/ui kinode/src/register-ui/build"]

View File

@ -1,7 +1,7 @@
[package] [package]
name = "kinode" name = "kinode"
authors = ["KinodeDAO"] authors = ["KinodeDAO"]
version = "0.9.4" version = "0.9.5"
edition = "2021" edition = "2021"
description = "A general-purpose sovereign cloud computing platform" description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org" homepage = "https://kinode.org"
@ -62,6 +62,7 @@ indexmap = "2.4"
jwt = "0.16" jwt = "0.16"
lib = { path = "../lib" } lib = { path = "../lib" }
lazy_static = "1.4.0" lazy_static = "1.4.0"
libc = "0.2"
nohash-hasher = "0.2.0" nohash-hasher = "0.2.0"
open = "5.1.4" open = "5.1.4"
public-ip = "0.2.2" public-ip = "0.2.2"

View File

@ -58,7 +58,7 @@ fn output_reruns(dir: &Path) {
} }
} }
fn untar_gz_file(path: &Path, dest: &Path) -> std::io::Result<()> { fn _untar_gz_file(path: &Path, dest: &Path) -> std::io::Result<()> {
// Open the .tar.gz file // Open the .tar.gz file
let tar_gz = File::open(path)?; let tar_gz = File::open(path)?;
let tar_gz_reader = BufReader::new(tar_gz); let tar_gz_reader = BufReader::new(tar_gz);
@ -222,5 +222,13 @@ fn main() -> anyhow::Result<()> {
let bootstrapped_processes_path = target_dir.join("bootstrapped_processes.rs"); let bootstrapped_processes_path = target_dir.join("bootstrapped_processes.rs");
fs::write(&bootstrapped_processes_path, bootstrapped_processes)?; fs::write(&bootstrapped_processes_path, bootstrapped_processes)?;
let version = if let Ok(version) = std::env::var("DOCKER_BUILD_IMAGE_VERSION") {
// embed the DOCKER_BUILD_IMAGE_VERSION
version
} else {
"none".to_string()
};
println!("cargo:rustc-env=DOCKER_BUILD_IMAGE_VERSION={version}");
Ok(()) Ok(())
} }

View File

@ -184,6 +184,8 @@ interface downloads {
auto-update(auto-update-request), auto-update(auto-update-request),
/// Notify that a download is complete /// Notify that a download is complete
download-complete(download-complete-request), download-complete(download-complete-request),
/// Auto-update-download complete
auto-download-complete(auto-download-complete-request),
/// Get files for a package /// Get files for a package
get-files(option<package-id>), get-files(option<package-id>),
/// Remove a file /// Remove a file
@ -243,6 +245,12 @@ interface downloads {
err: option<download-error>, err: option<download-error>,
} }
/// Request for an auto-download complete
record auto-download-complete-request {
download-info: download-complete-request,
manifest-hash: string,
}
/// Represents a hash mismatch error /// Represents a hash mismatch error
record hash-mismatch { record hash-mismatch {
desired: string, desired: string,

View File

@ -474,7 +474,10 @@ fn serve_paths(
&our.node().to_string(), &our.node().to_string(),
) { ) {
Ok(_) => { Ok(_) => {
println!("successfully installed package: {:?}", process_package_id); println!(
"successfully installed {}:{}",
process_package_id.package_name, process_package_id.publisher_node
);
Ok((StatusCode::CREATED, None, vec![])) Ok((StatusCode::CREATED, None, vec![]))
} }
Err(e) => Ok(( Err(e) => Ok((

View File

@ -30,7 +30,7 @@
//! It delegates these responsibilities to the downloads and chain processes respectively. //! It delegates these responsibilities to the downloads and chain processes respectively.
//! //!
use crate::kinode::process::downloads::{ use crate::kinode::process::downloads::{
DownloadCompleteRequest, DownloadResponses, ProgressUpdate, AutoDownloadCompleteRequest, DownloadCompleteRequest, DownloadResponses, ProgressUpdate,
}; };
use crate::kinode::process::main::{ use crate::kinode::process::main::{
ApisResponse, GetApiResponse, InstallPackageRequest, InstallResponse, LocalRequest, ApisResponse, GetApiResponse, InstallPackageRequest, InstallResponse, LocalRequest,
@ -65,6 +65,7 @@ pub enum Req {
LocalRequest(LocalRequest), LocalRequest(LocalRequest),
Progress(ProgressUpdate), Progress(ProgressUpdate),
DownloadComplete(DownloadCompleteRequest), DownloadComplete(DownloadCompleteRequest),
AutoDownloadComplete(AutoDownloadCompleteRequest),
Http(http::server::HttpServerRequest), Http(http::server::HttpServerRequest),
} }
@ -161,6 +162,40 @@ fn handle_message(
}, },
); );
} }
Req::AutoDownloadComplete(req) => {
if !message.is_local(&our) {
return Err(anyhow::anyhow!(
"auto download complete from non-local node"
));
}
// auto_install case:
// the downloads process has given us the new package manifest's
// capability hashes, and the old package's capability hashes.
// we can use these to determine if the new package has the same
// capabilities as the old one, and if so, auto-install it.
let manifest_hash = req.manifest_hash;
let package_id = req.download_info.package_id;
let version_hash = req.download_info.version_hash;
if let Some(package) = state.packages.get(&package_id.clone().to_process_lib()) {
if package.manifest_hash == Some(manifest_hash) {
print_to_terminal(1, "auto_install:main, manifest_hash match");
if let Err(e) =
utils::install(&package_id, None, &version_hash, state, &our.node)
{
print_to_terminal(1, &format!("error auto_installing package: {e}"));
} else {
println!(
"auto_installed update for package: {:?}",
&package_id.to_process_lib()
);
}
} else {
print_to_terminal(1, "auto_install:main, manifest_hash do not match");
}
}
}
Req::DownloadComplete(req) => { Req::DownloadComplete(req) => {
if !message.is_local(&our) { if !message.is_local(&our) {
return Err(anyhow::anyhow!("download complete from non-local node")); return Err(anyhow::anyhow!("download complete from non-local node"));
@ -182,41 +217,6 @@ fn handle_message(
.unwrap(), .unwrap(),
}, },
); );
// auto_install case:
// the downloads process has given us the new package manifest's
// capability hashes, and the old package's capability hashes.
// we can use these to determine if the new package has the same
// capabilities as the old one, and if so, auto-install it.
if let Some(context) = message.context() {
let manifest_hash = String::from_utf8(context.to_vec())?;
if let Some(package) =
state.packages.get(&req.package_id.clone().to_process_lib())
{
if package.manifest_hash == Some(manifest_hash) {
print_to_terminal(1, "auto_install:main, manifest_hash match");
if let Err(e) = utils::install(
&req.package_id,
None,
&req.version_hash,
state,
&our.node,
) {
print_to_terminal(
1,
&format!("error auto_installing package: {e}"),
);
} else {
println!(
"auto_installed update for package: {:?}",
&req.package_id.to_process_lib()
);
}
} else {
print_to_terminal(1, "auto_install:main, manifest_hash do not match");
}
}
}
} }
} }
} else { } else {
@ -261,8 +261,8 @@ fn handle_local_request(
match utils::install(&package_id, metadata, &version_hash, state, &our.node) { match utils::install(&package_id, metadata, &version_hash, state, &our.node) {
Ok(()) => { Ok(()) => {
println!( println!(
"successfully installed package: {:?}", "successfully installed {}:{}",
&package_id.to_process_lib() package_id.package_name, package_id.publisher_node
); );
LocalResponse::InstallResponse(InstallResponse::Success) LocalResponse::InstallResponse(InstallResponse::Success)
} }

View File

@ -42,9 +42,9 @@
//! mechanism is implemented in the FT worker for improved modularity and performance. //! mechanism is implemented in the FT worker for improved modularity and performance.
//! //!
use crate::kinode::process::downloads::{ use crate::kinode::process::downloads::{
AutoUpdateRequest, DirEntry, DownloadCompleteRequest, DownloadError, DownloadRequests, AutoDownloadCompleteRequest, AutoUpdateRequest, DirEntry, DownloadCompleteRequest,
DownloadResponses, Entry, FileEntry, HashMismatch, LocalDownloadRequest, RemoteDownloadRequest, DownloadError, DownloadRequests, DownloadResponses, Entry, FileEntry, HashMismatch,
RemoveFileRequest, LocalDownloadRequest, RemoteDownloadRequest, RemoveFileRequest,
}; };
use std::{collections::HashSet, io::Read, str::FromStr}; use std::{collections::HashSet, io::Read, str::FromStr};
@ -245,7 +245,7 @@ fn handle_message(
// if we have a pending auto_install, forward that context to the main process. // if we have a pending auto_install, forward that context to the main process.
// it will check if the caps_hashes match (no change in capabilities), and auto_install if it does. // it will check if the caps_hashes match (no change in capabilities), and auto_install if it does.
let context = if auto_updates.remove(&( let manifest_hash = if auto_updates.remove(&(
req.package_id.clone().to_process_lib(), req.package_id.clone().to_process_lib(),
req.version_hash.clone(), req.version_hash.clone(),
)) { )) {
@ -253,7 +253,7 @@ fn handle_message(
req.package_id.clone().to_process_lib(), req.package_id.clone().to_process_lib(),
req.version_hash.clone(), req.version_hash.clone(),
) { ) {
Ok(manifest_hash) => Some(manifest_hash.as_bytes().to_vec()), Ok(manifest_hash) => Some(manifest_hash),
Err(e) => { Err(e) => {
print_to_terminal( print_to_terminal(
1, 1,
@ -267,13 +267,26 @@ fn handle_message(
}; };
// pushed to UI via websockets // pushed to UI via websockets
let mut request = Request::to(("our", "main", "app_store", "sys")) Request::to(("our", "main", "app_store", "sys"))
.body(serde_json::to_vec(&req)?); .body(serde_json::to_vec(&req)?)
.send()?;
if let Some(ctx) = context { // trigger auto-update install trigger to main:app_store:sys
request = request.context(ctx); if let Some(manifest_hash) = manifest_hash {
let auto_download_complete_req = AutoDownloadCompleteRequest {
download_info: req.clone(),
manifest_hash,
};
print_to_terminal(
1,
&format!(
"auto_update download complete: triggering install on main:app_store:sys"
),
);
Request::to(("our", "main", "app_store", "sys"))
.body(serde_json::to_vec(&auto_download_complete_req)?)
.send()?;
} }
request.send()?;
} }
DownloadRequests::GetFiles(maybe_id) => { DownloadRequests::GetFiles(maybe_id) => {
// if not local, throw to the boonies. // if not local, throw to the boonies.

View File

@ -6,7 +6,7 @@ interface PackageSelectorProps {
} }
const PackageSelector: React.FC<PackageSelectorProps> = ({ onPackageSelect }) => { const PackageSelector: React.FC<PackageSelectorProps> = ({ onPackageSelect }) => {
const { installed } = useAppsStore(); const { installed, fetchInstalled } = useAppsStore();
const [selectedPackage, setSelectedPackage] = useState<string>(""); const [selectedPackage, setSelectedPackage] = useState<string>("");
const [customPackage, setCustomPackage] = useState<string>(""); const [customPackage, setCustomPackage] = useState<string>("");
const [isCustomPackageSelected, setIsCustomPackageSelected] = useState(false); const [isCustomPackageSelected, setIsCustomPackageSelected] = useState(false);
@ -18,6 +18,10 @@ const PackageSelector: React.FC<PackageSelectorProps> = ({ onPackageSelect }) =>
} }
}, [selectedPackage, onPackageSelect]); }, [selectedPackage, onPackageSelect]);
useEffect(() => {
fetchInstalled();
}, []);
const handlePackageChange = (e: React.ChangeEvent<HTMLSelectElement>) => { const handlePackageChange = (e: React.ChangeEvent<HTMLSelectElement>) => {
const value = e.target.value; const value = e.target.value;
if (value === "custom") { if (value === "custom") {

View File

@ -1,11 +1,10 @@
import React, { useState, useEffect, useCallback, useMemo } from "react"; import React, { useState, useEffect, useCallback, useMemo } from "react";
import { useParams, useNavigate } from "react-router-dom"; import { useParams } from "react-router-dom";
import { FaDownload, FaSpinner, FaChevronDown, FaChevronUp, FaRocket, FaTrash, FaPlay } from "react-icons/fa"; import { FaDownload, FaSpinner, FaChevronDown, FaChevronUp, FaRocket, FaTrash, FaPlay } from "react-icons/fa";
import useAppsStore from "../store"; import useAppsStore from "../store";
import { MirrorSelector } from '../components'; import { MirrorSelector } from '../components';
export default function DownloadPage() { export default function DownloadPage() {
const navigate = useNavigate();
const { id } = useParams<{ id: string }>(); const { id } = useParams<{ id: string }>();
const { const {
listings, listings,
@ -28,6 +27,9 @@ export default function DownloadPage() {
const [isMirrorOnline, setIsMirrorOnline] = useState<boolean | null>(null); const [isMirrorOnline, setIsMirrorOnline] = useState<boolean | null>(null);
const [showCapApproval, setShowCapApproval] = useState(false); const [showCapApproval, setShowCapApproval] = useState(false);
const [manifest, setManifest] = useState<any>(null); const [manifest, setManifest] = useState<any>(null);
const [isInstalling, setIsInstalling] = useState(false);
const [isCheckingLaunch, setIsCheckingLaunch] = useState(false);
const [launchPath, setLaunchPath] = useState<string | null>(null);
const app = useMemo(() => listings[id || ""], [listings, id]); const app = useMemo(() => listings[id || ""], [listings, id]);
const appDownloads = useMemo(() => downloads[id || ""] || [], [downloads, id]); const appDownloads = useMemo(() => downloads[id || ""] || [], [downloads, id]);
@ -101,6 +103,36 @@ export default function DownloadPage() {
return versionData ? installedApp.our_version_hash === versionData.hash : false; return versionData ? installedApp.our_version_hash === versionData.hash : false;
}, [app, selectedVersion, installedApp, sortedVersions]); }, [app, selectedVersion, installedApp, sortedVersions]);
const checkLaunchPath = useCallback(() => {
if (!app) return;
setIsCheckingLaunch(true);
const appId = `${app.package_id.package_name}:${app.package_id.publisher_node}`;
fetchHomepageApps().then(() => {
const path = getLaunchUrl(appId);
setLaunchPath(path);
setIsCheckingLaunch(false);
if (path) {
setIsInstalling(false);
}
});
}, [app, fetchHomepageApps, getLaunchUrl]);
useEffect(() => {
if (isInstalling) {
const checkInterval = setInterval(checkLaunchPath, 500);
const timeout = setTimeout(() => {
clearInterval(checkInterval);
setIsInstalling(false);
setIsCheckingLaunch(false);
}, 5000);
return () => {
clearInterval(checkInterval);
clearTimeout(timeout);
};
}
}, [isInstalling, checkLaunchPath]);
const handleDownload = useCallback(() => { const handleDownload = useCallback(() => {
if (!id || !selectedMirror || !app || !selectedVersion) return; if (!id || !selectedMirror || !app || !selectedVersion) return;
const versionData = sortedVersions.find(v => v.version === selectedVersion); const versionData = sortedVersions.find(v => v.version === selectedVersion);
@ -130,36 +162,87 @@ export default function DownloadPage() {
} }
}, [id, app, appDownloads]); }, [id, app, appDownloads]);
const canDownload = useMemo(() => {
return selectedMirror && (isMirrorOnline === true || selectedMirror.startsWith('http')) && !isDownloading && !isDownloaded;
}, [selectedMirror, isMirrorOnline, isDownloading, isDownloaded]);
const confirmInstall = useCallback(() => { const confirmInstall = useCallback(() => {
if (!id || !selectedVersion) return; if (!id || !selectedVersion) return;
const versionData = sortedVersions.find(v => v.version === selectedVersion); const versionData = sortedVersions.find(v => v.version === selectedVersion);
if (versionData) { if (versionData) {
setIsInstalling(true);
setLaunchPath(null);
installApp(id, versionData.hash).then(() => { installApp(id, versionData.hash).then(() => {
fetchData(id);
setShowCapApproval(false); setShowCapApproval(false);
setManifest(null); setManifest(null);
fetchData(id);
}); });
} }
}, [id, selectedVersion, sortedVersions, installApp, fetchData]); }, [id, selectedVersion, sortedVersions, installApp, fetchData]);
const handleLaunch = useCallback(() => { const handleLaunch = useCallback(() => {
if (app) { if (launchPath) {
const launchUrl = getLaunchUrl(`${app.package_id.package_name}:${app.package_id.publisher_node}`); window.location.href = launchPath;
if (launchUrl) {
window.location.href = launchUrl;
} }
} }, [launchPath]);
}, [app, getLaunchUrl]);
const canLaunch = useMemo(() => { const canLaunch = useMemo(() => {
if (!app) return false; if (!app) return false;
return !!getLaunchUrl(`${app.package_id.package_name}:${app.package_id.publisher_node}`); return !!getLaunchUrl(`${app.package_id.package_name}:${app.package_id.publisher_node}`);
}, [app, getLaunchUrl]); }, [app, getLaunchUrl]);
const canDownload = useMemo(() => {
return selectedMirror && (isMirrorOnline === true || selectedMirror.startsWith('http')) && !isDownloading && !isDownloaded;
}, [selectedMirror, isMirrorOnline, isDownloading, isDownloaded]);
const renderActionButton = () => {
if (isCurrentVersionInstalled || launchPath) {
return (
<button className="action-button installed-button" disabled>
<FaRocket /> Installed
</button>
);
}
if (isInstalling || isCheckingLaunch) {
return (
<button className="action-button installing-button" disabled>
<FaSpinner className="fa-spin" /> Installing...
</button>
);
}
if (isDownloaded) {
return (
<button
onClick={() => {
const versionData = sortedVersions.find(v => v.version === selectedVersion);
if (versionData) {
handleInstall(versionData.version, versionData.hash);
}
}}
className="action-button install-button"
>
<FaRocket /> Install
</button>
);
}
return (
<button
onClick={handleDownload}
disabled={!canDownload}
className="action-button download-button"
>
{isDownloading ? (
<>
<FaSpinner className="fa-spin" /> Downloading... {downloadProgress}%
</>
) : (
<>
<FaDownload /> Download
</>
)}
</button>
);
};
if (!app) { if (!app) {
return <div className="downloads-page"><h4>Loading app details...</h4></div>; return <div className="downloads-page"><h4>Loading app details...</h4></div>;
} }
@ -176,15 +259,22 @@ export default function DownloadPage() {
<p className="app-id">{`${app.package_id.package_name}.${app.package_id.publisher_node}`}</p> <p className="app-id">{`${app.package_id.package_name}.${app.package_id.publisher_node}`}</p>
</div> </div>
</div> </div>
{installedApp && ( {launchPath ? (
<button <button
onClick={handleLaunch} onClick={handleLaunch}
className="launch-button" className="launch-button"
disabled={!canLaunch}
> >
<FaPlay /> {canLaunch ? 'Launch' : 'No UI found for app'} <FaPlay /> Launch
</button> </button>
)} ) : isInstalling || isCheckingLaunch ? (
<button className="launch-button" disabled>
<FaSpinner className="fa-spin" /> Checking...
</button>
) : installedApp ? (
<button className="launch-button" disabled>
No UI found for app
</button>
) : null}
</div> </div>
<p className="app-description">{app.metadata?.description}</p> <p className="app-description">{app.metadata?.description}</p>
@ -207,39 +297,7 @@ export default function DownloadPage() {
onMirrorSelect={handleMirrorSelect} onMirrorSelect={handleMirrorSelect}
/> />
{isCurrentVersionInstalled ? ( {renderActionButton()}
<button className="action-button installed-button" disabled>
<FaRocket /> Installed
</button>
) : isDownloaded ? (
<button
onClick={() => {
const versionData = sortedVersions.find(v => v.version === selectedVersion);
if (versionData) {
handleInstall(versionData.version, versionData.hash);
}
}}
className="action-button install-button"
>
<FaRocket /> Install
</button>
) : (
<button
onClick={handleDownload}
disabled={!canDownload}
className="action-button download-button"
>
{isDownloading ? (
<>
<FaSpinner className="fa-spin" /> Downloading... {downloadProgress}%
</>
) : (
<>
<FaDownload /> Download
</>
)}
</button>
)}
</div> </div>
<div className="my-downloads"> <div className="my-downloads">

View File

@ -12,7 +12,7 @@ const NAME_INVALID = "Package name must contain only valid characters (a-z, 0-9,
export default function PublishPage() { export default function PublishPage() {
const { openConnectModal } = useConnectModal(); const { openConnectModal } = useConnectModal();
const { ourApps, fetchOurApps, installed, downloads } = useAppsStore(); const { ourApps, fetchOurApps, downloads } = useAppsStore();
const publicClient = usePublicClient(); const publicClient = usePublicClient();
const { address, isConnected, isConnecting } = useAccount(); const { address, isConnected, isConnecting } = useAccount();

View File

@ -218,12 +218,6 @@ const useAppsStore = create<AppsStore>()((set, get) => ({
}); });
if (res.status === HTTP_STATUS.CREATED) { if (res.status === HTTP_STATUS.CREATED) {
await get().fetchInstalled(); await get().fetchInstalled();
// hacky: a small delay (500ms) before fetching homepage apps
// to give the app time to add itself to the homepage
// might make sense to add more state and do retry logic instead.
await new Promise(resolve => setTimeout(resolve, 500));
await get().fetchHomepageApps(); await get().fetchHomepageApps();
} }
} catch (error) { } catch (error) {

View File

@ -5,12 +5,7 @@
"on_exit": "Restart", "on_exit": "Restart",
"request_networking": true, "request_networking": true,
"request_capabilities": [ "request_capabilities": [
"net:distro:sys", "chess:chess:sys",
"filesystem:distro:sys",
"http_server:distro:sys",
"http_client:distro:sys",
"kernel:distro:sys",
"vfs:distro:sys",
"eth:distro:sys", "eth:distro:sys",
{ {
"process": "eth:distro:sys", "process": "eth:distro:sys",
@ -18,10 +13,16 @@
"root": true "root": true
} }
}, },
"sqlite:distro:sys", "fd_manager:distro:sys",
"kv:distro:sys", "filesystem:distro:sys",
"chess:chess:sys", "http_server:distro:sys",
"http_client:distro:sys",
"kernel:distro:sys",
"kns_indexer:kns_indexer:sys", "kns_indexer:kns_indexer:sys",
"kv:distro:sys",
"net:distro:sys",
"sqlite:distro:sys",
"vfs:distro:sys",
{ {
"process": "vfs:distro:sys", "process": "vfs:distro:sys",
"params": { "params": {
@ -30,6 +31,6 @@
} }
], ],
"grant_capabilities": [], "grant_capabilities": [],
"public": true "public": false
} }
] ]

358
kinode/src/fd_manager.rs Normal file
View File

@ -0,0 +1,358 @@
use lib::types::core::{
Address, FdManagerError, FdManagerRequest, FdManagerResponse, FdsLimit, KernelMessage, Message,
MessageReceiver, MessageSender, PrintSender, Printout, ProcessId, Request,
FD_MANAGER_PROCESS_ID,
};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
const DEFAULT_MAX_OPEN_FDS: u64 = 180;
const DEFAULT_FDS_AS_FRACTION_OF_ULIMIT_PERCENTAGE: u64 = 90;
const SYS_RESERVED_FDS: u64 = 30;
const DEFAULT_UPDATE_ULIMIT_SECS: u64 = 3600;
const _DEFAULT_CULL_FRACTION_DENOMINATOR: u64 = 2;
#[derive(Debug, Serialize, Deserialize)]
struct State {
fds_limits: HashMap<ProcessId, FdsLimit>,
mode: Mode,
max_fds: u64,
}
#[derive(Debug, Serialize, Deserialize)]
enum Mode {
/// don't update the max_fds except by user input
StaticMax,
/// check the system's ulimit periodically and update max_fds accordingly
DynamicMax {
max_fds_as_fraction_of_ulimit_percentage: u64,
update_ulimit_secs: u64,
},
}
impl State {
fn new(static_max_fds: Option<u64>) -> Self {
Self::default(static_max_fds)
}
fn default(static_max_fds: Option<u64>) -> Self {
Self {
fds_limits: HashMap::new(),
mode: Mode::default(static_max_fds),
max_fds: match static_max_fds {
Some(max) => max,
None => DEFAULT_MAX_OPEN_FDS,
},
}
}
fn update_max_fds_from_ulimit(&mut self, ulimit_max_fds: u64) {
let Mode::DynamicMax {
ref max_fds_as_fraction_of_ulimit_percentage,
..
} = self.mode
else {
return;
};
let min_ulimit = SYS_RESERVED_FDS + 10;
if ulimit_max_fds <= min_ulimit {
panic!(
"fatal: ulimit from system ({ulimit_max_fds}) is too small to operate Kinode. Please run Kinode with a larger ulimit (at least {min_ulimit}).",
);
}
self.max_fds =
ulimit_max_fds * max_fds_as_fraction_of_ulimit_percentage / 100 - SYS_RESERVED_FDS;
}
async fn update_all_fds_limits(&mut self, our_node: &str, send_to_loop: &MessageSender) {
let weights = self
.fds_limits
.values()
.map(|limit| limit.hit_count)
.sum::<u64>();
let statically_allocated = self.max_fds as f64 / 2.0;
let per_process_unweighted =
statically_allocated / std::cmp::max(self.fds_limits.len() as u64, 1) as f64;
let per_process_weighted = statically_allocated / std::cmp::max(weights, 1) as f64;
for limit in self.fds_limits.values_mut() {
limit.limit = (per_process_unweighted + per_process_weighted * limit.hit_count as f64)
.floor() as u64;
}
send_all_fds_limits(our_node, send_to_loop, self).await;
}
}
impl Mode {
fn default(static_max_fds: Option<u64>) -> Self {
match static_max_fds {
Some(_) => Self::StaticMax,
None => Self::DynamicMax {
max_fds_as_fraction_of_ulimit_percentage:
DEFAULT_FDS_AS_FRACTION_OF_ULIMIT_PERCENTAGE,
update_ulimit_secs: DEFAULT_UPDATE_ULIMIT_SECS,
},
}
}
}
/// The fd_manager entrypoint.
pub async fn fd_manager(
our_node: Arc<String>,
send_to_loop: MessageSender,
send_to_terminal: PrintSender,
mut recv_from_loop: MessageReceiver,
static_max_fds: Option<u64>,
) -> anyhow::Result<()> {
let mut state = State::new(static_max_fds);
let mut interval = {
// in code block to release the reference into state
let Mode::DynamicMax {
ref update_ulimit_secs,
..
} = state.mode
else {
return Ok(());
};
tokio::time::interval(tokio::time::Duration::from_secs(*update_ulimit_secs))
};
loop {
tokio::select! {
Some(message) = recv_from_loop.recv() => {
match handle_message(
&our_node,
message,
&mut interval,
&mut state,
&send_to_loop,
).await {
Ok(Some(to_print)) => {
Printout::new(2, to_print).send(&send_to_terminal).await;
}
Err(e) => {
Printout::new(1, &format!("handle_message error: {e:?}"))
.send(&send_to_terminal)
.await;
}
_ => {}
}
}
_ = interval.tick() => {
let old_max_fds = state.max_fds;
match update_max_fds(&mut state).await {
Ok(new) => {
if new != old_max_fds {
state.update_all_fds_limits(our_node.as_str(), &send_to_loop).await;
}
}
Err(e) => Printout::new(1, &format!("update_max_fds error: {e:?}"))
.send(&send_to_terminal)
.await,
}
}
}
}
}
async fn handle_message(
our_node: &str,
km: KernelMessage,
_interval: &mut tokio::time::Interval,
state: &mut State,
send_to_loop: &MessageSender,
) -> anyhow::Result<Option<String>> {
let Message::Request(Request {
body,
expects_response,
..
}) = km.message
else {
return Err(FdManagerError::NotARequest.into());
};
let request: FdManagerRequest =
serde_json::from_slice(&body).map_err(|_e| FdManagerError::BadRequest)?;
let return_value = match request {
FdManagerRequest::RequestFdsLimit => {
// divide max_fds by number of processes requesting fds limits,
// then send each process its new limit
// TODO can weight different processes differently
state.fds_limits.insert(
km.source.process,
FdsLimit {
limit: 0,
hit_count: 1, // starts with 1 to give initial weight
},
);
state.update_all_fds_limits(our_node, &send_to_loop).await;
None
}
FdManagerRequest::FdsLimitHit => {
// sender process hit its fd limit
// react to this by incrementing hit count and
// re-weighting all processes' limits
state.fds_limits.get_mut(&km.source.process).map(|limit| {
limit.hit_count += 1;
});
state.update_all_fds_limits(our_node, &send_to_loop).await;
Some(format!("{} hit its fd limit", km.source.process))
}
FdManagerRequest::FdsLimit(_) => {
// should only send this, never receive it
return Err(FdManagerError::FdManagerWasSentLimit.into());
}
FdManagerRequest::UpdateMaxFdsAsFractionOfUlimitPercentage(new) => {
match state.mode {
Mode::DynamicMax {
ref mut max_fds_as_fraction_of_ulimit_percentage,
..
} => *max_fds_as_fraction_of_ulimit_percentage = new,
_ => return Err(FdManagerError::BadRequest.into()),
}
None
}
FdManagerRequest::UpdateUpdateUlimitSecs(new) => {
match state.mode {
Mode::DynamicMax {
ref mut update_ulimit_secs,
..
} => *update_ulimit_secs = new,
_ => return Err(FdManagerError::BadRequest.into()),
}
None
}
FdManagerRequest::UpdateCullFractionDenominator(_new) => {
// state.cull_fraction_denominator = new;
None
}
FdManagerRequest::GetState => {
if expects_response.is_some() {
KernelMessage::builder()
.id(km.id)
.source(km.target)
.target(km.rsvp.unwrap_or(km.source))
.message(Message::Response((
lib::core::Response {
body: serde_json::to_vec(&FdManagerResponse::GetState(
state.fds_limits.clone(),
))
.unwrap(),
inherit: false,
metadata: None,
capabilities: vec![],
},
None,
)))
.build()
.unwrap()
.send(send_to_loop)
.await;
}
Some(format!("fd_manager: {:?}", state))
}
FdManagerRequest::GetProcessFdLimit(process) => {
if expects_response.is_some() {
KernelMessage::builder()
.id(km.id)
.source(km.target)
.target(km.rsvp.unwrap_or(km.source))
.message(Message::Response((
lib::core::Response {
body: serde_json::to_vec(&FdManagerResponse::GetProcessFdLimit(
state
.fds_limits
.get(&process)
.map(|limit| limit.limit)
.unwrap_or(0),
))
.unwrap(),
inherit: false,
metadata: None,
capabilities: vec![],
},
None,
)))
.build()
.unwrap()
.send(send_to_loop)
.await;
}
None
}
};
Ok(return_value)
}
async fn update_max_fds(state: &mut State) -> anyhow::Result<u64> {
let ulimit_max_fds = get_max_fd_limit()
.map_err(|_| anyhow::anyhow!("Couldn't update max fd limit: ulimit failed"))?;
state.update_max_fds_from_ulimit(ulimit_max_fds);
Ok(ulimit_max_fds)
}
async fn send_all_fds_limits(our_node: &str, send_to_loop: &MessageSender, state: &State) {
for (process_id, limit) in &state.fds_limits {
KernelMessage::builder()
.id(rand::random())
.source((our_node, FD_MANAGER_PROCESS_ID.clone()))
.target((our_node, process_id.clone()))
.message(Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&FdManagerRequest::FdsLimit(limit.limit)).unwrap(),
metadata: None,
capabilities: vec![],
}))
.build()
.unwrap()
.send(send_to_loop)
.await;
}
}
fn get_max_fd_limit() -> anyhow::Result<u64> {
let mut rlim = libc::rlimit {
rlim_cur: 0, // Current limit
rlim_max: 0, // Maximum limit value
};
// RLIMIT_NOFILE is the resource indicating the maximum file descriptor number.
if unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut rlim) } == 0 {
Ok(rlim.rlim_cur as u64)
} else {
Err(anyhow::anyhow!("Failed to get the resource limit."))
}
}
pub async fn send_fd_manager_request_fds_limit(our: &Address, send_to_loop: &MessageSender) {
let message = Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&FdManagerRequest::RequestFdsLimit).unwrap(),
metadata: None,
capabilities: vec![],
});
send_to_fd_manager(our, message, send_to_loop).await
}
pub async fn send_fd_manager_hit_fds_limit(our: &Address, send_to_loop: &MessageSender) {
let message = Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&FdManagerRequest::FdsLimitHit).unwrap(),
metadata: None,
capabilities: vec![],
});
send_to_fd_manager(our, message, send_to_loop).await
}
async fn send_to_fd_manager(our: &Address, message: Message, send_to_loop: &MessageSender) {
KernelMessage::builder()
.id(rand::random())
.source(our.clone())
.target((our.node.clone(), FD_MANAGER_PROCESS_ID.clone()))
.message(message)
.build()
.unwrap()
.send(send_to_loop)
.await
}

View File

@ -1,8 +1,10 @@
use crate::vfs::UniqueQueue;
use dashmap::DashMap; use dashmap::DashMap;
use lib::types::core::{ use lib::types::core::{
Address, CapMessage, CapMessageSender, Capability, KernelMessage, KvAction, KvError, KvRequest, Address, CapMessage, CapMessageSender, Capability, FdManagerRequest, KernelMessage, KvAction,
KvResponse, LazyLoadBlob, Message, MessageReceiver, MessageSender, PackageId, PrintSender, KvError, KvRequest, KvResponse, LazyLoadBlob, Message, MessageReceiver, MessageSender,
Printout, ProcessId, Request, Response, KV_PROCESS_ID, PackageId, PrintSender, Printout, ProcessId, Request, Response, FD_MANAGER_PROCESS_ID,
KV_PROCESS_ID,
}; };
use rocksdb::OptimisticTransactionDB; use rocksdb::OptimisticTransactionDB;
use std::{ use std::{
@ -11,6 +13,81 @@ use std::{
}; };
use tokio::{fs, sync::Mutex}; use tokio::{fs, sync::Mutex};
#[derive(Clone)]
struct KvState {
our: Arc<Address>,
kv_path: Arc<String>,
send_to_loop: MessageSender,
send_to_terminal: PrintSender,
open_kvs: Arc<DashMap<(PackageId, String), OptimisticTransactionDB>>,
/// access order of dbs, used to cull if we hit the fds limit
access_order: Arc<Mutex<UniqueQueue<(PackageId, String)>>>,
txs: Arc<DashMap<u64, Vec<(KvAction, Option<Vec<u8>>)>>>,
fds_limit: u64,
}
impl KvState {
pub fn new(
our: Address,
send_to_terminal: PrintSender,
send_to_loop: MessageSender,
home_directory_path: String,
) -> Self {
Self {
our: Arc::new(our),
kv_path: Arc::new(format!("{home_directory_path}/kv")),
send_to_loop,
send_to_terminal,
open_kvs: Arc::new(DashMap::new()),
access_order: Arc::new(Mutex::new(UniqueQueue::new())),
txs: Arc::new(DashMap::new()),
fds_limit: 10,
}
}
pub async fn open_db(&mut self, package_id: PackageId, db: String) -> Result<(), KvError> {
let key = (package_id.clone(), db.clone());
if self.open_kvs.contains_key(&key) {
let mut access_order = self.access_order.lock().await;
access_order.remove(&key);
access_order.push_back(key);
return Ok(());
}
if self.open_kvs.len() as u64 >= self.fds_limit {
// close least recently used db
let key = self.access_order.lock().await.pop_front().unwrap();
self.remove_db(key.0, key.1).await;
}
let db_path = format!("{}/{}/{}", self.kv_path.as_str(), package_id, db);
fs::create_dir_all(&db_path).await?;
self.open_kvs.insert(
key,
OptimisticTransactionDB::open_default(&db_path).map_err(rocks_to_kv_err)?,
);
let mut access_order = self.access_order.lock().await;
access_order.push_back((package_id, db));
Ok(())
}
pub async fn remove_db(&mut self, package_id: PackageId, db: String) {
self.open_kvs.remove(&(package_id.clone(), db.to_string()));
let mut access_order = self.access_order.lock().await;
access_order.remove(&(package_id, db));
}
pub async fn remove_least_recently_used_dbs(&mut self, n: u64) {
for _ in 0..n {
let mut lock = self.access_order.lock().await;
let key = lock.pop_front().unwrap();
drop(lock);
self.remove_db(key.0, key.1).await;
}
}
}
pub async fn kv( pub async fn kv(
our_node: Arc<String>, our_node: Arc<String>,
send_to_loop: MessageSender, send_to_loop: MessageSender,
@ -19,31 +96,44 @@ pub async fn kv(
send_to_caps_oracle: CapMessageSender, send_to_caps_oracle: CapMessageSender,
home_directory_path: String, home_directory_path: String,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let kv_path = Arc::new(format!("{home_directory_path}/kv")); let our = Address::new(our_node.as_str(), KV_PROCESS_ID.clone());
if let Err(e) = fs::create_dir_all(&*kv_path).await {
crate::fd_manager::send_fd_manager_request_fds_limit(&our, &send_to_loop).await;
let mut state = KvState::new(our, send_to_terminal, send_to_loop, home_directory_path);
if let Err(e) = fs::create_dir_all(state.kv_path.as_str()).await {
panic!("failed creating kv dir! {e:?}"); panic!("failed creating kv dir! {e:?}");
} }
let open_kvs: Arc<DashMap<(PackageId, String), OptimisticTransactionDB>> =
Arc::new(DashMap::new());
let txs: Arc<DashMap<u64, Vec<(KvAction, Option<Vec<u8>>)>>> = Arc::new(DashMap::new());
let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> = HashMap::new(); let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> = HashMap::new();
while let Some(km) = recv_from_loop.recv().await { while let Some(km) = recv_from_loop.recv().await {
if *our_node != km.source.node { if state.our.node != km.source.node {
Printout::new( Printout::new(
1, 1,
format!( format!(
"kv: got request from {}, but requests must come from our node {our_node}", "kv: got request from {}, but requests must come from our node {}",
km.source.node km.source.node, state.our.node,
), ),
) )
.send(&send_to_terminal) .send(&state.send_to_terminal)
.await; .await;
continue; continue;
} }
if km.source.process == *FD_MANAGER_PROCESS_ID {
if let Err(e) = handle_fd_request(km, &mut state).await {
Printout::new(
1,
format!("kv: got request from fd_manager that errored: {e:?}"),
)
.send(&state.send_to_terminal)
.await;
};
continue;
}
let queue = process_queues let queue = process_queues
.get(&km.source.process) .get(&km.source.process)
.cloned() .cloned()
@ -55,13 +145,8 @@ pub async fn kv(
} }
// clone Arcs // clone Arcs
let our_node = our_node.clone(); let mut state = state.clone();
let send_to_loop = send_to_loop.clone();
let send_to_terminal = send_to_terminal.clone();
let send_to_caps_oracle = send_to_caps_oracle.clone(); let send_to_caps_oracle = send_to_caps_oracle.clone();
let open_kvs = open_kvs.clone();
let txs = txs.clone();
let kv_path = kv_path.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut queue_lock = queue.lock().await; let mut queue_lock = queue.lock().await;
@ -69,23 +154,13 @@ pub async fn kv(
let (km_id, km_rsvp) = let (km_id, km_rsvp) =
(km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone())); (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone()));
if let Err(e) = handle_request( if let Err(e) = handle_request(km, &mut state, &send_to_caps_oracle).await {
&our_node,
km,
open_kvs,
txs,
&send_to_loop,
&send_to_caps_oracle,
&kv_path,
)
.await
{
Printout::new(1, format!("kv: {e}")) Printout::new(1, format!("kv: {e}"))
.send(&send_to_terminal) .send(&state.send_to_terminal)
.await; .await;
KernelMessage::builder() KernelMessage::builder()
.id(km_id) .id(km_id)
.source((our_node.as_str(), KV_PROCESS_ID.clone())) .source(state.our.as_ref().clone())
.target(km_rsvp) .target(km_rsvp)
.message(Message::Response(( .message(Message::Response((
Response { Response {
@ -98,7 +173,7 @@ pub async fn kv(
))) )))
.build() .build()
.unwrap() .unwrap()
.send(&send_to_loop) .send(&state.send_to_loop)
.await; .await;
} }
} }
@ -108,13 +183,9 @@ pub async fn kv(
} }
async fn handle_request( async fn handle_request(
our_node: &str,
km: KernelMessage, km: KernelMessage,
open_kvs: Arc<DashMap<(PackageId, String), OptimisticTransactionDB>>, state: &mut KvState,
txs: Arc<DashMap<u64, Vec<(KvAction, Option<Vec<u8>>)>>>,
send_to_loop: &MessageSender,
send_to_caps_oracle: &CapMessageSender, send_to_caps_oracle: &CapMessageSender,
kv_path: &str,
) -> Result<(), KvError> { ) -> Result<(), KvError> {
let KernelMessage { let KernelMessage {
id, id,
@ -145,14 +216,11 @@ async fn handle_request(
} }
}; };
check_caps( check_caps(&source, state, send_to_caps_oracle, &request).await?;
our_node,
&source, // always open to ensure db exists
&open_kvs, state
send_to_caps_oracle, .open_db(request.package_id.clone(), request.db.clone())
&request,
kv_path,
)
.await?; .await?;
let (body, bytes) = match &request.action { let (body, bytes) = match &request.action {
@ -165,7 +233,7 @@ async fn handle_request(
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None) (serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
} }
KvAction::Get { key } => { KvAction::Get { key } => {
let db = match open_kvs.get(&(request.package_id, request.db)) { let db = match state.open_kvs.get(&(request.package_id, request.db)) {
None => { None => {
return Err(KvError::NoDb); return Err(KvError::NoDb);
} }
@ -190,14 +258,14 @@ async fn handle_request(
} }
KvAction::BeginTx => { KvAction::BeginTx => {
let tx_id = rand::random::<u64>(); let tx_id = rand::random::<u64>();
txs.insert(tx_id, Vec::new()); state.txs.insert(tx_id, Vec::new());
( (
serde_json::to_vec(&KvResponse::BeginTx { tx_id }).unwrap(), serde_json::to_vec(&KvResponse::BeginTx { tx_id }).unwrap(),
None, None,
) )
} }
KvAction::Set { key, tx_id } => { KvAction::Set { key, tx_id } => {
let db = match open_kvs.get(&(request.package_id, request.db)) { let db = match state.open_kvs.get(&(request.package_id, request.db)) {
None => { None => {
return Err(KvError::NoDb); return Err(KvError::NoDb);
} }
@ -214,7 +282,7 @@ async fn handle_request(
db.put(key, blob.bytes).map_err(rocks_to_kv_err)?; db.put(key, blob.bytes).map_err(rocks_to_kv_err)?;
} }
Some(tx_id) => { Some(tx_id) => {
let mut tx = match txs.get_mut(tx_id) { let mut tx = match state.txs.get_mut(tx_id) {
None => { None => {
return Err(KvError::NoTx); return Err(KvError::NoTx);
} }
@ -227,7 +295,7 @@ async fn handle_request(
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None) (serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
} }
KvAction::Delete { key, tx_id } => { KvAction::Delete { key, tx_id } => {
let db = match open_kvs.get(&(request.package_id, request.db)) { let db = match state.open_kvs.get(&(request.package_id, request.db)) {
None => { None => {
return Err(KvError::NoDb); return Err(KvError::NoDb);
} }
@ -238,7 +306,7 @@ async fn handle_request(
db.delete(key).map_err(rocks_to_kv_err)?; db.delete(key).map_err(rocks_to_kv_err)?;
} }
Some(tx_id) => { Some(tx_id) => {
let mut tx = match txs.get_mut(tx_id) { let mut tx = match state.txs.get_mut(tx_id) {
None => { None => {
return Err(KvError::NoTx); return Err(KvError::NoTx);
} }
@ -250,14 +318,14 @@ async fn handle_request(
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None) (serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
} }
KvAction::Commit { tx_id } => { KvAction::Commit { tx_id } => {
let db = match open_kvs.get(&(request.package_id, request.db)) { let db = match state.open_kvs.get(&(request.package_id, request.db)) {
None => { None => {
return Err(KvError::NoDb); return Err(KvError::NoDb);
} }
Some(db) => db, Some(db) => db,
}; };
let txs = match txs.remove(tx_id).map(|(_, tx)| tx) { let txs = match state.txs.remove(tx_id).map(|(_, tx)| tx) {
None => { None => {
return Err(KvError::NoTx); return Err(KvError::NoTx);
} }
@ -291,7 +359,7 @@ async fn handle_request(
} }
KvAction::Backup => { KvAction::Backup => {
// looping through open dbs and flushing their memtables // looping through open dbs and flushing their memtables
for db_ref in open_kvs.iter() { for db_ref in state.open_kvs.iter() {
let db = db_ref.value(); let db = db_ref.value();
db.flush().map_err(rocks_to_kv_err)?; db.flush().map_err(rocks_to_kv_err)?;
} }
@ -302,7 +370,7 @@ async fn handle_request(
if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| source)) { if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| source)) {
KernelMessage::builder() KernelMessage::builder()
.id(id) .id(id)
.source((our_node, KV_PROCESS_ID.clone())) .source(state.our.as_ref().clone())
.target(target) .target(target)
.message(Message::Response(( .message(Message::Response((
Response { Response {
@ -319,7 +387,7 @@ async fn handle_request(
})) }))
.build() .build()
.unwrap() .unwrap()
.send(send_to_loop) .send(&state.send_to_loop)
.await; .await;
} }
@ -327,12 +395,10 @@ async fn handle_request(
} }
async fn check_caps( async fn check_caps(
our_node: &str,
source: &Address, source: &Address,
open_kvs: &Arc<DashMap<(PackageId, String), OptimisticTransactionDB>>, state: &mut KvState,
send_to_caps_oracle: &CapMessageSender, send_to_caps_oracle: &CapMessageSender,
request: &KvRequest, request: &KvRequest,
kv_path: &str,
) -> Result<(), KvError> { ) -> Result<(), KvError> {
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
let src_package_id = PackageId::new(source.process.package(), source.process.publisher()); let src_package_id = PackageId::new(source.process.package(), source.process.publisher());
@ -345,17 +411,14 @@ async fn check_caps(
send_to_caps_oracle send_to_caps_oracle
.send(CapMessage::Has { .send(CapMessage::Has {
on: source.process.clone(), on: source.process.clone(),
cap: Capability { cap: Capability::new(
issuer: Address { state.our.as_ref().clone(),
node: our_node.to_string(), serde_json::json!({
process: KV_PROCESS_ID.clone(),
},
params: serde_json::json!({
"kind": "write", "kind": "write",
"db": request.db.to_string(), "db": request.db.to_string(),
}) })
.to_string(), .to_string(),
}, ),
responder: send_cap_bool, responder: send_cap_bool,
}) })
.await?; .await?;
@ -371,17 +434,14 @@ async fn check_caps(
send_to_caps_oracle send_to_caps_oracle
.send(CapMessage::Has { .send(CapMessage::Has {
on: source.process.clone(), on: source.process.clone(),
cap: Capability { cap: Capability::new(
issuer: Address { state.our.as_ref().clone(),
node: our_node.to_string(), serde_json::json!({
process: KV_PROCESS_ID.clone(),
},
params: serde_json::json!({
"kind": "read", "kind": "read",
"db": request.db.to_string(), "db": request.db.to_string(),
}) })
.to_string(), .to_string(),
}, ),
responder: send_cap_bool, responder: send_cap_bool,
}) })
.await?; .await?;
@ -403,7 +463,7 @@ async fn check_caps(
add_capability( add_capability(
"read", "read",
&request.db.to_string(), &request.db.to_string(),
&our_node, &state.our,
&source, &source,
send_to_caps_oracle, send_to_caps_oracle,
) )
@ -411,22 +471,22 @@ async fn check_caps(
add_capability( add_capability(
"write", "write",
&request.db.to_string(), &request.db.to_string(),
&our_node, &state.our,
&source, &source,
send_to_caps_oracle, send_to_caps_oracle,
) )
.await?; .await?;
if open_kvs.contains_key(&(request.package_id.clone(), request.db.clone())) { if state
.open_kvs
.contains_key(&(request.package_id.clone(), request.db.clone()))
{
return Ok(()); return Ok(());
} }
let db_path = format!("{}/{}/{}", kv_path, request.package_id, request.db); state
fs::create_dir_all(&db_path).await?; .open_db(request.package_id.clone(), request.db.clone())
.await?;
let db = OptimisticTransactionDB::open_default(&db_path).map_err(rocks_to_kv_err)?;
open_kvs.insert((request.package_id.clone(), request.db.clone()), db);
Ok(()) Ok(())
} }
KvAction::RemoveDb { .. } => { KvAction::RemoveDb { .. } => {
@ -436,28 +496,57 @@ async fn check_caps(
}); });
} }
let db_path = format!("{}/{}/{}", kv_path, request.package_id, request.db); state
open_kvs.remove(&(request.package_id.clone(), request.db.clone())); .remove_db(request.package_id.clone(), request.db.clone())
.await;
fs::remove_dir_all(format!(
"{}/{}/{}",
state.kv_path, request.package_id, request.db
))
.await?;
fs::remove_dir_all(&db_path).await?;
Ok(()) Ok(())
} }
KvAction::Backup { .. } => Ok(()), KvAction::Backup { .. } => Ok(()),
} }
} }
async fn handle_fd_request(km: KernelMessage, state: &mut KvState) -> anyhow::Result<()> {
let Message::Request(Request { body, .. }) = km.message else {
return Err(anyhow::anyhow!("not a request"));
};
let request: FdManagerRequest = serde_json::from_slice(&body)?;
match request {
FdManagerRequest::FdsLimit(new_fds_limit) => {
state.fds_limit = new_fds_limit;
if state.open_kvs.len() as u64 >= state.fds_limit {
crate::fd_manager::send_fd_manager_hit_fds_limit(&state.our, &state.send_to_loop)
.await;
state
.remove_least_recently_used_dbs(state.open_kvs.len() as u64 - state.fds_limit)
.await;
}
}
_ => {
return Err(anyhow::anyhow!("non-Cull FdManagerRequest"));
}
}
Ok(())
}
async fn add_capability( async fn add_capability(
kind: &str, kind: &str,
db: &str, db: &str,
our_node: &str, our: &Address,
source: &Address, source: &Address,
send_to_caps_oracle: &CapMessageSender, send_to_caps_oracle: &CapMessageSender,
) -> Result<(), KvError> { ) -> Result<(), KvError> {
let cap = Capability { let cap = Capability {
issuer: Address { issuer: our.clone(),
node: our_node.to_string(),
process: KV_PROCESS_ID.clone(),
},
params: serde_json::json!({ "kind": kind, "db": db }).to_string(), params: serde_json::json!({ "kind": kind, "db": db }).to_string(),
}; };
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();

View File

@ -17,6 +17,7 @@ use tokio::sync::mpsc;
mod eth; mod eth;
#[cfg(feature = "simulation-mode")] #[cfg(feature = "simulation-mode")]
mod fakenet; mod fakenet;
pub mod fd_manager;
mod http; mod http;
mod kernel; mod kernel;
mod keygen; mod keygen;
@ -42,10 +43,15 @@ const VFS_CHANNEL_CAPACITY: usize = 1_000;
const CAP_CHANNEL_CAPACITY: usize = 1_000; const CAP_CHANNEL_CAPACITY: usize = 1_000;
const KV_CHANNEL_CAPACITY: usize = 1_000; const KV_CHANNEL_CAPACITY: usize = 1_000;
const SQLITE_CHANNEL_CAPACITY: usize = 1_000; const SQLITE_CHANNEL_CAPACITY: usize = 1_000;
const FD_MANAGER_CHANNEL_CAPACITY: usize = 1_000;
const VERSION: &str = env!("CARGO_PKG_VERSION"); const VERSION: &str = env!("CARGO_PKG_VERSION");
const WS_MIN_PORT: u16 = 9_000; const WS_MIN_PORT: u16 = 9_000;
const TCP_MIN_PORT: u16 = 10_000; const TCP_MIN_PORT: u16 = 10_000;
const MAX_PORT: u16 = 65_535; const MAX_PORT: u16 = 65_535;
const DEFAULT_MAX_PEERS: u64 = 32;
const DEFAULT_MAX_PASSTHROUGHS: u64 = 0;
/// default routers as a eth-provider fallback /// default routers as a eth-provider fallback
const DEFAULT_ETH_PROVIDERS: &str = include_str!("eth/default_providers_mainnet.json"); const DEFAULT_ETH_PROVIDERS: &str = include_str!("eth/default_providers_mainnet.json");
#[cfg(not(feature = "simulation-mode"))] #[cfg(not(feature = "simulation-mode"))]
@ -60,6 +66,10 @@ pub const MULTICALL_ADDRESS: &str = "0xcA11bde05977b3631167028862bE2a173976CA11"
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
println!(
"\nDOCKER_BUILD_IMAGE_VERSION: {}\n",
env!("DOCKER_BUILD_IMAGE_VERSION")
);
let app = build_command(); let app = build_command();
let matches = app.get_matches(); let matches = app.get_matches();
@ -175,6 +185,9 @@ async fn main() {
// vfs maintains metadata about files in fs for processes // vfs maintains metadata about files in fs for processes
let (vfs_message_sender, vfs_message_receiver): (MessageSender, MessageReceiver) = let (vfs_message_sender, vfs_message_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(VFS_CHANNEL_CAPACITY); mpsc::channel(VFS_CHANNEL_CAPACITY);
// fd_manager makes sure we don't overrun the `ulimit -n`: max number of file descriptors
let (fd_manager_sender, fd_manager_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(FD_MANAGER_CHANNEL_CAPACITY);
// terminal receives prints via this channel, all other modules send prints // terminal receives prints via this channel, all other modules send prints
let (print_sender, print_receiver): (PrintSender, PrintReceiver) = let (print_sender, print_receiver): (PrintSender, PrintReceiver) =
mpsc::channel(TERMINAL_CHANNEL_CAPACITY); mpsc::channel(TERMINAL_CHANNEL_CAPACITY);
@ -282,6 +295,12 @@ async fn main() {
None, None,
false, false,
), ),
(
ProcessId::new(Some("fd_manager"), "distro", "sys"),
fd_manager_sender,
None,
false,
),
]; ];
/* /*
@ -342,6 +361,12 @@ async fn main() {
print_sender.clone(), print_sender.clone(),
net_message_receiver, net_message_receiver,
*matches.get_one::<bool>("reveal-ip").unwrap_or(&true), *matches.get_one::<bool>("reveal-ip").unwrap_or(&true),
*matches
.get_one::<u64>("max-peers")
.unwrap_or(&DEFAULT_MAX_PEERS),
*matches
.get_one::<u64>("max-passthroughs")
.unwrap_or(&DEFAULT_MAX_PASSTHROUGHS),
)); ));
tasks.spawn(state::state_sender( tasks.spawn(state::state_sender(
our_name_arc.clone(), our_name_arc.clone(),
@ -351,6 +376,13 @@ async fn main() {
db, db,
home_directory_path.clone(), home_directory_path.clone(),
)); ));
tasks.spawn(fd_manager::fd_manager(
our_name_arc.clone(),
kernel_message_sender.clone(),
print_sender.clone(),
fd_manager_receiver,
matches.get_one::<u64>("soft-ulimit").copied(),
));
tasks.spawn(kv::kv( tasks.spawn(kv::kv(
our_name_arc.clone(), our_name_arc.clone(),
kernel_message_sender.clone(), kernel_message_sender.clone(),
@ -678,6 +710,18 @@ fn build_command() -> Command {
.arg( .arg(
arg!(--"number-log-files" <NUMBER_LOG_FILES> "Number of logs to rotate (default 4)") arg!(--"number-log-files" <NUMBER_LOG_FILES> "Number of logs to rotate (default 4)")
.value_parser(value_parser!(u64)), .value_parser(value_parser!(u64)),
)
.arg(
arg!(--"max-peers" <MAX_PEERS> "Maximum number of peers to hold active connections with (default 32)")
.value_parser(value_parser!(u32)),
)
.arg(
arg!(--"max-passthroughs" <MAX_PASSTHROUGHS> "Maximum number of passthroughs serve as a router (default 0)")
.value_parser(value_parser!(u32)),
)
.arg(
arg!(--"soft-ulimit" <SOFT_ULIMIT> "Enforce a static maximum number of file descriptors (default fetched from system)")
.value_parser(value_parser!(u64)),
); );
#[cfg(feature = "simulation-mode")] #[cfg(feature = "simulation-mode")]

View File

@ -7,23 +7,17 @@ use tokio::sync::mpsc;
/// if target is a peer, queue to be routed /// if target is a peer, queue to be routed
/// otherwise, create peer and initiate routing /// otherwise, create peer and initiate routing
pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, km: KernelMessage) { pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, km: KernelMessage) {
if let Some(peer) = data.peers.get_mut(&km.target.node) { if let Some(mut peer) = data.peers.get_mut(&km.target.node) {
peer.sender.send(km).expect("net: peer sender was dropped"); peer.sender.send(km).expect("net: peer sender was dropped");
peer.set_last_message();
} else { } else {
let Some(peer_id) = data.pki.get(&km.target.node) else { let Some(peer_id) = data.pki.get(&km.target.node) else {
return utils::error_offline(km, &ext.network_error_tx).await; return utils::error_offline(km, &ext.network_error_tx).await;
}; };
let (peer_tx, peer_rx) = mpsc::unbounded_channel(); let (mut peer, peer_rx) = Peer::new(peer_id.clone(), false);
// send message to be routed // send message to be routed
peer_tx.send(km).unwrap(); peer.send(km);
data.peers.insert( data.peers.insert(peer_id.name.clone(), peer).await;
peer_id.name.clone(),
Peer {
identity: peer_id.clone(),
routing_for: false,
sender: peer_tx.clone(),
},
);
tokio::spawn(connect_to_peer( tokio::spawn(connect_to_peer(
ext.clone(), ext.clone(),
data.clone(), data.clone(),
@ -157,7 +151,7 @@ pub async fn handle_failed_connection(
&format!("net: failed to connect to {}", peer_id.name), &format!("net: failed to connect to {}", peer_id.name),
) )
.await; .await;
drop(data.peers.remove(&peer_id.name)); data.peers.remove(&peer_id.name).await;
peer_rx.close(); peer_rx.close();
while let Some(km) = peer_rx.recv().await { while let Some(km) = peer_rx.recv().await {
utils::error_offline(km, &ext.network_error_tx).await; utils::error_offline(km, &ext.network_error_tx).await;

View File

@ -1,7 +1,7 @@
use crate::net::types::{IdentityExt, NetData, Peer}; use crate::net::types::{IdentityExt, NetData, Peer};
use crate::net::{connect, tcp, utils, ws}; use crate::net::{connect, tcp, utils, ws};
use lib::types::core::{Identity, NodeRouting}; use lib::types::core::{Identity, NodeRouting};
use tokio::{sync::mpsc, time}; use tokio::time;
pub async fn maintain_routers(ext: IdentityExt, data: NetData) -> anyhow::Result<()> { pub async fn maintain_routers(ext: IdentityExt, data: NetData) -> anyhow::Result<()> {
let NodeRouting::Routers(ref routers) = ext.our.routing else { let NodeRouting::Routers(ref routers) = ext.our.routing else {
@ -29,15 +29,8 @@ pub async fn connect_to_router(router_id: &Identity, ext: &IdentityExt, data: &N
&format!("net: attempting to connect to router {}", router_id.name), &format!("net: attempting to connect to router {}", router_id.name),
) )
.await; .await;
let (peer_tx, peer_rx) = mpsc::unbounded_channel(); let (peer, peer_rx) = Peer::new(router_id.clone(), false);
data.peers.insert( data.peers.insert(router_id.name.clone(), peer).await;
router_id.name.clone(),
Peer {
identity: router_id.clone(),
routing_for: false,
sender: peer_tx.clone(),
},
);
if let Some((_ip, port)) = router_id.tcp_routing() { if let Some((_ip, port)) = router_id.tcp_routing() {
match tcp::init_direct(ext, data, &router_id, *port, true, peer_rx).await { match tcp::init_direct(ext, data, &router_id, *port, true, peer_rx).await {
Ok(()) => { Ok(()) => {

View File

@ -1,9 +1,13 @@
use lib::types::core::{ use lib::{
core::Address,
types::core::{
Identity, KernelMessage, MessageReceiver, MessageSender, NetAction, NetResponse, Identity, KernelMessage, MessageReceiver, MessageSender, NetAction, NetResponse,
NetworkErrorSender, NodeRouting, PrintSender, NetworkErrorSender, NodeRouting, PrintSender, NET_PROCESS_ID,
},
}; };
use types::{ use types::{
IdentityExt, NetData, OnchainPKI, Peers, PendingPassthroughs, TCP_PROTOCOL, WS_PROTOCOL, ActivePassthroughs, IdentityExt, NetData, OnchainPKI, Peers, PendingPassthroughs, TCP_PROTOCOL,
WS_PROTOCOL,
}; };
use {dashmap::DashMap, ring::signature::Ed25519KeyPair, std::sync::Arc, tokio::task::JoinSet}; use {dashmap::DashMap, ring::signature::Ed25519KeyPair, std::sync::Arc, tokio::task::JoinSet};
@ -31,8 +35,18 @@ pub async fn networking(
network_error_tx: NetworkErrorSender, network_error_tx: NetworkErrorSender,
print_tx: PrintSender, print_tx: PrintSender,
kernel_message_rx: MessageReceiver, kernel_message_rx: MessageReceiver,
_reveal_ip: bool, // only used if indirect // only used if indirect -- TODO use
_reveal_ip: bool,
max_peers: u64,
// only used by routers
max_passthroughs: u64,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
crate::fd_manager::send_fd_manager_request_fds_limit(
&Address::new(&our.name, NET_PROCESS_ID.clone()),
&kernel_message_tx,
)
.await;
let ext = IdentityExt { let ext = IdentityExt {
our: Arc::new(our), our: Arc::new(our),
our_ip: Arc::new(our_ip), our_ip: Arc::new(our_ip),
@ -45,14 +59,19 @@ pub async fn networking(
// start by initializing the structs where we'll store PKI in memory // start by initializing the structs where we'll store PKI in memory
// and store a mapping of peers we have an active route for // and store a mapping of peers we have an active route for
let pki: OnchainPKI = Arc::new(DashMap::new()); let pki: OnchainPKI = Arc::new(DashMap::new());
let peers: Peers = Arc::new(DashMap::new()); let peers: Peers = Peers::new(max_peers, ext.kernel_message_tx.clone());
// only used by routers // only used by routers
let pending_passthroughs: PendingPassthroughs = Arc::new(DashMap::new()); let pending_passthroughs: PendingPassthroughs = Arc::new(DashMap::new());
let active_passthroughs: ActivePassthroughs = Arc::new(DashMap::new());
let net_data = NetData { let net_data = NetData {
pki, pki,
peers, peers,
pending_passthroughs, pending_passthroughs,
active_passthroughs,
max_peers,
max_passthroughs,
fds_limit: 10, // small hardcoded limit that gets replaced by fd_manager soon after boot
}; };
let mut tasks = JoinSet::<anyhow::Result<()>>::new(); let mut tasks = JoinSet::<anyhow::Result<()>>::new();
@ -107,12 +126,12 @@ pub async fn networking(
async fn local_recv( async fn local_recv(
ext: IdentityExt, ext: IdentityExt,
mut kernel_message_rx: MessageReceiver, mut kernel_message_rx: MessageReceiver,
data: NetData, mut data: NetData,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
while let Some(km) = kernel_message_rx.recv().await { while let Some(km) = kernel_message_rx.recv().await {
if km.target.node == ext.our.name { if km.target.node == ext.our.name {
// handle messages sent to us // handle messages sent to us
handle_message(&ext, km, &data).await; handle_message(&ext, km, &mut data).await;
} else { } else {
connect::send_to_peer(&ext, &data, km).await; connect::send_to_peer(&ext, &data, km).await;
} }
@ -120,7 +139,7 @@ async fn local_recv(
Err(anyhow::anyhow!("net: kernel message channel was dropped")) Err(anyhow::anyhow!("net: kernel message channel was dropped"))
} }
async fn handle_message(ext: &IdentityExt, km: KernelMessage, data: &NetData) { async fn handle_message(ext: &IdentityExt, km: KernelMessage, data: &mut NetData) {
match &km.message { match &km.message {
lib::core::Message::Request(request) => handle_request(ext, &km, &request.body, data).await, lib::core::Message::Request(request) => handle_request(ext, &km, &request.body, data).await,
lib::core::Message::Response((response, _context)) => { lib::core::Message::Response((response, _context)) => {
@ -133,7 +152,7 @@ async fn handle_request(
ext: &IdentityExt, ext: &IdentityExt,
km: &KernelMessage, km: &KernelMessage,
request_body: &[u8], request_body: &[u8],
data: &NetData, data: &mut NetData,
) { ) {
if km.source.node == ext.our.name { if km.source.node == ext.our.name {
handle_local_request(ext, km, request_body, data).await; handle_local_request(ext, km, request_body, data).await;
@ -149,11 +168,12 @@ async fn handle_local_request(
ext: &IdentityExt, ext: &IdentityExt,
km: &KernelMessage, km: &KernelMessage,
request_body: &[u8], request_body: &[u8],
data: &NetData, data: &mut NetData,
) { ) {
match rmp_serde::from_slice::<NetAction>(request_body) { match rmp_serde::from_slice::<NetAction>(request_body) {
Err(_e) => { Err(_e) => {
// ignore // only other possible message is from fd_manager -- handle here
handle_fdman(km, request_body, data).await;
} }
Ok(NetAction::ConnectionRequest(_)) => { Ok(NetAction::ConnectionRequest(_)) => {
// we shouldn't get these locally, ignore // we shouldn't get these locally, ignore
@ -171,6 +191,7 @@ async fn handle_local_request(
NetAction::GetPeers => ( NetAction::GetPeers => (
NetResponse::Peers( NetResponse::Peers(
data.peers data.peers
.peers()
.iter() .iter()
.map(|p| p.identity.clone()) .map(|p| p.identity.clone())
.collect::<Vec<Identity>>(), .collect::<Vec<Identity>>(),
@ -189,19 +210,31 @@ async fn handle_local_request(
)); ));
printout.push_str(&format!("our Identity: {:#?}\r\n", ext.our)); printout.push_str(&format!("our Identity: {:#?}\r\n", ext.our));
printout.push_str(&format!( printout.push_str(&format!(
"we have connections with {} peers:\r\n", "we have connections with {} peers ({} max):\r\n",
data.peers.len() data.peers.peers().len(),
data.max_peers,
)); ));
for peer in data.peers.iter() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
for peer in data.peers.peers().iter() {
printout.push_str(&format!( printout.push_str(&format!(
" {}, routing_for={}\r\n", " {},{} last message {}s ago\r\n",
peer.identity.name, peer.routing_for, peer.identity.name,
if peer.routing_for { " (routing)" } else { "" },
now.saturating_sub(peer.last_message)
)); ));
} }
if data.max_passthroughs > 0 {
printout.push_str(&format!( printout.push_str(&format!(
"we have {} entries in the PKI\r\n", "we allow {} max passthroughs\r\n",
data.pki.len() data.max_passthroughs
)); ));
}
if !data.pending_passthroughs.is_empty() { if !data.pending_passthroughs.is_empty() {
printout.push_str(&format!( printout.push_str(&format!(
"we have {} pending passthroughs:\r\n", "we have {} pending passthroughs:\r\n",
@ -212,6 +245,21 @@ async fn handle_local_request(
} }
} }
if !data.active_passthroughs.is_empty() {
printout.push_str(&format!(
"we have {} active passthroughs:\r\n",
data.active_passthroughs.len()
));
for p in data.active_passthroughs.iter() {
printout.push_str(&format!(" {} -> {}\r\n", p.key().0, p.key().1));
}
}
printout.push_str(&format!(
"we have {} entries in the PKI\r\n",
data.pki.len()
));
(NetResponse::Diagnostics(printout), None) (NetResponse::Diagnostics(printout), None)
} }
NetAction::Sign => ( NetAction::Sign => (
@ -284,6 +332,33 @@ async fn handle_local_request(
} }
} }
async fn handle_fdman(km: &KernelMessage, request_body: &[u8], data: &mut NetData) {
if km.source.process != *lib::core::FD_MANAGER_PROCESS_ID {
return;
}
let Ok(req) = serde_json::from_slice::<lib::core::FdManagerRequest>(request_body) else {
return;
};
match req {
lib::core::FdManagerRequest::FdsLimit(fds_limit) => {
data.fds_limit = fds_limit;
if data.max_peers > fds_limit {
data.max_peers = fds_limit;
}
// TODO combine with max_peers check
if data.max_passthroughs > fds_limit {
data.max_passthroughs = fds_limit;
}
// TODO cull passthroughs too
if data.peers.peers().len() >= data.fds_limit as usize {
let diff = data.peers.peers().len() - data.fds_limit as usize;
data.peers.cull(diff).await;
}
}
_ => return,
}
}
async fn handle_remote_request( async fn handle_remote_request(
ext: &IdentityExt, ext: &IdentityExt,
km: &KernelMessage, km: &KernelMessage,

View File

@ -170,15 +170,7 @@ async fn recv_connection(
if len != 32 { if len != 32 {
let (from_id, target_id) = let (from_id, target_id) =
validate_routing_request(&ext.our.name, &first_message, &data.pki)?; validate_routing_request(&ext.our.name, &first_message, &data.pki)?;
return create_passthrough( return create_passthrough(&ext, from_id, target_id, &data, PendingStream::Tcp(stream))
&ext.our,
&ext.our_ip,
from_id,
target_id,
&data.peers,
&data.pending_passthroughs,
PendingStream::Tcp(stream),
)
.await; .await;
} }
@ -215,15 +207,9 @@ async fn recv_connection(
&their_id, &their_id,
)?; )?;
let (peer_tx, peer_rx) = mpsc::unbounded_channel(); let (peer, peer_rx) = Peer::new(their_id.clone(), their_handshake.proxy_request);
data.peers.insert( data.peers.insert(their_id.name.clone(), peer).await;
their_id.name.clone(),
Peer {
identity: their_id.clone(),
routing_for: their_handshake.proxy_request,
sender: peer_tx,
},
);
tokio::spawn(utils::maintain_connection( tokio::spawn(utils::maintain_connection(
their_handshake.name, their_handshake.name,
data.peers, data.peers,
@ -336,15 +322,8 @@ pub async fn recv_via_router(
}; };
match connect_with_handshake_via_router(&ext, &peer_id, &router_id, stream).await { match connect_with_handshake_via_router(&ext, &peer_id, &router_id, stream).await {
Ok(connection) => { Ok(connection) => {
let (peer_tx, peer_rx) = mpsc::unbounded_channel(); let (peer, peer_rx) = Peer::new(peer_id.clone(), false);
data.peers.insert( data.peers.insert(peer_id.name.clone(), peer).await;
peer_id.name.clone(),
Peer {
identity: peer_id.clone(),
routing_for: false,
sender: peer_tx,
},
);
// maintain direct connection // maintain direct connection
tokio::spawn(utils::maintain_connection( tokio::spawn(utils::maintain_connection(
peer_id.name, peer_id.name,

View File

@ -1,7 +1,7 @@
use crate::net::{ use crate::net::{
tcp::PeerConnection, tcp::PeerConnection,
types::{HandshakePayload, IdentityExt, Peers}, types::{HandshakePayload, IdentityExt, Peers},
utils::{print_debug, print_loud, MESSAGE_MAX_SIZE}, utils::{print_debug, print_loud, IDLE_TIMEOUT, MESSAGE_MAX_SIZE},
}; };
use lib::types::core::{KernelMessage, MessageSender, NodeId, PrintSender}; use lib::types::core::{KernelMessage, MessageSender, NodeId, PrintSender};
use { use {
@ -82,13 +82,18 @@ pub async fn maintain_connection(
} }
}; };
let timeout = tokio::time::sleep(IDLE_TIMEOUT);
tokio::select! { tokio::select! {
_ = write => (), _ = write => (),
_ = read => (), _ = read => (),
_ = timeout => {
print_debug(&print_tx, &format!("net: closing idle connection with {peer_name}")).await;
}
} }
print_debug(&print_tx, &format!("net: connection lost with {peer_name}")).await; print_debug(&print_tx, &format!("net: connection lost with {peer_name}")).await;
peers.remove(&peer_name); peers.remove(&peer_name).await;
} }
async fn send_protocol_message( async fn send_protocol_message(

View File

@ -1,5 +1,6 @@
use lib::types::core::{ use lib::types::core::{
Identity, KernelMessage, MessageSender, NetworkErrorSender, NodeId, PrintSender, Address, Identity, KernelMessage, MessageSender, NetworkErrorSender, NodeId, PrintSender,
NET_PROCESS_ID,
}; };
use { use {
dashmap::DashMap, dashmap::DashMap,
@ -7,7 +8,7 @@ use {
serde::{Deserialize, Serialize}, serde::{Deserialize, Serialize},
std::sync::Arc, std::sync::Arc,
tokio::net::TcpStream, tokio::net::TcpStream,
tokio::sync::mpsc::UnboundedSender, tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender},
tokio_tungstenite::{MaybeTlsStream, WebSocketStream}, tokio_tungstenite::{MaybeTlsStream, WebSocketStream},
}; };
@ -54,16 +55,99 @@ pub struct RoutingRequest {
pub target: NodeId, pub target: NodeId,
} }
pub type Peers = Arc<DashMap<String, Peer>>; #[derive(Clone)]
pub struct Peers {
max_peers: u64,
send_to_loop: MessageSender,
peers: Arc<DashMap<String, Peer>>,
}
impl Peers {
pub fn new(max_peers: u64, send_to_loop: MessageSender) -> Self {
Self {
max_peers,
send_to_loop,
peers: Arc::new(DashMap::new()),
}
}
pub fn peers(&self) -> &DashMap<String, Peer> {
&self.peers
}
pub fn get(&self, name: &str) -> Option<dashmap::mapref::one::Ref<'_, String, Peer>> {
self.peers.get(name)
}
pub fn get_mut(
&self,
name: &str,
) -> std::option::Option<dashmap::mapref::one::RefMut<'_, String, Peer>> {
self.peers.get_mut(name)
}
pub fn contains_key(&self, name: &str) -> bool {
self.peers.contains_key(name)
}
/// when a peer is inserted, if the total number of peers exceeds the limit,
/// remove the one with the oldest last_message.
pub async fn insert(&self, name: String, peer: Peer) {
self.peers.insert(name, peer);
if self.peers.len() > self.max_peers as usize {
let oldest = self
.peers
.iter()
.min_by_key(|p| p.last_message)
.unwrap()
.key()
.clone();
self.peers.remove(&oldest);
crate::fd_manager::send_fd_manager_hit_fds_limit(
&Address::new("our", NET_PROCESS_ID.clone()),
&self.send_to_loop,
)
.await;
}
}
pub async fn remove(&self, name: &str) -> Option<(String, Peer)> {
self.peers.remove(name)
}
/// close the n oldest connections
pub async fn cull(&self, n: usize) {
let mut to_remove = Vec::with_capacity(n);
let mut sorted_peers: Vec<_> = self.peers.iter().collect();
sorted_peers.sort_by_key(|p| p.last_message);
to_remove.extend(sorted_peers.iter().take(n));
for peer in to_remove {
self.peers.remove(&peer.identity.name);
}
crate::fd_manager::send_fd_manager_hit_fds_limit(
&Address::new("our", NET_PROCESS_ID.clone()),
&self.send_to_loop,
)
.await;
}
}
pub type OnchainPKI = Arc<DashMap<String, Identity>>; pub type OnchainPKI = Arc<DashMap<String, Identity>>;
/// (from, target) -> from's socket /// (from, target) -> from's socket
pub type PendingPassthroughs = Arc<DashMap<(NodeId, NodeId), PendingStream>>; ///
/// only used by routers
pub type PendingPassthroughs = Arc<DashMap<(NodeId, NodeId), (PendingStream, u64)>>;
pub enum PendingStream { pub enum PendingStream {
WebSocket(WebSocketStream<MaybeTlsStream<TcpStream>>), WebSocket(WebSocketStream<MaybeTlsStream<TcpStream>>),
Tcp(TcpStream), Tcp(TcpStream),
} }
/// (from, target)
///
/// only used by routers
pub type ActivePassthroughs = Arc<DashMap<(NodeId, NodeId), (u64, KillSender)>>;
impl PendingStream { impl PendingStream {
pub fn is_ws(&self) -> bool { pub fn is_ws(&self) -> bool {
matches!(self, PendingStream::WebSocket(_)) matches!(self, PendingStream::WebSocket(_))
@ -73,15 +157,51 @@ impl PendingStream {
} }
} }
#[derive(Clone)] type KillSender = tokio::sync::mpsc::Sender<()>;
pub struct Peer { pub struct Peer {
pub identity: Identity, pub identity: Identity,
/// If true, we are routing for them and have a RoutingClientConnection /// If true, we are routing for them and have a RoutingClientConnection
/// associated with them. We can send them prompts to establish Passthroughs. /// associated with them. We can send them prompts to establish Passthroughs.
pub routing_for: bool, pub routing_for: bool,
pub sender: UnboundedSender<KernelMessage>, pub sender: UnboundedSender<KernelMessage>,
/// unix timestamp of last message sent *or* received
pub last_message: u64,
} }
impl Peer {
/// Create a new Peer.
/// If `routing_for` is true, we are routing for them.
pub fn new(identity: Identity, routing_for: bool) -> (Self, UnboundedReceiver<KernelMessage>) {
let (peer_tx, peer_rx) = tokio::sync::mpsc::unbounded_channel();
(
Self {
identity,
routing_for,
sender: peer_tx,
last_message: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
},
peer_rx,
)
}
/// Send a message to the peer.
pub fn send(&mut self, km: KernelMessage) {
self.sender.send(km).expect("net: peer sender was dropped");
self.set_last_message();
}
/// Update the last message time to now.
pub fn set_last_message(&mut self) {
self.last_message = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
}
}
/// [`Identity`], with additional fields for networking. /// [`Identity`], with additional fields for networking.
#[derive(Clone)] #[derive(Clone)]
pub struct IdentityExt { pub struct IdentityExt {
@ -98,5 +218,11 @@ pub struct IdentityExt {
pub struct NetData { pub struct NetData {
pub pki: OnchainPKI, pub pki: OnchainPKI,
pub peers: Peers, pub peers: Peers,
/// only used by routers
pub pending_passthroughs: PendingPassthroughs, pub pending_passthroughs: PendingPassthroughs,
/// only used by routers
pub active_passthroughs: ActivePassthroughs,
pub max_peers: u64,
pub max_passthroughs: u64,
pub fds_limit: u64,
} }

View File

@ -1,10 +1,10 @@
use crate::net::types::{ use crate::net::types::{
HandshakePayload, OnchainPKI, Peers, PendingPassthroughs, PendingStream, RoutingRequest, ActivePassthroughs, HandshakePayload, IdentityExt, NetData, OnchainPKI, PendingStream,
TCP_PROTOCOL, WS_PROTOCOL, RoutingRequest, TCP_PROTOCOL, WS_PROTOCOL,
}; };
use lib::types::core::{ use lib::types::core::{
Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetAction, NetworkErrorSender, Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetAction, NetworkErrorSender,
NodeRouting, PrintSender, Printout, Request, Response, SendError, SendErrorKind, NodeId, NodeRouting, PrintSender, Printout, Request, Response, SendError, SendErrorKind,
WrappedSendError, WrappedSendError,
}; };
use { use {
@ -27,27 +27,82 @@ pub const MESSAGE_MAX_SIZE: u32 = 10_485_800;
pub const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); pub const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
/// 30 minute idle timeout for connections
pub const IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1800);
pub async fn create_passthrough( pub async fn create_passthrough(
our: &Identity, ext: &IdentityExt,
our_ip: &str,
from_id: Identity, from_id: Identity,
target_id: Identity, target_id: Identity,
peers: &Peers, data: &NetData,
pending_passthroughs: &PendingPassthroughs,
socket_1: PendingStream, socket_1: PendingStream,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// if we already are at the max number of passthroughs, reject
if data.max_passthroughs == 0 {
return Err(anyhow::anyhow!(
"passthrough denied: this node has disallowed passthroughs. Start node with `--max-passthroughs <VAL>` to allow passthroughs"
));
}
// remove pending before checking bound because otherwise we stop
// ourselves from matching pending if this connection will be
// the max_passthroughs passthrough
let maybe_pending = data
.pending_passthroughs
.remove(&(target_id.name.clone(), from_id.name.clone()));
if data.active_passthroughs.len() + data.pending_passthroughs.len()
>= data.max_passthroughs as usize
{
let oldest_active = data.active_passthroughs.iter().min_by_key(|p| p.0);
let (oldest_active_key, oldest_active_time, oldest_active_kill_sender) = match oldest_active
{
None => (None, get_now(), None),
Some(oldest_active) => {
let (oldest_active_key, oldest_active_val) = oldest_active.pair();
let oldest_active_key = oldest_active_key.clone();
let (oldest_active_time, oldest_active_kill_sender) = oldest_active_val.clone();
(
Some(oldest_active_key),
oldest_active_time,
Some(oldest_active_kill_sender),
)
}
};
let oldest_pending = data.pending_passthroughs.iter().min_by_key(|p| p.1);
let (oldest_pending_key, oldest_pending_time) = match oldest_pending {
None => (None, get_now()),
Some(oldest_pending) => {
let (oldest_pending_key, oldest_pending_val) = oldest_pending.pair();
let oldest_pending_key = oldest_pending_key.clone();
let (_, oldest_pending_time) = oldest_pending_val;
(Some(oldest_pending_key), oldest_pending_time.clone())
}
};
if oldest_active_time < oldest_pending_time {
// active key is oldest
oldest_active_kill_sender.unwrap().send(()).await.unwrap();
data.active_passthroughs.remove(&oldest_active_key.unwrap());
} else {
// pending key is oldest
data.pending_passthroughs
.remove(&oldest_pending_key.unwrap());
}
}
// if the target has already generated a pending passthrough for this source, // if the target has already generated a pending passthrough for this source,
// immediately match them // immediately match them
if let Some(((_target, _from), pending_stream)) = if let Some(((from, target), (pending_stream, _))) = maybe_pending {
pending_passthroughs.remove(&(target_id.name.clone(), from_id.name.clone())) tokio::spawn(maintain_passthrough(
{ from,
tokio::spawn(maintain_passthrough(socket_1, pending_stream)); target,
socket_1,
pending_stream,
data.active_passthroughs.clone(),
));
return Ok(()); return Ok(());
} }
if socket_1.is_tcp() { if socket_1.is_tcp() {
if let Some((ip, tcp_port)) = target_id.tcp_routing() { if let Some((ip, tcp_port)) = target_id.tcp_routing() {
// create passthrough to direct node over tcp // create passthrough to direct node over tcp
let tcp_url = make_conn_url(our_ip, ip, tcp_port, TCP_PROTOCOL)?; let tcp_url = make_conn_url(&ext.our_ip, ip, tcp_port, TCP_PROTOCOL)?;
let Ok(Ok(stream_2)) = let Ok(Ok(stream_2)) =
time::timeout(TIMEOUT, tokio::net::TcpStream::connect(tcp_url.to_string())).await time::timeout(TIMEOUT, tokio::net::TcpStream::connect(tcp_url.to_string())).await
else { else {
@ -57,13 +112,19 @@ pub async fn create_passthrough(
from_id.name from_id.name
)); ));
}; };
tokio::spawn(maintain_passthrough(socket_1, PendingStream::Tcp(stream_2))); tokio::spawn(maintain_passthrough(
from_id.name,
target_id.name,
socket_1,
PendingStream::Tcp(stream_2),
data.active_passthroughs.clone(),
));
return Ok(()); return Ok(());
} }
} else if socket_1.is_ws() { } else if socket_1.is_ws() {
if let Some((ip, ws_port)) = target_id.ws_routing() { if let Some((ip, ws_port)) = target_id.ws_routing() {
// create passthrough to direct node over websocket // create passthrough to direct node over websocket
let ws_url = make_conn_url(our_ip, ip, ws_port, WS_PROTOCOL)?; let ws_url = make_conn_url(&ext.our_ip, ip, ws_port, WS_PROTOCOL)?;
let Ok(Ok((socket_2, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await let Ok(Ok((socket_2, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await
else { else {
return Err(anyhow::anyhow!( return Err(anyhow::anyhow!(
@ -73,14 +134,17 @@ pub async fn create_passthrough(
)); ));
}; };
tokio::spawn(maintain_passthrough( tokio::spawn(maintain_passthrough(
from_id.name,
target_id.name,
socket_1, socket_1,
PendingStream::WebSocket(socket_2), PendingStream::WebSocket(socket_2),
data.active_passthroughs.clone(),
)); ));
return Ok(()); return Ok(());
} }
} }
// create passthrough to indirect node that we do routing for // create passthrough to indirect node that we do routing for
let target_peer = peers.get(&target_id.name).ok_or(anyhow::anyhow!( let target_peer = data.peers.get(&target_id.name).ok_or(anyhow::anyhow!(
"can't route to {}, not a peer, for passthrough requested by {}", "can't route to {}, not a peer, for passthrough requested by {}",
target_id.name, target_id.name,
from_id.name from_id.name
@ -97,7 +161,7 @@ pub async fn create_passthrough(
target_peer.sender.send( target_peer.sender.send(
KernelMessage::builder() KernelMessage::builder()
.id(rand::random()) .id(rand::random())
.source((our.name.as_str(), "net", "distro", "sys")) .source((ext.our.name.as_str(), "net", "distro", "sys"))
.target((target_id.name.as_str(), "net", "distro", "sys")) .target((target_id.name.as_str(), "net", "distro", "sys"))
.message(Message::Request(Request { .message(Message::Request(Request {
inherit: false, inherit: false,
@ -113,12 +177,23 @@ pub async fn create_passthrough(
// or if the target node connects to us with a matching passthrough. // or if the target node connects to us with a matching passthrough.
// TODO it is currently possible to have dangling passthroughs in the map // TODO it is currently possible to have dangling passthroughs in the map
// if the target is "connected" to us but nonresponsive. // if the target is "connected" to us but nonresponsive.
pending_passthroughs.insert((from_id.name, target_id.name), socket_1); let now = get_now();
data.pending_passthroughs
.insert((from_id.name, target_id.name), (socket_1, now));
Ok(()) Ok(())
} }
/// cross the streams -- spawn on own task /// cross the streams -- spawn on own task
pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStream) { pub async fn maintain_passthrough(
from: NodeId,
target: NodeId,
socket_1: PendingStream,
socket_2: PendingStream,
active_passthroughs: ActivePassthroughs,
) {
let now = get_now();
let (kill_sender, mut kill_receiver) = tokio::sync::mpsc::channel(1);
active_passthroughs.insert((from.clone(), target.clone()), (now, kill_sender));
match (socket_1, socket_2) { match (socket_1, socket_2) {
(PendingStream::Tcp(socket_1), PendingStream::Tcp(socket_2)) => { (PendingStream::Tcp(socket_1), PendingStream::Tcp(socket_2)) => {
// do not use bidirectional because if one side closes, // do not use bidirectional because if one side closes,
@ -129,6 +204,7 @@ pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStre
tokio::select! { tokio::select! {
_ = copy(&mut r1, &mut w2) => {}, _ = copy(&mut r1, &mut w2) => {},
_ = copy(&mut r2, &mut w1) => {}, _ = copy(&mut r2, &mut w1) => {},
_ = kill_receiver.recv() => {},
} }
} }
(PendingStream::WebSocket(mut socket_1), PendingStream::WebSocket(mut socket_2)) => { (PendingStream::WebSocket(mut socket_1), PendingStream::WebSocket(mut socket_2)) => {
@ -163,6 +239,7 @@ pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStre
break break
} }
} }
_ = kill_receiver.recv() => break,
} }
} }
let _ = socket_1.close(None).await; let _ = socket_1.close(None).await;
@ -170,9 +247,9 @@ pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStre
} }
_ => { _ => {
// these foolish combinations must never occur // these foolish combinations must never occur
return;
} }
} }
active_passthroughs.remove(&(from, target));
} }
pub fn ingest_log(log: KnsUpdate, pki: &OnchainPKI) { pub fn ingest_log(log: KnsUpdate, pki: &OnchainPKI) {
@ -359,3 +436,11 @@ pub async fn print_loud(print_tx: &PrintSender, content: &str) {
pub async fn print_debug(print_tx: &PrintSender, content: &str) { pub async fn print_debug(print_tx: &PrintSender, content: &str) {
Printout::new(2, content).send(print_tx).await; Printout::new(2, content).send(print_tx).await;
} }
pub fn get_now() -> u64 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
now
}

View File

@ -187,15 +187,8 @@ pub async fn recv_via_router(
}; };
match connect_with_handshake_via_router(&ext, &peer_id, &router_id, socket).await { match connect_with_handshake_via_router(&ext, &peer_id, &router_id, socket).await {
Ok(connection) => { Ok(connection) => {
let (peer_tx, peer_rx) = mpsc::unbounded_channel(); let (peer, peer_rx) = Peer::new(peer_id.clone(), false);
data.peers.insert( data.peers.insert(peer_id.name.clone(), peer).await;
peer_id.name.clone(),
Peer {
identity: peer_id.clone(),
routing_for: false,
sender: peer_tx,
},
);
// maintain direct connection // maintain direct connection
tokio::spawn(utils::maintain_connection( tokio::spawn(utils::maintain_connection(
peer_id.name, peer_id.name,
@ -228,12 +221,10 @@ async fn recv_connection(
let (from_id, target_id) = let (from_id, target_id) =
validate_routing_request(&ext.our.name, first_message, &data.pki)?; validate_routing_request(&ext.our.name, first_message, &data.pki)?;
return create_passthrough( return create_passthrough(
&ext.our, &ext,
&ext.our_ip,
from_id, from_id,
target_id, target_id,
&data.peers, &data,
&data.pending_passthroughs,
PendingStream::WebSocket(socket), PendingStream::WebSocket(socket),
) )
.await; .await;
@ -272,15 +263,9 @@ async fn recv_connection(
&their_id, &their_id,
)?; )?;
let (peer_tx, peer_rx) = mpsc::unbounded_channel(); let (peer, peer_rx) = Peer::new(their_id.clone(), their_handshake.proxy_request);
data.peers.insert( data.peers.insert(their_id.name.clone(), peer).await;
their_id.name.clone(),
Peer {
identity: their_id.clone(),
routing_for: their_handshake.proxy_request,
sender: peer_tx,
},
);
tokio::spawn(utils::maintain_connection( tokio::spawn(utils::maintain_connection(
their_handshake.name, their_handshake.name,
data.peers, data.peers,

View File

@ -1,6 +1,6 @@
use crate::net::{ use crate::net::{
types::{HandshakePayload, IdentityExt, Peers}, types::{HandshakePayload, IdentityExt, Peers},
utils::{print_debug, print_loud, MESSAGE_MAX_SIZE}, utils::{print_debug, print_loud, IDLE_TIMEOUT, MESSAGE_MAX_SIZE},
ws::{PeerConnection, WebSocket}, ws::{PeerConnection, WebSocket},
}; };
use lib::core::{KernelMessage, MessageSender, NodeId, PrintSender}; use lib::core::{KernelMessage, MessageSender, NodeId, PrintSender};
@ -103,13 +103,18 @@ pub async fn maintain_connection(
} }
}; };
let timeout = tokio::time::sleep(IDLE_TIMEOUT);
tokio::select! { tokio::select! {
_ = write => (), _ = write => (),
_ = read => (), _ = read => (),
_ = timeout => {
print_debug(&print_tx, &format!("net: closing idle connection with {peer_name}")).await;
}
} }
print_debug(&print_tx, &format!("net: connection lost with {peer_name}")).await; print_debug(&print_tx, &format!("net: connection lost with {peer_name}")).await;
peers.remove(&peer_name); peers.remove(&peer_name).await;
} }
async fn send_protocol_message( async fn send_protocol_message(

View File

@ -1,9 +1,11 @@
use crate::vfs::UniqueQueue;
use base64::{engine::general_purpose::STANDARD as base64_standard, Engine}; use base64::{engine::general_purpose::STANDARD as base64_standard, Engine};
use dashmap::DashMap; use dashmap::DashMap;
use lib::types::core::{ use lib::types::core::{
Address, CapMessage, CapMessageSender, Capability, KernelMessage, LazyLoadBlob, Message, Address, CapMessage, CapMessageSender, Capability, FdManagerRequest, KernelMessage,
MessageReceiver, MessageSender, PackageId, PrintSender, Printout, ProcessId, Request, Response, LazyLoadBlob, Message, MessageReceiver, MessageSender, PackageId, PrintSender, Printout,
SqlValue, SqliteAction, SqliteError, SqliteRequest, SqliteResponse, SQLITE_PROCESS_ID, ProcessId, Request, Response, SqlValue, SqliteAction, SqliteError, SqliteRequest,
SqliteResponse, FD_MANAGER_PROCESS_ID, SQLITE_PROCESS_ID,
}; };
use rusqlite::Connection; use rusqlite::Connection;
use std::{ use std::{
@ -20,6 +22,83 @@ lazy_static::lazy_static! {
HashSet::from(["ALTER", "ANALYZE", "COMMIT", "CREATE", "DELETE", "DETACH", "DROP", "END", "INSERT", "REINDEX", "RELEASE", "RENAME", "REPLACE", "ROLLBACK", "SAVEPOINT", "UPDATE", "VACUUM"]); HashSet::from(["ALTER", "ANALYZE", "COMMIT", "CREATE", "DELETE", "DETACH", "DROP", "END", "INSERT", "REINDEX", "RELEASE", "RENAME", "REPLACE", "ROLLBACK", "SAVEPOINT", "UPDATE", "VACUUM"]);
} }
#[derive(Clone)]
struct SqliteState {
our: Arc<Address>,
sqlite_path: Arc<String>,
send_to_loop: MessageSender,
send_to_terminal: PrintSender,
open_dbs: Arc<DashMap<(PackageId, String), Mutex<Connection>>>,
access_order: Arc<Mutex<UniqueQueue<(PackageId, String)>>>,
txs: Arc<DashMap<u64, Vec<(String, Vec<SqlValue>)>>>,
fds_limit: u64,
}
impl SqliteState {
pub fn new(
our: Address,
send_to_terminal: PrintSender,
send_to_loop: MessageSender,
home_directory_path: String,
) -> Self {
Self {
our: Arc::new(our),
sqlite_path: Arc::new(format!("{home_directory_path}/sqlite")),
send_to_loop,
send_to_terminal,
open_dbs: Arc::new(DashMap::new()),
access_order: Arc::new(Mutex::new(UniqueQueue::new())),
txs: Arc::new(DashMap::new()),
fds_limit: 10,
}
}
pub async fn open_db(&mut self, package_id: PackageId, db: String) -> Result<(), SqliteError> {
let key = (package_id.clone(), db.clone());
if self.open_dbs.contains_key(&key) {
let mut access_order = self.access_order.lock().await;
access_order.remove(&key);
access_order.push_back(key);
return Ok(());
}
if self.open_dbs.len() as u64 >= self.fds_limit {
// close least recently used db
let key = self.access_order.lock().await.pop_front().unwrap();
self.remove_db(key.0, key.1).await;
}
let db_path = format!("{}/{}/{}", self.sqlite_path.as_str(), package_id, db);
fs::create_dir_all(&db_path).await?;
let db_file_path = format!("{}/{}.db", db_path, db);
let db_conn = Connection::open(db_file_path)?;
let _ = db_conn.execute("PRAGMA journal_mode=WAL", []);
self.open_dbs.insert(key, Mutex::new(db_conn));
let mut access_order = self.access_order.lock().await;
access_order.push_back((package_id, db));
Ok(())
}
pub async fn remove_db(&mut self, package_id: PackageId, db: String) {
self.open_dbs.remove(&(package_id.clone(), db.to_string()));
let mut access_order = self.access_order.lock().await;
access_order.remove(&(package_id, db));
}
pub async fn remove_least_recently_used_dbs(&mut self, n: u64) {
for _ in 0..n {
let mut lock = self.access_order.lock().await;
let key = lock.pop_front().unwrap();
drop(lock);
self.remove_db(key.0, key.1).await;
}
}
}
pub async fn sqlite( pub async fn sqlite(
our_node: Arc<String>, our_node: Arc<String>,
send_to_loop: MessageSender, send_to_loop: MessageSender,
@ -28,30 +107,44 @@ pub async fn sqlite(
send_to_caps_oracle: CapMessageSender, send_to_caps_oracle: CapMessageSender,
home_directory_path: String, home_directory_path: String,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let sqlite_path = Arc::new(format!("{home_directory_path}/sqlite")); let our = Address::new(our_node.as_str(), SQLITE_PROCESS_ID.clone());
if let Err(e) = fs::create_dir_all(&*sqlite_path).await {
crate::fd_manager::send_fd_manager_request_fds_limit(&our, &send_to_loop).await;
let mut state = SqliteState::new(our, send_to_terminal, send_to_loop, home_directory_path);
if let Err(e) = fs::create_dir_all(state.sqlite_path.as_str()).await {
panic!("failed creating sqlite dir! {e:?}"); panic!("failed creating sqlite dir! {e:?}");
} }
let open_dbs: Arc<DashMap<(PackageId, String), Mutex<Connection>>> = Arc::new(DashMap::new());
let txs: Arc<DashMap<u64, Vec<(String, Vec<SqlValue>)>>> = Arc::new(DashMap::new());
let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> = HashMap::new(); let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> = HashMap::new();
while let Some(km) = recv_from_loop.recv().await { while let Some(km) = recv_from_loop.recv().await {
if *our_node != km.source.node { if state.our.node != km.source.node {
Printout::new( Printout::new(
1, 1,
format!( format!(
"sqlite: got request from {}, but requests must come from our node {our_node}", "sqlite: got request from {}, but requests must come from our node {}",
km.source.node km.source.node, state.our.node
), ),
) )
.send(&send_to_terminal) .send(&state.send_to_terminal)
.await; .await;
continue; continue;
} }
if km.source.process == *FD_MANAGER_PROCESS_ID {
if let Err(e) = handle_fd_request(km, &mut state).await {
Printout::new(
1,
format!("sqlite: got request from fd_manager that errored: {e:?}"),
)
.send(&state.send_to_terminal)
.await;
};
continue;
}
let queue = process_queues let queue = process_queues
.get(&km.source.process) .get(&km.source.process)
.cloned() .cloned()
@ -63,13 +156,8 @@ pub async fn sqlite(
} }
// clone Arcs // clone Arcs
let our_node = our_node.clone(); let mut state = state.clone();
let send_to_loop = send_to_loop.clone();
let send_to_terminal = send_to_terminal.clone();
let send_to_caps_oracle = send_to_caps_oracle.clone(); let send_to_caps_oracle = send_to_caps_oracle.clone();
let open_dbs = open_dbs.clone();
let txs = txs.clone();
let sqlite_path = sqlite_path.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut queue_lock = queue.lock().await; let mut queue_lock = queue.lock().await;
@ -77,23 +165,13 @@ pub async fn sqlite(
let (km_id, km_rsvp) = let (km_id, km_rsvp) =
(km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone())); (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone()));
if let Err(e) = handle_request( if let Err(e) = handle_request(km, &mut state, &send_to_caps_oracle).await {
&our_node,
km,
open_dbs,
txs,
&send_to_loop,
&send_to_caps_oracle,
&sqlite_path,
)
.await
{
Printout::new(1, format!("sqlite: {e}")) Printout::new(1, format!("sqlite: {e}"))
.send(&send_to_terminal) .send(&state.send_to_terminal)
.await; .await;
KernelMessage::builder() KernelMessage::builder()
.id(km_id) .id(km_id)
.source((our_node.as_str(), SQLITE_PROCESS_ID.clone())) .source(state.our.as_ref().clone())
.target(km_rsvp) .target(km_rsvp)
.message(Message::Response(( .message(Message::Response((
Response { Response {
@ -107,7 +185,7 @@ pub async fn sqlite(
))) )))
.build() .build()
.unwrap() .unwrap()
.send(&send_to_loop) .send(&state.send_to_loop)
.await; .await;
} }
} }
@ -117,13 +195,9 @@ pub async fn sqlite(
} }
async fn handle_request( async fn handle_request(
our_node: &str,
km: KernelMessage, km: KernelMessage,
open_dbs: Arc<DashMap<(PackageId, String), Mutex<Connection>>>, state: &mut SqliteState,
txs: Arc<DashMap<u64, Vec<(String, Vec<SqlValue>)>>>,
send_to_loop: &MessageSender,
send_to_caps_oracle: &CapMessageSender, send_to_caps_oracle: &CapMessageSender,
sqlite_path: &str,
) -> Result<(), SqliteError> { ) -> Result<(), SqliteError> {
let KernelMessage { let KernelMessage {
id, id,
@ -154,14 +228,11 @@ async fn handle_request(
} }
}; };
check_caps( check_caps(&source, state, send_to_caps_oracle, &request).await?;
our_node,
&source, // always open to ensure db exists
&open_dbs, state
send_to_caps_oracle, .open_db(request.package_id.clone(), request.db.clone())
&request,
sqlite_path,
)
.await?; .await?;
let (body, bytes) = match request.action { let (body, bytes) = match request.action {
@ -174,7 +245,7 @@ async fn handle_request(
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None) (serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
} }
SqliteAction::Read { query } => { SqliteAction::Read { query } => {
let db = match open_dbs.get(&(request.package_id, request.db)) { let db = match state.open_dbs.get(&(request.package_id, request.db)) {
Some(db) => db, Some(db) => db,
None => { None => {
return Err(SqliteError::NoDb); return Err(SqliteError::NoDb);
@ -230,7 +301,7 @@ async fn handle_request(
) )
} }
SqliteAction::Write { statement, tx_id } => { SqliteAction::Write { statement, tx_id } => {
let db = match open_dbs.get(&(request.package_id, request.db)) { let db = match state.open_dbs.get(&(request.package_id, request.db)) {
Some(db) => db, Some(db) => db,
None => { None => {
return Err(SqliteError::NoDb); return Err(SqliteError::NoDb);
@ -252,7 +323,9 @@ async fn handle_request(
match tx_id { match tx_id {
Some(tx_id) => { Some(tx_id) => {
txs.entry(tx_id) state
.txs
.entry(tx_id)
.or_default() .or_default()
.push((statement.clone(), parameters)); .push((statement.clone(), parameters));
} }
@ -265,7 +338,7 @@ async fn handle_request(
} }
SqliteAction::BeginTx => { SqliteAction::BeginTx => {
let tx_id = rand::random::<u64>(); let tx_id = rand::random::<u64>();
txs.insert(tx_id, Vec::new()); state.txs.insert(tx_id, Vec::new());
( (
serde_json::to_vec(&SqliteResponse::BeginTx { tx_id }).unwrap(), serde_json::to_vec(&SqliteResponse::BeginTx { tx_id }).unwrap(),
@ -273,7 +346,7 @@ async fn handle_request(
) )
} }
SqliteAction::Commit { tx_id } => { SqliteAction::Commit { tx_id } => {
let db = match open_dbs.get(&(request.package_id, request.db)) { let db = match state.open_dbs.get(&(request.package_id, request.db)) {
Some(db) => db, Some(db) => db,
None => { None => {
return Err(SqliteError::NoDb); return Err(SqliteError::NoDb);
@ -281,7 +354,7 @@ async fn handle_request(
}; };
let mut db = db.lock().await; let mut db = db.lock().await;
let txs = match txs.remove(&tx_id).map(|(_, tx)| tx) { let txs = match state.txs.remove(&tx_id).map(|(_, tx)| tx) {
None => { None => {
return Err(SqliteError::NoTx); return Err(SqliteError::NoTx);
} }
@ -297,7 +370,7 @@ async fn handle_request(
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None) (serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
} }
SqliteAction::Backup => { SqliteAction::Backup => {
for db_ref in open_dbs.iter() { for db_ref in state.open_dbs.iter() {
let db = db_ref.value().lock().await; let db = db_ref.value().lock().await;
let result: rusqlite::Result<()> = db let result: rusqlite::Result<()> = db
.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |_| Ok(())) .query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |_| Ok(()))
@ -315,7 +388,7 @@ async fn handle_request(
if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| source)) { if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| source)) {
KernelMessage::builder() KernelMessage::builder()
.id(id) .id(id)
.source((our_node, SQLITE_PROCESS_ID.clone())) .source(state.our.as_ref().clone())
.target(target) .target(target)
.message(Message::Response(( .message(Message::Response((
Response { Response {
@ -332,7 +405,7 @@ async fn handle_request(
})) }))
.build() .build()
.unwrap() .unwrap()
.send(send_to_loop) .send(&state.send_to_loop)
.await; .await;
} }
@ -340,12 +413,10 @@ async fn handle_request(
} }
async fn check_caps( async fn check_caps(
our_node: &str,
source: &Address, source: &Address,
open_dbs: &Arc<DashMap<(PackageId, String), Mutex<Connection>>>, state: &mut SqliteState,
send_to_caps_oracle: &CapMessageSender, send_to_caps_oracle: &CapMessageSender,
request: &SqliteRequest, request: &SqliteRequest,
sqlite_path: &str,
) -> Result<(), SqliteError> { ) -> Result<(), SqliteError> {
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
let src_package_id = PackageId::new(source.process.package(), source.process.publisher()); let src_package_id = PackageId::new(source.process.package(), source.process.publisher());
@ -356,7 +427,7 @@ async fn check_caps(
.send(CapMessage::Has { .send(CapMessage::Has {
on: source.process.clone(), on: source.process.clone(),
cap: Capability::new( cap: Capability::new(
(our_node, SQLITE_PROCESS_ID.clone()), state.our.as_ref().clone(),
serde_json::json!({ serde_json::json!({
"kind": "write", "kind": "write",
"db": request.db.to_string(), "db": request.db.to_string(),
@ -379,7 +450,7 @@ async fn check_caps(
.send(CapMessage::Has { .send(CapMessage::Has {
on: source.process.clone(), on: source.process.clone(),
cap: Capability::new( cap: Capability::new(
(our_node, SQLITE_PROCESS_ID.clone()), state.our.as_ref().clone(),
serde_json::json!({ serde_json::json!({
"kind": "read", "kind": "read",
"db": request.db.to_string(), "db": request.db.to_string(),
@ -407,7 +478,7 @@ async fn check_caps(
add_capability( add_capability(
"read", "read",
&request.db.to_string(), &request.db.to_string(),
&our_node, &state.our,
&source, &source,
send_to_caps_oracle, send_to_caps_oracle,
) )
@ -415,28 +486,22 @@ async fn check_caps(
add_capability( add_capability(
"write", "write",
&request.db.to_string(), &request.db.to_string(),
&our_node, &state.our,
&source, &source,
send_to_caps_oracle, send_to_caps_oracle,
) )
.await?; .await?;
if open_dbs.contains_key(&(request.package_id.clone(), request.db.clone())) { if state
.open_dbs
.contains_key(&(request.package_id.clone(), request.db.clone()))
{
return Ok(()); return Ok(());
} }
let db_path = format!("{}/{}/{}", sqlite_path, request.package_id, request.db); state
fs::create_dir_all(&db_path).await?; .open_db(request.package_id.clone(), request.db.clone())
.await?;
let db_file_path = format!("{}/{}.db", db_path, request.db);
let db = Connection::open(db_file_path)?;
let _ = db.execute("PRAGMA journal_mode=WAL", []);
open_dbs.insert(
(request.package_id.clone(), request.db.clone()),
Mutex::new(db),
);
Ok(()) Ok(())
} }
SqliteAction::RemoveDb => { SqliteAction::RemoveDb => {
@ -446,10 +511,16 @@ async fn check_caps(
}); });
} }
let db_path = format!("{}/{}/{}", sqlite_path, request.package_id, request.db); state
open_dbs.remove(&(request.package_id.clone(), request.db.clone())); .remove_db(request.package_id.clone(), request.db.clone())
.await;
fs::remove_dir_all(format!(
"{}/{}/{}",
state.sqlite_path, request.package_id, request.db
))
.await?;
fs::remove_dir_all(&db_path).await?;
Ok(()) Ok(())
} }
SqliteAction::Backup => { SqliteAction::Backup => {
@ -459,18 +530,41 @@ async fn check_caps(
} }
} }
async fn handle_fd_request(km: KernelMessage, state: &mut SqliteState) -> anyhow::Result<()> {
let Message::Request(Request { body, .. }) = km.message else {
return Err(anyhow::anyhow!("not a request"));
};
let request: FdManagerRequest = serde_json::from_slice(&body)?;
match request {
FdManagerRequest::FdsLimit(new_fds_limit) => {
state.fds_limit = new_fds_limit;
if state.open_dbs.len() as u64 >= state.fds_limit {
crate::fd_manager::send_fd_manager_hit_fds_limit(&state.our, &state.send_to_loop)
.await;
state
.remove_least_recently_used_dbs(state.open_dbs.len() as u64 - state.fds_limit)
.await;
}
}
_ => {
return Err(anyhow::anyhow!("non-Cull FdManagerRequest"));
}
}
Ok(())
}
async fn add_capability( async fn add_capability(
kind: &str, kind: &str,
db: &str, db: &str,
our_node: &str, our: &Address,
source: &Address, source: &Address,
send_to_caps_oracle: &CapMessageSender, send_to_caps_oracle: &CapMessageSender,
) -> Result<(), SqliteError> { ) -> Result<(), SqliteError> {
let cap = Capability { let cap = Capability {
issuer: Address { issuer: our.clone(),
node: our_node.to_string(),
process: SQLITE_PROCESS_ID.clone(),
},
params: serde_json::json!({ "kind": kind, "db": db }).to_string(), params: serde_json::json!({ "kind": kind, "db": db }).to_string(),
}; };
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();

View File

@ -392,7 +392,9 @@ async fn handle_event(
cursor::MoveTo(0, height), cursor::MoveTo(0, height),
terminal::Clear(ClearType::CurrentLine) terminal::Clear(ClearType::CurrentLine)
)?; )?;
*win_cols = width - 1; // since we subtract prompt_len from win_cols, win_cols must always
// be >= prompt_len
*win_cols = std::cmp::max(width - 1, current_line.prompt_len as u16);
*win_rows = height; *win_rows = height;
if current_line.cursor_col + current_line.prompt_len as u16 > *win_cols { if current_line.cursor_col + current_line.prompt_len as u16 > *win_cols {
current_line.cursor_col = *win_cols - current_line.prompt_len as u16; current_line.cursor_col = *win_cols - current_line.prompt_len as u16;

View File

@ -1,9 +1,9 @@
use dashmap::DashMap; use dashmap::DashMap;
use lib::types::core::{ use lib::types::core::{
Address, CapMessage, CapMessageSender, Capability, DirEntry, FileMetadata, FileType, Address, CapMessage, CapMessageSender, Capability, DirEntry, FdManagerRequest, FileMetadata,
KernelMessage, LazyLoadBlob, Message, MessageReceiver, MessageSender, PackageId, PrintSender, FileType, KernelMessage, LazyLoadBlob, Message, MessageReceiver, MessageSender, PackageId,
Printout, ProcessId, Request, Response, VfsAction, VfsError, VfsRequest, VfsResponse, PrintSender, Printout, ProcessId, Request, Response, VfsAction, VfsError, VfsRequest,
KERNEL_PROCESS_ID, VFS_PROCESS_ID, VfsResponse, FD_MANAGER_PROCESS_ID, KERNEL_PROCESS_ID, VFS_PROCESS_ID,
}; };
use std::{ use std::{
collections::{HashMap, HashSet, VecDeque}, collections::{HashMap, HashSet, VecDeque},
@ -19,9 +19,6 @@ use tokio::{
sync::Mutex, sync::Mutex,
}; };
// Constants for file cleanup
const MAX_OPEN_FILES: usize = 180;
/// The main VFS service function. /// The main VFS service function.
/// ///
/// This function sets up the VFS, handles incoming requests, and manages file operations. /// This function sets up the VFS, handles incoming requests, and manages file operations.
@ -52,11 +49,16 @@ pub async fn vfs(
.map_err(|e| anyhow::anyhow!("failed creating vfs dir! {e:?}"))?; .map_err(|e| anyhow::anyhow!("failed creating vfs dir! {e:?}"))?;
let vfs_path = Arc::new(fs::canonicalize(&vfs_path).await?); let vfs_path = Arc::new(fs::canonicalize(&vfs_path).await?);
let files = Files::new(); let mut files = Files::new(
Address::new(our_node.as_str(), VFS_PROCESS_ID.clone()),
send_to_loop,
);
let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> = let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
HashMap::default(); HashMap::default();
crate::fd_manager::send_fd_manager_request_fds_limit(&files.our, &files.send_to_loop).await;
while let Some(km) = recv_from_loop.recv().await { while let Some(km) = recv_from_loop.recv().await {
if *our_node != km.source.node { if *our_node != km.source.node {
Printout::new( Printout::new(
@ -71,6 +73,18 @@ pub async fn vfs(
continue; continue;
} }
if km.source.process == *FD_MANAGER_PROCESS_ID {
if let Err(e) = handle_fd_request(km, &mut files).await {
Printout::new(
1,
format!("vfs: got request from fd_manager that errored: {e:?}"),
)
.send(&send_to_terminal)
.await;
};
continue;
}
let queue = process_queues let queue = process_queues
.get(&km.source.process) .get(&km.source.process)
.cloned() .cloned()
@ -83,9 +97,8 @@ pub async fn vfs(
// Clone Arcs for the new task // Clone Arcs for the new task
let our_node = our_node.clone(); let our_node = our_node.clone();
let send_to_loop = send_to_loop.clone();
let send_to_caps_oracle = send_to_caps_oracle.clone(); let send_to_caps_oracle = send_to_caps_oracle.clone();
let files = files.clone(); let mut files = files.clone();
let vfs_path = vfs_path.clone(); let vfs_path = vfs_path.clone();
tokio::spawn(async move { tokio::spawn(async move {
@ -94,15 +107,8 @@ pub async fn vfs(
let (km_id, km_rsvp) = let (km_id, km_rsvp) =
(km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone())); (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone()));
if let Err(e) = handle_request( if let Err(e) =
&our_node, handle_request(&our_node, km, &mut files, &send_to_caps_oracle, &vfs_path).await
km,
files,
&send_to_loop,
&send_to_caps_oracle,
&vfs_path,
)
.await
{ {
KernelMessage::builder() KernelMessage::builder()
.id(km_id) .id(km_id)
@ -119,7 +125,7 @@ pub async fn vfs(
))) )))
.build() .build()
.unwrap() .unwrap()
.send(&send_to_loop) .send(&files.send_to_loop)
.await; .await;
} }
} }
@ -137,6 +143,9 @@ struct Files {
cursor_positions: Arc<DashMap<PathBuf, u64>>, cursor_positions: Arc<DashMap<PathBuf, u64>>,
/// access order of files /// access order of files
access_order: Arc<Mutex<UniqueQueue<PathBuf>>>, access_order: Arc<Mutex<UniqueQueue<PathBuf>>>,
pub our: Address,
pub send_to_loop: MessageSender,
pub fds_limit: u64,
} }
struct FileEntry { struct FileEntry {
@ -145,11 +154,14 @@ struct FileEntry {
} }
impl Files { impl Files {
pub fn new() -> Self { pub fn new(our: Address, send_to_loop: MessageSender) -> Self {
Self { Self {
open_files: Arc::new(DashMap::new()), open_files: Arc::new(DashMap::new()),
cursor_positions: Arc::new(DashMap::new()), cursor_positions: Arc::new(DashMap::new()),
access_order: Arc::new(Mutex::new(UniqueQueue::new())), access_order: Arc::new(Mutex::new(UniqueQueue::new())),
our,
send_to_loop,
fds_limit: 10, // small hardcoded limit that gets replaced by fd_manager soon after boot
} }
} }
@ -167,10 +179,6 @@ impl Files {
return Ok(entry.value().file.clone()); return Ok(entry.value().file.clone());
} }
if self.open_files.len() >= MAX_OPEN_FILES {
self.close_least_recently_used_files().await?;
}
let mut file = self.try_open_file(&path, create, truncate).await?; let mut file = self.try_open_file(&path, create, truncate).await?;
if let Some(position) = self.cursor_positions.get(&path) { if let Some(position) = self.cursor_positions.get(&path) {
file.seek(SeekFrom::Start(*position)).await?; file.seek(SeekFrom::Start(*position)).await?;
@ -184,18 +192,30 @@ impl Files {
}, },
); );
self.update_access_order(&path).await; self.update_access_order(&path).await;
// if open files >= fds_limit, close the (limit/2) least recently used files
if self.open_files.len() as u64 >= self.fds_limit {
crate::fd_manager::send_fd_manager_hit_fds_limit(&self.our, &self.send_to_loop).await;
self.close_least_recently_used_files(self.fds_limit / 2)
.await?;
}
Ok(file) Ok(file)
} }
async fn remove_file(&self, path: &Path) -> Result<(), VfsError> {
self.open_files.remove(path);
Ok(())
}
async fn update_access_order(&self, path: &Path) { async fn update_access_order(&self, path: &Path) {
let mut access_order = self.access_order.lock().await; let mut access_order = self.access_order.lock().await;
access_order.push_back(path.to_path_buf()); access_order.push_back(path.to_path_buf());
} }
async fn close_least_recently_used_files(&self) -> Result<(), VfsError> { async fn close_least_recently_used_files(&self, to_close: u64) -> Result<(), VfsError> {
let mut access_order = self.access_order.lock().await; let mut access_order = self.access_order.lock().await;
let mut closed = 0; let mut closed = 0;
let to_close = MAX_OPEN_FILES / 3; // close 33% of max open files
while closed < to_close { while closed < to_close {
if let Some(path) = access_order.pop_front() { if let Some(path) = access_order.pop_front() {
@ -254,8 +274,7 @@ impl Files {
async fn handle_request( async fn handle_request(
our_node: &str, our_node: &str,
km: KernelMessage, km: KernelMessage,
files: Files, files: &mut Files,
send_to_loop: &MessageSender,
send_to_caps_oracle: &CapMessageSender, send_to_caps_oracle: &CapMessageSender,
vfs_path: &PathBuf, vfs_path: &PathBuf,
) -> Result<(), VfsError> { ) -> Result<(), VfsError> {
@ -311,7 +330,7 @@ async fn handle_request(
))) )))
.build() .build()
.unwrap() .unwrap()
.send(send_to_loop) .send(&files.send_to_loop)
.await; .await;
return Ok(()); return Ok(());
} else { } else {
@ -361,7 +380,7 @@ async fn handle_request(
} }
VfsAction::CreateFile => { VfsAction::CreateFile => {
// create truncates any file that might've existed before // create truncates any file that might've existed before
files.open_files.remove(&path); files.remove_file(&path).await?;
let _file = files.open_file(&path, true, true).await?; let _file = files.open_file(&path, true, true).await?;
(VfsResponse::Ok, None) (VfsResponse::Ok, None)
} }
@ -373,7 +392,7 @@ async fn handle_request(
} }
VfsAction::CloseFile => { VfsAction::CloseFile => {
// removes file from scope, resets file_handle and cursor. // removes file from scope, resets file_handle and cursor.
files.open_files.remove(&path); files.remove_file(&path).await?;
(VfsResponse::Ok, None) (VfsResponse::Ok, None)
} }
VfsAction::WriteAll => { VfsAction::WriteAll => {
@ -470,7 +489,7 @@ async fn handle_request(
} }
VfsAction::RemoveFile => { VfsAction::RemoveFile => {
fs::remove_file(&path).await?; fs::remove_file(&path).await?;
files.open_files.remove(&path); files.remove_file(&path).await?;
(VfsResponse::Ok, None) (VfsResponse::Ok, None)
} }
VfsAction::RemoveDir => { VfsAction::RemoveDir => {
@ -625,7 +644,7 @@ async fn handle_request(
})) }))
.build() .build()
.unwrap() .unwrap()
.send(send_to_loop) .send(&files.send_to_loop)
.await; .await;
} }
@ -902,7 +921,6 @@ fn get_file_type(metadata: &std::fs::Metadata) -> FileType {
} }
/// helper cache for most recently used paths /// helper cache for most recently used paths
pub struct UniqueQueue<T> pub struct UniqueQueue<T>
where where
T: Eq + Hash, T: Eq + Hash,
@ -993,3 +1011,29 @@ fn join_paths_safely(base: &PathBuf, extension: &str) -> PathBuf {
let extension_path = Path::new(extension_str); let extension_path = Path::new(extension_str);
base.join(extension_path) base.join(extension_path)
} }
async fn handle_fd_request(km: KernelMessage, files: &mut Files) -> anyhow::Result<()> {
let Message::Request(Request { body, .. }) = km.message else {
return Err(anyhow::anyhow!("not a request"));
};
let request: FdManagerRequest = serde_json::from_slice(&body)?;
match request {
FdManagerRequest::FdsLimit(fds_limit) => {
files.fds_limit = fds_limit;
if files.open_files.len() as u64 >= fds_limit {
crate::fd_manager::send_fd_manager_hit_fds_limit(&files.our, &files.send_to_loop)
.await;
files
.close_least_recently_used_files(files.open_files.len() as u64 - fds_limit)
.await?;
}
}
_ => {
return Err(anyhow::anyhow!("non-Cull FdManagerRequest"));
}
}
Ok(())
}

View File

@ -1,7 +1,7 @@
[package] [package]
name = "lib" name = "lib"
authors = ["KinodeDAO"] authors = ["KinodeDAO"]
version = "0.9.4" version = "0.9.5"
edition = "2021" edition = "2021"
description = "A general-purpose sovereign cloud computing platform" description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org" homepage = "https://kinode.org"

View File

@ -8,15 +8,17 @@ use thiserror::Error;
lazy_static::lazy_static! { lazy_static::lazy_static! {
pub static ref ETH_PROCESS_ID: ProcessId = ProcessId::new(Some("eth"), "distro", "sys"); pub static ref ETH_PROCESS_ID: ProcessId = ProcessId::new(Some("eth"), "distro", "sys");
pub static ref FD_MANAGER_PROCESS_ID: ProcessId = ProcessId::new(Some("fd_manager"), "distro", "sys");
pub static ref HTTP_CLIENT_PROCESS_ID: ProcessId = ProcessId::new(Some("http_client"), "distro", "sys"); pub static ref HTTP_CLIENT_PROCESS_ID: ProcessId = ProcessId::new(Some("http_client"), "distro", "sys");
pub static ref HTTP_SERVER_PROCESS_ID: ProcessId = ProcessId::new(Some("http_server"), "distro", "sys"); pub static ref HTTP_SERVER_PROCESS_ID: ProcessId = ProcessId::new(Some("http_server"), "distro", "sys");
pub static ref KERNEL_PROCESS_ID: ProcessId = ProcessId::new(Some("kernel"), "distro", "sys"); pub static ref KERNEL_PROCESS_ID: ProcessId = ProcessId::new(Some("kernel"), "distro", "sys");
pub static ref KV_PROCESS_ID: ProcessId = ProcessId::new(Some("kv"), "distro", "sys");
pub static ref NET_PROCESS_ID: ProcessId = ProcessId::new(Some("net"), "distro", "sys");
pub static ref STATE_PROCESS_ID: ProcessId = ProcessId::new(Some("state"), "distro", "sys");
pub static ref SQLITE_PROCESS_ID: ProcessId = ProcessId::new(Some("sqlite"), "distro", "sys");
pub static ref TERMINAL_PROCESS_ID: ProcessId = ProcessId::new(Some("terminal"), "terminal", "sys"); pub static ref TERMINAL_PROCESS_ID: ProcessId = ProcessId::new(Some("terminal"), "terminal", "sys");
pub static ref TIMER_PROCESS_ID: ProcessId = ProcessId::new(Some("timer"), "distro", "sys"); pub static ref TIMER_PROCESS_ID: ProcessId = ProcessId::new(Some("timer"), "distro", "sys");
pub static ref VFS_PROCESS_ID: ProcessId = ProcessId::new(Some("vfs"), "distro", "sys"); pub static ref VFS_PROCESS_ID: ProcessId = ProcessId::new(Some("vfs"), "distro", "sys");
pub static ref STATE_PROCESS_ID: ProcessId = ProcessId::new(Some("state"), "distro", "sys");
pub static ref KV_PROCESS_ID: ProcessId = ProcessId::new(Some("kv"), "distro", "sys");
pub static ref SQLITE_PROCESS_ID: ProcessId = ProcessId::new(Some("sqlite"), "distro", "sys");
} }
// //
@ -1719,6 +1721,8 @@ pub enum VfsError {
NotFound { path: String }, NotFound { path: String },
#[error("Creating directory failed at path: {path}: {error}")] #[error("Creating directory failed at path: {path}: {error}")]
CreateDirError { path: String, error: String }, CreateDirError { path: String, error: String },
#[error("Other error: {error}")]
Other { error: String },
} }
impl VfsError { impl VfsError {
@ -1733,6 +1737,7 @@ impl VfsError {
VfsError::BadJson { .. } => "NoJson", VfsError::BadJson { .. } => "NoJson",
VfsError::NotFound { .. } => "NotFound", VfsError::NotFound { .. } => "NotFound",
VfsError::CreateDirError { .. } => "CreateDirError", VfsError::CreateDirError { .. } => "CreateDirError",
VfsError::Other { .. } => "Other",
} }
} }
} }
@ -2068,3 +2073,53 @@ impl KnsUpdate {
self.ports.get(protocol) self.ports.get(protocol)
} }
} }
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum FdManagerRequest {
/// other process -> fd_manager
/// must send this to fd_manager to get an initial fds_limit
RequestFdsLimit,
/// other process -> fd_manager
/// send this to notify fd_manager that limit was hit,
/// which may or may not be reacted to
FdsLimitHit,
/// fd_manager -> other process
FdsLimit(u64),
/// administrative
UpdateMaxFdsAsFractionOfUlimitPercentage(u64),
/// administrative
UpdateUpdateUlimitSecs(u64),
/// administrative
UpdateCullFractionDenominator(u64),
/// get a `HashMap` of all `ProcessId`s to their number of allocated file descriptors.
GetState,
/// get the `u64` number of file descriptors allocated to `ProcessId`.
GetProcessFdLimit(ProcessId),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum FdManagerResponse {
/// response to [`FdManagerRequest::GetState`]
GetState(HashMap<ProcessId, FdsLimit>),
/// response to [`FdManagerRequest::GetProcessFdLimit`]
GetProcessFdLimit(u64),
}
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct FdsLimit {
pub limit: u64,
pub hit_count: u64,
}
#[derive(Debug, Error)]
pub enum FdManagerError {
#[error("fd_manager: received a non-Request message")]
NotARequest,
#[error("fd_manager: received a non-FdManangerRequest")]
BadRequest,
#[error("fd_manager: received a FdManagerRequest::FdsLimit, but I am the one who sets limits")]
FdManagerWasSentLimit,
}

View File

@ -42,6 +42,7 @@ def build_and_move(feature, tmp_dir, architecture, os_name):
source_path = f"target/release/{binary_name}" source_path = f"target/release/{binary_name}"
dest_path = os.path.join(tmp_dir, binary_name) dest_path = os.path.join(tmp_dir, binary_name)
shutil.move(source_path, dest_path) shutil.move(source_path, dest_path)
os.chmod(dest_path, 0o775)
# Create a zip archive of the binary # Create a zip archive of the binary
zip_path = os.path.join(tmp_dir, zip_name) zip_path = os.path.join(tmp_dir, zip_name)