Merge pull request #58 from uqbar-dao/da/runtime-extensions

Da/runtime extensions
This commit is contained in:
dr-frmr 2023-11-14 15:34:59 -05:00 committed by GitHub
commit a744112402
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 531 additions and 83 deletions

96
Cargo.lock generated
View File

@ -138,6 +138,54 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "anstream"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87"
[[package]]
name = "anstyle-parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b"
dependencies = [
"windows-sys",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628"
dependencies = [
"anstyle",
"windows-sys",
]
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.75" version = "1.0.75"
@ -630,6 +678,33 @@ dependencies = [
"zeroize", "zeroize",
] ]
[[package]]
name = "clap"
version = "4.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2275f18819641850fa26c89acc84d465c1bf91ce57bc2748b28c420473352f64"
dependencies = [
"clap_builder",
]
[[package]]
name = "clap_builder"
version = "4.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07cdf1b148b25c1e1f7a42225e30a0d99a615cd4637eae7365548dd4529b95bc"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim 0.10.0",
]
[[package]]
name = "clap_lex"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1"
[[package]] [[package]]
name = "coins-bip32" name = "coins-bip32"
version = "0.8.7" version = "0.8.7"
@ -682,6 +757,12 @@ dependencies = [
"thiserror", "thiserror",
] ]
[[package]]
name = "colorchoice"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]] [[package]]
name = "const-hex" name = "const-hex"
version = "1.8.0" version = "1.8.0"
@ -1037,7 +1118,7 @@ dependencies = [
"ident_case", "ident_case",
"proc-macro2", "proc-macro2",
"quote", "quote",
"strsim", "strsim 0.9.3",
"syn 1.0.109", "syn 1.0.109",
] ]
@ -4421,6 +4502,12 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c" checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c"
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]] [[package]]
name = "strum" name = "strum"
version = "0.25.0" version = "0.25.0"
@ -5007,6 +5094,7 @@ dependencies = [
"cap-std", "cap-std",
"chacha20poly1305 0.10.1", "chacha20poly1305 0.10.1",
"chrono", "chrono",
"clap",
"crossterm", "crossterm",
"dashmap", "dashmap",
"digest 0.10.7", "digest 0.10.7",
@ -5084,6 +5172,12 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "utf8parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "0.8.2" version = "0.8.2"

View File

@ -68,3 +68,7 @@ warp = "0.3.5"
wasmtime = "14.0.4" wasmtime = "14.0.4"
wasmtime-wasi = "14.0.4" wasmtime-wasi = "14.0.4"
zip = "0.6" zip = "0.6"
clap = "4.4.8"
[features]
llm = []

View File

@ -16,6 +16,7 @@ pub async fn load_fs(
home_directory_path: String, home_directory_path: String,
file_key: Vec<u8>, file_key: Vec<u8>,
fs_config: FsConfig, fs_config: FsConfig,
runtime_extensions: Vec<(ProcessId, MessageSender, bool)>,
) -> Result<(ProcessMap, Manifest, Vec<KernelMessage>), FsError> { ) -> Result<(ProcessMap, Manifest, Vec<KernelMessage>), FsError> {
// load/create fs directory, manifest + log if none. // load/create fs directory, manifest + log if none.
let fs_directory_path_str = format!("{}/fs", &home_directory_path); let fs_directory_path_str = format!("{}/fs", &home_directory_path);
@ -71,6 +72,7 @@ pub async fn load_fs(
bootstrap( bootstrap(
&our_name, &our_name,
&kernel_process_id, &kernel_process_id,
runtime_extensions,
&mut process_map, &mut process_map,
&mut manifest, &mut manifest,
) )
@ -97,27 +99,34 @@ pub async fn load_fs(
async fn bootstrap( async fn bootstrap(
our_name: &str, our_name: &str,
kernel_process_id: &FileIdentifier, kernel_process_id: &FileIdentifier,
runtime_extensions: Vec<(ProcessId, MessageSender, bool)>,
process_map: &mut ProcessMap, process_map: &mut ProcessMap,
manifest: &mut Manifest, manifest: &mut Manifest,
) -> Result<Vec<KernelMessage>> { ) -> Result<Vec<KernelMessage>> {
println!("bootstrapping node...\r"); println!("bootstrapping node...\r");
const RUNTIME_MODULES: [(&str, bool); 8] = [
("filesystem:sys:uqbar", false),
("http_server:sys:uqbar", true), // TODO evaluate
("http_client:sys:uqbar", false),
("encryptor:sys:uqbar", false),
("net:sys:uqbar", false),
("vfs:sys:uqbar", true),
("kernel:sys:uqbar", false),
("eth_rpc:sys:uqbar", true), // TODO evaluate
];
let mut runtime_caps: HashSet<Capability> = HashSet::new(); let mut runtime_caps: HashSet<Capability> = HashSet::new();
for runtime_module in RUNTIME_MODULES { // kernel is a special case
runtime_caps.insert(Capability { runtime_caps.insert(Capability {
issuer: Address { issuer: Address {
node: our_name.to_string(), node: our_name.to_string(),
process: ProcessId::from_str(runtime_module.0).unwrap(), process: ProcessId::from_str("kernel:sys:uqbar").unwrap(),
},
params: "\"messaging\"".into(),
});
// net is a special case
runtime_caps.insert(Capability {
issuer: Address {
node: our_name.to_string(),
process: ProcessId::from_str("net:sys:uqbar").unwrap(),
},
params: "\"messaging\"".into(),
});
for runtime_module in runtime_extensions.clone() {
runtime_caps.insert(Capability {
issuer: Address {
node: our_name.to_string(),
process: runtime_module.0,
}, },
params: "\"messaging\"".into(), params: "\"messaging\"".into(),
}); });
@ -132,14 +141,31 @@ async fn bootstrap(
}); });
// finally, save runtime modules in state map as well, somewhat fakely // finally, save runtime modules in state map as well, somewhat fakely
for runtime_module in RUNTIME_MODULES { // special cases for kernel and net
process_map process_map
.entry(ProcessId::from_str(runtime_module.0).unwrap()) .entry(ProcessId::from_str("kernel:sys:uqbar").unwrap())
.or_insert(PersistedProcess { .or_insert(PersistedProcess {
wasm_bytes_handle: 0, wasm_bytes_handle: 0,
on_panic: OnPanic::Restart, on_panic: OnPanic::Restart,
capabilities: runtime_caps.clone(), capabilities: runtime_caps.clone(),
public: runtime_module.1, public: false,
});
process_map
.entry(ProcessId::from_str("net:sys:uqbar").unwrap())
.or_insert(PersistedProcess {
wasm_bytes_handle: 0,
on_panic: OnPanic::Restart,
capabilities: runtime_caps.clone(),
public: false,
});
for runtime_module in runtime_extensions {
process_map
.entry(runtime_module.0)
.or_insert(PersistedProcess {
wasm_bytes_handle: 0,
on_panic: OnPanic::Restart,
capabilities: runtime_caps.clone(),
public: runtime_module.2,
}); });
} }

View File

@ -1802,48 +1802,22 @@ async fn make_event_loop(
mut recv_debug_in_loop: t::DebugReceiver, mut recv_debug_in_loop: t::DebugReceiver,
send_to_loop: t::MessageSender, send_to_loop: t::MessageSender,
send_to_net: t::MessageSender, send_to_net: t::MessageSender,
send_to_fs: t::MessageSender,
send_to_http_server: t::MessageSender,
send_to_http_client: t::MessageSender,
send_to_eth_rpc: t::MessageSender,
send_to_vfs: t::MessageSender,
send_to_encryptor: t::MessageSender,
send_to_terminal: t::PrintSender, send_to_terminal: t::PrintSender,
engine: Engine, engine: Engine,
runtime_extensions: Vec<(t::ProcessId, t::MessageSender, bool)>,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> { ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
// shared global flag to mark if we're finished boot process // shared global flag to mark if we're finished boot process
let booted = Arc::new(AtomicBool::new(false)); let booted = Arc::new(AtomicBool::new(false));
Box::pin(async move { Box::pin(async move {
let mut senders: Senders = HashMap::new(); let mut senders: Senders = HashMap::new();
senders.insert(
t::ProcessId::new(Some("eth_rpc"), "sys", "uqbar"),
ProcessSender::Runtime(send_to_eth_rpc),
);
senders.insert(
t::ProcessId::new(Some("filesystem"), "sys", "uqbar"),
ProcessSender::Runtime(send_to_fs),
);
senders.insert(
t::ProcessId::new(Some("http_server"), "sys", "uqbar"),
ProcessSender::Runtime(send_to_http_server),
);
senders.insert(
t::ProcessId::new(Some("http_client"), "sys", "uqbar"),
ProcessSender::Runtime(send_to_http_client),
);
senders.insert(
t::ProcessId::new(Some("encryptor"), "sys", "uqbar"),
ProcessSender::Runtime(send_to_encryptor),
);
senders.insert( senders.insert(
t::ProcessId::new(Some("net"), "sys", "uqbar"), t::ProcessId::new(Some("net"), "sys", "uqbar"),
ProcessSender::Runtime(send_to_net.clone()), ProcessSender::Runtime(send_to_net.clone()),
); );
senders.insert( for (process_id, sender, _) in runtime_extensions {
t::ProcessId::new(Some("vfs"), "sys", "uqbar"), senders.insert(process_id, ProcessSender::Runtime(sender));
ProcessSender::Runtime(send_to_vfs), }
);
// each running process is stored in this map // each running process is stored in this map
let mut process_handles: ProcessHandles = HashMap::new(); let mut process_handles: ProcessHandles = HashMap::new();
@ -2235,12 +2209,7 @@ pub async fn kernel(
network_error_recv: t::NetworkErrorReceiver, network_error_recv: t::NetworkErrorReceiver,
recv_debug_in_loop: t::DebugReceiver, recv_debug_in_loop: t::DebugReceiver,
send_to_wss: t::MessageSender, send_to_wss: t::MessageSender,
send_to_fs: t::MessageSender, runtime_extensions: Vec<(t::ProcessId, t::MessageSender, bool)>,
send_to_http_server: t::MessageSender,
send_to_http_client: t::MessageSender,
send_to_eth_rpc: t::MessageSender,
send_to_vfs: t::MessageSender,
send_to_encryptor: t::MessageSender,
) -> Result<()> { ) -> Result<()> {
let mut config = Config::new(); let mut config = Config::new();
config.cache_config_load_default().unwrap(); config.cache_config_load_default().unwrap();
@ -2261,14 +2230,9 @@ pub async fn kernel(
recv_debug_in_loop, recv_debug_in_loop,
send_to_loop, send_to_loop,
send_to_wss, send_to_wss,
send_to_fs,
send_to_http_server,
send_to_http_client,
send_to_eth_rpc,
send_to_vfs,
send_to_encryptor,
send_to_terminal, send_to_terminal,
engine, engine,
runtime_extensions,
) )
.await, .await,
); );

7
src/llm/README.md Normal file
View File

@ -0,0 +1,7 @@
# Local LLM Integration
1. Clone and build [llama.cpp](https://github.com/ggerganov/llama.cpp) on the same machine where you will run your uqbar node
- follow their README for details on how to do this. In most cases simply running `make` works
- make sure to get your model as a .gguf file
2. Within the llama.cpp directory, run this command in llama.cpp on the same machine you will run your uqbar node: `./server --port <PORT>`
- Note: you can pass in whatever other command line arguments to the llama cpp server you want depending on your preferences/hardware/model/etc.
3. Run your Uqbar node with `--features llm` and `--llm http://localhost:<PORT>`. For example `cargo +nightly run --features llm --release home --rpc wss://eth-sepolia.g.alchemy.com/v2/<YOUR_API_KEY> --llm http://localhost:<PORT>`

180
src/llm/mod.rs Normal file
View File

@ -0,0 +1,180 @@
use crate::llm::types::*;
use crate::types::*;
use anyhow::Result;
use reqwest::Response as ReqwestResponse;
mod types;
pub async fn llm(
our_name: String,
send_to_loop: MessageSender,
mut recv_in_client: MessageReceiver,
llm_url: String,
print_tx: PrintSender,
) -> Result<()> {
while let Some(message) = recv_in_client.recv().await {
let KernelMessage {
id,
source,
rsvp,
message:
Message::Request(Request {
expects_response,
ipc,
..
}),
..
} = message.clone()
else {
return Err(anyhow::anyhow!("llm: bad message"));
};
let our_name = our_name.clone();
let llm_url = llm_url.clone();
let send_to_loop = send_to_loop.clone();
let print_tx = print_tx.clone();
tokio::spawn(async move {
if let Err(e) = handle_message(
our_name.clone(),
send_to_loop.clone(),
llm_url.clone(),
id,
rsvp,
expects_response,
source.clone(),
ipc,
print_tx.clone(),
)
.await
{
send_to_loop
.send(make_error_message(our_name.clone(), id, source, e))
.await
.unwrap();
}
});
}
Err(anyhow::anyhow!("llm: exited"))
}
async fn handle_message(
our: String,
send_to_loop: MessageSender,
llm_url: String,
id: u64,
rsvp: Option<Address>,
expects_response: Option<u64>,
source: Address,
json: Vec<u8>,
_print_tx: PrintSender,
) -> Result<(), LlmError> {
let target = if expects_response.is_some() {
source.clone()
} else if source.process == ProcessId::from_str("terminal:terminal:uqbar").unwrap() {
source.clone()
} else {
let Some(rsvp) = rsvp else {
return Err(LlmError::BadRsvp);
};
rsvp.clone()
};
let req: LlmPrompt = match serde_json::from_slice(&json) {
Ok(req) => req,
Err(e) => {
return Err(LlmError::BadJson {
json: String::from_utf8(json).unwrap_or_default(),
error: format!("{}", e),
})
}
};
let client = reqwest::Client::new();
let res: ReqwestResponse = match client
.post(&format!("{}/completion", llm_url))
.json(&req)
.send()
.await
{
Ok(res) => res,
Err(e) => {
return Err(LlmError::RequestFailed {
error: format!("{}", e),
});
}
};
let llm_response = match res.json::<LlmResponse>().await {
Ok(response) => response,
Err(e) => {
return Err(LlmError::DeserializationToLlmResponseFailed {
error: format!("{}", e),
});
}
};
let _ = _print_tx
.send(Printout {
verbosity: 0,
content: format!("llm: {:?}", llm_response.clone().content),
})
.await;
let message = KernelMessage {
id,
source: Address {
node: our,
process: ProcessId::new(Some("llm"), "sys", "uqbar"),
},
target,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec::<Result<LlmResponse, LlmError>>(&Ok(llm_response))
.unwrap(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
};
send_to_loop.send(message).await.unwrap();
Ok(())
}
//
// helpers
//
fn make_error_message(
our_name: String,
id: u64,
source: Address,
error: LlmError,
) -> KernelMessage {
KernelMessage {
id,
source: source.clone(),
target: Address {
node: our_name.clone(),
process: source.process.clone(),
},
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec::<Result<HttpClientResponse, LlmError>>(&Err(error))
.unwrap(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
}
}

114
src/llm/types.rs Normal file
View File

@ -0,0 +1,114 @@
use serde::{Deserialize, Serialize};
use thiserror::Error;
#[derive(Debug, Serialize, Deserialize)]
pub struct LlmPrompt {
prompt: String, // TODO can be a string or an array of strings
temperature: Option<f64>,
top_k: Option<usize>,
top_p: Option<f64>,
n_predict: Option<isize>, // isize to accommodate -1
n_keep: Option<isize>, // isize to accommodate -1
stream: Option<bool>,
stop: Option<Vec<String>>,
tfs_z: Option<f64>,
typical_p: Option<f64>,
repeat_penalty: Option<f64>,
repeat_last_n: Option<isize>, // isize to accommodate -1
penalize_nl: Option<bool>,
presence_penalty: Option<f64>,
frequency_penalty: Option<f64>,
mirostat: Option<u8>, // u8 as it's 0, 1, or 2
mirostat_tau: Option<f64>,
mirostat_eta: Option<f64>,
grammar: Option<String>,
seed: Option<isize>, // isize to accommodate -1
ignore_eos: Option<bool>,
logit_bias: Option<Vec<(usize, f64)>>,
n_probs: Option<usize>,
image_data: Option<Vec<ImageData>>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ImageData {
data: String, // Base64 string
id: usize,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct LlmResponse {
pub content: String,
pub generation_settings: GenerationSettings,
pub model: String,
pub prompt: String,
pub slot_id: u64,
pub stop: bool,
pub stopped_eos: bool,
pub stopped_limit: bool,
pub stopped_word: bool,
pub stopping_word: String,
pub timings: Timings,
pub tokens_cached: u64,
pub tokens_evaluated: u64,
pub tokens_predicted: u64,
pub truncated: bool,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct GenerationSettings {
pub frequency_penalty: f64,
pub grammar: String,
pub ignore_eos: bool,
pub logit_bias: Vec<serde_json::Value>, // This should be changed to the appropriate type
pub mirostat: u64,
pub mirostat_eta: f64,
pub mirostat_tau: f64,
pub model: String,
pub n_ctx: u64,
pub n_keep: u64,
pub n_predict: u64,
pub n_probs: u64,
pub penalize_nl: bool,
pub presence_penalty: f64,
pub repeat_last_n: u64,
pub repeat_penalty: f64,
pub seed: u64,
pub stop: Vec<serde_json::Value>, // This should be changed to the appropriate type
pub stream: bool,
pub temp: f64,
pub tfs_z: f64,
pub top_k: u64,
pub top_p: f64,
pub typical_p: f64,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Timings {
pub predicted_ms: f64,
pub predicted_n: u64,
pub predicted_per_second: f64,
pub predicted_per_token_ms: f64,
pub prompt_ms: f64,
pub prompt_n: u64,
pub prompt_per_second: f64,
pub prompt_per_token_ms: f64,
}
#[derive(Error, Debug, Serialize, Deserialize)]
pub enum LlmError {
#[error("llm: rsvp is None but message is expecting response")]
BadRsvp,
#[error("llm: no json in request")]
NoJson,
#[error(
"llm: JSON payload could not be parsed to LlmPrompt: {error}. Got {:?}.",
json
)]
BadJson { json: String, error: String },
#[error("llm: http method not supported: {:?}", method)]
BadMethod { method: String },
#[error("llm: failed to execute request {:?}", error)]
RequestFailed { error: String },
#[error("llm: failed to deserialize response {:?}", error)]
DeserializationToLlmResponseFailed { error: String },
}

View File

@ -1,6 +1,6 @@
use crate::types::*; use crate::types::*;
use anyhow::Result; use anyhow::Result;
use clap::{arg, Command};
use std::env; use std::env;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
@ -19,6 +19,10 @@ mod terminal;
mod types; mod types;
mod vfs; mod vfs;
// extensions
#[cfg(feature = "llm")]
mod llm;
const EVENT_LOOP_CHANNEL_CAPACITY: usize = 10_000; const EVENT_LOOP_CHANNEL_CAPACITY: usize = 10_000;
const EVENT_LOOP_DEBUG_CHANNEL_CAPACITY: usize = 50; const EVENT_LOOP_DEBUG_CHANNEL_CAPACITY: usize = 50;
const TERMINAL_CHANNEL_CAPACITY: usize = 32; const TERMINAL_CHANNEL_CAPACITY: usize = 32;
@ -30,6 +34,8 @@ const ETH_RPC_CHANNEL_CAPACITY: usize = 32;
const VFS_CHANNEL_CAPACITY: usize = 1_000; const VFS_CHANNEL_CAPACITY: usize = 1_000;
const ENCRYPTOR_CHANNEL_CAPACITY: usize = 32; const ENCRYPTOR_CHANNEL_CAPACITY: usize = 32;
const CAP_CHANNEL_CAPACITY: usize = 1_000; const CAP_CHANNEL_CAPACITY: usize = 1_000;
#[cfg(feature = "llm")]
const LLM_CHANNEL_CAPACITY: usize = 32;
// const QNS_SEPOLIA_ADDRESS: &str = "0x9e5ed0e7873E0d7f10eEb6dE72E87fE087A12776"; // const QNS_SEPOLIA_ADDRESS: &str = "0x9e5ed0e7873E0d7f10eEb6dE72E87fE087A12776";
@ -46,24 +52,21 @@ async fn main() {
// console_subscriber::init(); // console_subscriber::init();
// DEMO ONLY: remove all CLI arguments // DEMO ONLY: remove all CLI arguments
let args: Vec<String> = env::args().collect(); let matches = Command::new("Uqbar")
let home_directory_path = &args[1]; .version("0.1.0")
// let home_directory_path = "home"; .author("Uqbar DAO")
// create home directory if it does not already exist .about("A decentralized operating system")
if let Err(e) = fs::create_dir_all(home_directory_path).await { .arg(arg!([home] "Path to home directory").required(true))
panic!("failed to create home directory: {:?}", e); .arg(arg!(--rpc <WS_URL> "Ethereum RPC endpoint (must be wss://)").required(true))
} .arg(arg!(--llm <LLM_URL> "LLM endpoint"))
// read PKI from websocket endpoint served by public RPC .get_matches();
// if you get rate-limited or something, pass in your own RPC as a boot argument let home_directory_path = matches.get_one::<String>("home").unwrap();
let mut rpc_url = "".to_string(); let rpc_url = matches.get_one::<String>("rpc").unwrap();
let llm_url = matches.get_one::<String>("llm");
for (i, arg) in args.iter().enumerate() { #[cfg(not(feature = "llm"))]
if arg == "--rpc" { if let Some(llm_url) = llm_url {
// Check if the next argument exists and is not another flag panic!("You passed in --llm {:?} but you do not have the llm feature enabled. Please re-run with `--features llm`", llm_url);
if i + 1 < args.len() && !args[i + 1].starts_with('-') {
rpc_url = args[i + 1].clone();
}
}
} }
// kernel receives system messages via this channel, all other modules send messages // kernel receives system messages via this channel, all other modules send messages
@ -98,6 +101,10 @@ async fn main() {
// encryptor handles end-to-end encryption for client messages // encryptor handles end-to-end encryption for client messages
let (encryptor_sender, encryptor_receiver): (MessageSender, MessageReceiver) = let (encryptor_sender, encryptor_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(ENCRYPTOR_CHANNEL_CAPACITY); mpsc::channel(ENCRYPTOR_CHANNEL_CAPACITY);
// optional llm extension
#[cfg(feature = "llm")]
let (llm_sender, llm_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(LLM_CHANNEL_CAPACITY);
// terminal receives prints via this channel, all other modules send prints // terminal receives prints via this channel, all other modules send prints
let (print_sender, print_receiver): (PrintSender, PrintReceiver) = let (print_sender, print_receiver): (PrintSender, PrintReceiver) =
mpsc::channel(TERMINAL_CHANNEL_CAPACITY); mpsc::channel(TERMINAL_CHANNEL_CAPACITY);
@ -214,11 +221,57 @@ async fn main() {
println!("registration complete!"); println!("registration complete!");
let mut runtime_extensions = vec![
(
ProcessId::new(Some("filesystem"), "sys", "uqbar"),
fs_message_sender,
false,
),
(
ProcessId::new(Some("http_server"), "sys", "uqbar"),
http_server_sender,
true,
),
(
ProcessId::new(Some("http_client"), "sys", "uqbar"),
http_client_sender,
false,
),
(
ProcessId::new(Some("eth_rpc"), "sys", "uqbar"),
eth_rpc_sender,
true,
),
(
ProcessId::new(Some("vfs"), "sys", "uqbar"),
vfs_message_sender,
true,
),
(
ProcessId::new(Some("encryptor"), "sys", "uqbar"),
encryptor_sender,
false,
),
];
#[cfg(feature = "llm")]
{
if llm_url.is_none() {
panic!("You did not pass in --llm <LLM_URL> but you have the llm feature enabled. Please re-run with `--llm <LLM_URL>`");
}
runtime_extensions.push((
ProcessId::new(Some("llm"), "sys", "uqbar"), // TODO llm:extensions:uqbar ?
llm_sender,
true,
));
}
let (kernel_process_map, manifest, vfs_messages) = filesystem::load_fs( let (kernel_process_map, manifest, vfs_messages) = filesystem::load_fs(
our.name.clone(), our.name.clone(),
home_directory_path.clone(), home_directory_path.clone(),
decoded_keyfile.file_key, decoded_keyfile.file_key,
fs_config, fs_config,
runtime_extensions.clone(),
) )
.await .await
.expect("fs load failed!"); .expect("fs load failed!");
@ -252,12 +305,7 @@ async fn main() {
network_error_receiver, network_error_receiver,
kernel_debug_message_receiver, kernel_debug_message_receiver,
net_message_sender.clone(), net_message_sender.clone(),
fs_message_sender, runtime_extensions,
http_server_sender,
http_client_sender,
eth_rpc_sender,
vfs_message_sender,
encryptor_sender,
)); ));
tasks.spawn(net::networking( tasks.spawn(net::networking(
our.clone(), our.clone(),
@ -315,7 +363,18 @@ async fn main() {
encryptor_receiver, encryptor_receiver,
print_sender.clone(), print_sender.clone(),
)); ));
#[cfg(feature = "llm")]
{
tasks.spawn(llm::llm(
our.name.clone(),
kernel_message_sender.clone(),
llm_receiver,
llm_url.unwrap().to_string(),
print_sender.clone(),
));
}
// if a runtime task exits, try to recover it,
// unless it was terminal signaling a quit
let quit_msg: String = tokio::select! { let quit_msg: String = tokio::select! {
Some(Ok(res)) = tasks.join_next() => { Some(Ok(res)) = tasks.join_next() => {
format!( format!(