From e5c5d58d28620150436b0991a0bae1cec6901596 Mon Sep 17 00:00:00 2001
From: bitful-pannul
Date: Mon, 18 Dec 2023 20:16:54 -0300
Subject: [PATCH] sqlite: move to runtime

---
Review notes (ignored by git-am): this revision of the patch
- answers errors as the sqlite process instead of kv,
- maps NULL columns to SqlValue::Null and non-finite reals to JSON null,
- opens the database file inside the per-db directory and sets WAL via
  pragma_update (execute() rejects statements that return rows),
- adds doc comments. The blob hash on the src/sqlite.rs index line was not
  regenerated.

 Cargo.lock    |  41 ++++
 Cargo.toml    |   1 +
 src/main.rs   |  25 ++-
 src/sqlite.rs | 588 ++++++++++++++++++++++++++++++++++++++++++++++++++
 src/types.rs  |  66 +++++-
 5 files changed, 717 insertions(+), 4 deletions(-)
 create mode 100644 src/sqlite.rs

diff --git a/Cargo.lock b/Cargo.lock
index a6fcd08e..57dc568e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1842,6 +1842,12 @@ version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
 
+[[package]]
+name = "fallible-streaming-iterator"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
+
 [[package]]
 name = "fastrand"
 version = "2.0.0"
@@ -2226,6 +2232,15 @@ dependencies = [
  "fxhash",
 ]
 
+[[package]]
+name = "hashlink"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7"
+dependencies = [
+ "hashbrown 0.14.0",
+]
+
 [[package]]
 name = "headers"
 version = "0.3.9"
@@ -2821,6 +2836,17 @@ dependencies = [
  "zstd-sys",
 ]
 
+[[package]]
+name = "libsqlite3-sys"
+version = "0.27.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716"
+dependencies = [
+ "cc",
+ "pkg-config",
+ "vcpkg",
+]
+
 [[package]]
 name = "libz-sys"
 version = "1.1.12"
@@ -4081,6 +4107,20 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "rusqlite"
+version = "0.30.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a78046161564f5e7cd9008aff3b2990b3850dc8e0349119b98e8f251e099f24d"
+dependencies = [
+ "bitflags 2.4.0",
+ "fallible-iterator",
+ "fallible-streaming-iterator",
+ "hashlink",
+ "libsqlite3-sys",
+ "smallvec 1.11.0",
+]
+
 [[package]]
 name = "rustc-demangle"
 version = "0.1.23"
@@ -5279,6 +5319,7 @@ dependencies = [
  "rusoto_core",
  "rusoto_credential",
  "rusoto_s3",
+ "rusqlite",
  "serde",
  "serde_json",
  "serde_urlencoded",
diff --git a/Cargo.toml b/Cargo.toml
index 340f03f3..fede8a1f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -78,3 +78,4 @@ wasmtime = "15.0.1"
 wasmtime-wasi = "15.0.1"
 zip = "0.6"
 rocksdb = { version = "0.21.0", features = ["multi-threaded-cf"] }
+rusqlite = { version = "0.30.0", features = ["bundled"] }
diff --git a/src/main.rs b/src/main.rs
index 38e7f95b..bfdc8ea6 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -19,6 +19,7 @@ mod kv;
 mod net;
 mod register;
 mod state;
+mod sqlite;
 mod terminal;
 mod timer;
 mod types;
@@ -34,6 +35,8 @@ const ETH_RPC_CHANNEL_CAPACITY: usize = 32;
 const VFS_CHANNEL_CAPACITY: usize = 1_000;
 const CAP_CHANNEL_CAPACITY: usize = 1_000;
 const KV_CHANNEL_CAPACITY: usize = 1_000;
+const SQLITE_CHANNEL_CAPACITY: usize = 1_000;
+
 
 const VERSION: &str = env!("CARGO_PKG_VERSION");
 
@@ -159,6 +162,9 @@ async fn main() {
     // kv sender and receiver
     let (kv_sender, kv_receiver): (MessageSender, MessageReceiver) =
         mpsc::channel(KV_CHANNEL_CAPACITY);
+    // sqlite sender and receiver
+    let (sqlite_sender, sqlite_receiver): (MessageSender, MessageReceiver) =
+        mpsc::channel(SQLITE_CHANNEL_CAPACITY);
     // http server channel w/ websockets (eyre)
     let (http_server_sender, http_server_receiver): (MessageSender, MessageReceiver) =
         mpsc::channel(HTTP_CHANNEL_CAPACITY);
@@ -330,7 +336,16 @@ async fn main() {
             state_sender,
             true,
         ),
-        (ProcessId::new(Some("kv"), "sys", "uqbar"), kv_sender, true),
+        (
+            ProcessId::new(Some("kv"), "sys", "uqbar"),
+            kv_sender,
+            true,
+        ),
+        (
+            ProcessId::new(Some("sqlite"), "sys", "uqbar"),
+            sqlite_sender,
+            true,
+        ),
     ];
 
     #[cfg(feature = "llm")]
@@ -406,6 +421,14 @@ async fn main() {
         caps_oracle_sender.clone(),
         home_directory_path.clone(),
     ));
+    tasks.spawn(sqlite::sqlite(
+        our.name.clone(),
+        kernel_message_sender.clone(),
+        print_sender.clone(),
+        sqlite_receiver,
+        caps_oracle_sender.clone(),
+        home_directory_path.clone(),
+    ));
     tasks.spawn(http::server::http_server(
         our.name.clone(),
         http_server_port,
diff --git a/src/sqlite.rs b/src/sqlite.rs
new file mode 100644
index 00000000..93cef031
--- /dev/null
+++ b/src/sqlite.rs
@@ -0,0 +1,588 @@
+use anyhow::Result;
+use dashmap::DashMap;
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use tokio::fs;
+use tokio::sync::Mutex;
+use rusqlite::Connection;
+use rusqlite::types::{FromSql, FromSqlError, ToSql, ValueRef};
+
+use crate::types::*;
+
+/// Runtime module serving sqlite requests from local processes.
+///
+/// Requests are queued per source process and handled one at a time per
+/// process; requests whose source node is not `our_node` are dropped.
+pub async fn sqlite(
+    our_node: String,
+    send_to_loop: MessageSender,
+    send_to_terminal: PrintSender,
+    mut recv_from_loop: MessageReceiver,
+    send_to_caps_oracle: CapMessageSender,
+    home_directory_path: String,
+) -> anyhow::Result<()> {
+    let sqlite_path = format!("{}/sqlite", &home_directory_path);
+
+    if let Err(e) = fs::create_dir_all(&sqlite_path).await {
+        panic!("failed creating sqlite dir! {:?}", e);
+    }
+
+    let open_dbs: Arc<DashMap<DBKey, Mutex<Connection>>> = Arc::new(DashMap::new());
+    let txs: Arc<DashMap<u64, Vec<(String, Vec<SqlValue>)>>> = Arc::new(DashMap::new());
+
+    let mut process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
+        HashMap::new();
+
+    loop {
+        tokio::select! {
+            Some(km) = recv_from_loop.recv() => {
+                if our_node != km.source.node {
+                    println!(
+                        "sqlite: request must come from our_node={}, got: {}",
+                        our_node,
+                        km.source.node,
+                    );
+                    continue;
+                }
+
+                let queue = process_queues
+                    .entry(km.source.process.clone())
+                    .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
+                    .clone();
+
+                {
+                    let mut queue_lock = queue.lock().await;
+                    queue_lock.push_back(km.clone());
+                }
+
+                // clone Arcs
+                let our_node = our_node.clone();
+                let send_to_caps_oracle = send_to_caps_oracle.clone();
+                let send_to_terminal = send_to_terminal.clone();
+                let send_to_loop = send_to_loop.clone();
+                let open_dbs = open_dbs.clone();
+                let txs = txs.clone();
+                let sqlite_path = sqlite_path.clone();
+
+                tokio::spawn(async move {
+                    let mut queue_lock = queue.lock().await;
+                    if let Some(km) = queue_lock.pop_front() {
+                        if let Err(e) = handle_request(
+                            our_node.clone(),
+                            km.clone(),
+                            open_dbs.clone(),
+                            txs.clone(),
+                            send_to_loop.clone(),
+                            send_to_terminal.clone(),
+                            send_to_caps_oracle.clone(),
+                            sqlite_path.clone(),
+                        )
+                        .await
+                        {
+                            let _ = send_to_loop
+                                .send(make_error_message(our_node.clone(), &km, e))
+                                .await;
+                        }
+                    }
+                });
+            }
+        }
+    }
+}
+
+/// Handles one sqlite request: checks capabilities, performs the requested
+/// action and, when a response is expected, sends it through `send_to_loop`.
+async fn handle_request(
+    our_node: String,
+    km: KernelMessage,
+    open_dbs: Arc<DashMap<DBKey, Mutex<Connection>>>,
+    txs: Arc<DashMap<u64, Vec<(String, Vec<SqlValue>)>>>,
+    send_to_loop: MessageSender,
+    send_to_terminal: PrintSender,
+    send_to_caps_oracle: CapMessageSender,
+    sqlite_path: String,
+) -> Result<(), SqliteError> {
+    let KernelMessage {
+        id,
+        source,
+        message,
+        payload,
+        ..
+    } = km.clone();
+    let Message::Request(Request {
+        ipc,
+        expects_response,
+        metadata,
+        ..
+    }) = message.clone()
+    else {
+        return Err(SqliteError::InputError {
+            error: "not a request".into(),
+        });
+    };
+
+    let request: SqliteRequest = match serde_json::from_slice(&ipc) {
+        Ok(r) => r,
+        Err(e) => {
+            println!("sqlite: got invalid Request: {}", e);
+            return Err(SqliteError::InputError {
+                error: "didn't serialize to SqliteRequest.".into(),
+            });
+        }
+    };
+
+    check_caps(
+        our_node.clone(),
+        source.clone(),
+        open_dbs.clone(),
+        send_to_caps_oracle.clone(),
+        &request,
+        sqlite_path.clone(),
+    )
+    .await?;
+
+    let (ipc, bytes) = match request.action {
+        SqliteAction::New => {
+            // handled in check_caps
+            //
+            (serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
+        }
+        SqliteAction::Read { query } => {
+            let db = match open_dbs.get(&request.db) {
+                Some(db) => db,
+                None => {
+                    return Err(SqliteError::NoDb);
+                }
+            };
+            let db = db.lock().await;
+
+            let parameters = get_json_params(payload)?;
+
+            let mut statement = db.prepare(&query)?;
+            let column_names: Vec<String> = statement
+                .column_names()
+                .iter()
+                .map(|c| c.to_string())
+                .collect();
+
+            let results: Vec<HashMap<String, serde_json::Value>> = statement
+                .query_map(rusqlite::params_from_iter(parameters.iter()), |row| {
+                    let mut map = HashMap::new();
+                    for (i, column_name) in column_names.iter().enumerate() {
+                        let value: SqlValue = row.get(i)?;
+                        let value_json = match value {
+                            SqlValue::Integer(int) => serde_json::Value::Number(int.into()),
+                            SqlValue::Real(real) => serde_json::Number::from_f64(real).map_or(serde_json::Value::Null, serde_json::Value::Number),
+                            SqlValue::Text(text) => serde_json::Value::String(text),
+                            SqlValue::Blob(blob) => serde_json::Value::String(base64::encode(blob)), // or another representation if you prefer
+                            _ => serde_json::Value::Null,
+                        };
+                        map.insert(column_name.clone(), value_json);
+                    }
+                    Ok(map)
+                })?
+                .collect::<Result<Vec<_>, _>>()?;
+
+            let results = serde_json::json!(results).to_string();
+            let results_bytes = results.as_bytes().to_vec();
+
+            (serde_json::to_vec(&SqliteResponse::Read).unwrap(), Some(results_bytes))
+        }
+        SqliteAction::Write { statement, tx_id } => {
+            let db = match open_dbs.get(&request.db) {
+                Some(db) => db,
+                None => {
+                    return Err(SqliteError::NoDb);
+                }
+            };
+            let db = db.lock().await;
+
+            let parameters = get_json_params(payload)?;
+
+            match tx_id {
+                Some(tx_id) => {
+                    txs.entry(tx_id)
+                        .or_insert_with(Vec::new)
+                        .push((statement.clone(), parameters));
+                },
+                None => {
+                    let mut stmt = db.prepare(&statement)?;
+                    stmt.execute(rusqlite::params_from_iter(parameters.iter()))?;
+                },
+            };
+            (serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
+        }
+        SqliteAction::BeginTx => {
+            let tx_id = rand::random::<u64>();
+            txs.insert(tx_id, Vec::new());
+
+            (
+                serde_json::to_vec(&SqliteResponse::BeginTx { tx_id }).unwrap(),
+                None,
+            )
+        }
+        SqliteAction::Commit { tx_id } => {
+            let db = match open_dbs.get(&request.db) {
+                Some(db) => db,
+                None => {
+                    return Err(SqliteError::NoDb);
+                }
+            };
+            let mut db = db.lock().await;
+
+            let txs = match txs.remove(&tx_id).map(|(_, tx)| tx) {
+                None => {
+                    return Err(SqliteError::NoTx);
+                }
+                Some(tx) => tx,
+            };
+
+            let tx = db.transaction()?;
+            for (query, params) in txs {
+                tx.execute(&query, rusqlite::params_from_iter(params.iter()))?;
+            }
+
+            tx.commit()?;
+            (serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
+        }
+        SqliteAction::Backup => {
+            // execute WAL flush.
+            //
+            (serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
+        }
+    };
+
+    if let Some(target) = km.rsvp.or_else(|| {
+        expects_response.map(|_| Address {
+            node: our_node.clone(),
+            process: source.process.clone(),
+        })
+    }) {
+        let response = KernelMessage {
+            id,
+            source: Address {
+                node: our_node.clone(),
+                process: SQLITE_PROCESS_ID.clone(),
+            },
+            target,
+            rsvp: None,
+            message: Message::Response((
+                Response {
+                    inherit: false,
+                    ipc,
+                    metadata,
+                },
+                None,
+            )),
+            payload: bytes.map(|bytes| Payload {
+                mime: Some("application/octet-stream".into()),
+                bytes,
+            }),
+            signed_capabilities: None,
+        };
+
+        let _ = send_to_loop.send(response).await;
+    } else {
+        send_to_terminal
+            .send(Printout {
+                verbosity: 2,
+                content: format!(
+                    "sqlite: not sending response: {:?}",
+                    serde_json::from_slice::<SqliteResponse>(&ipc)
+                ),
+            })
+            .await
+            .unwrap();
+    }
+
+    Ok(())
+}
+
+/// Verifies that `source` may perform `request.action`. For `New` this also
+/// grants read/write capabilities to the caller and opens the database.
+async fn check_caps(
+    our_node: String,
+    source: Address,
+    open_dbs: Arc<DashMap<DBKey, Mutex<Connection>>>,
+    mut send_to_caps_oracle: CapMessageSender,
+    request: &SqliteRequest,
+    sqlite_path: String,
+) -> Result<(), SqliteError> {
+    let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
+    let src_package_id = PackageId::new(source.process.package(), source.process.publisher());
+
+    match &request.action {
+        SqliteAction::Write { .. }
+        | SqliteAction::BeginTx
+        | SqliteAction::Commit { .. } => {
+            send_to_caps_oracle
+                .send(CapMessage::Has {
+                    on: source.process.clone(),
+                    cap: Capability {
+                        issuer: Address {
+                            node: our_node.clone(),
+                            process: SQLITE_PROCESS_ID.clone(),
+                        },
+                        params: serde_json::to_string(&serde_json::json!({
+                            "kind": "write",
+                            "db": request.db.to_string(),
+                        }))
+                        .unwrap(),
+                    },
+                    responder: send_cap_bool,
+                })
+                .await?;
+            let has_cap = recv_cap_bool.await?;
+            if !has_cap {
+                return Err(SqliteError::NoCap {
+                    error: request.action.to_string(),
+                });
+            }
+            Ok(())
+        }
+        SqliteAction::Read { .. } => {
+            send_to_caps_oracle
+                .send(CapMessage::Has {
+                    on: source.process.clone(),
+                    cap: Capability {
+                        issuer: Address {
+                            node: our_node.clone(),
+                            process: SQLITE_PROCESS_ID.clone(),
+                        },
+                        params: serde_json::to_string(&serde_json::json!({
+                            "kind": "read",
+                            "db": request.db.to_string(),
+                        }))
+                        .unwrap(),
+                    },
+                    responder: send_cap_bool,
+                })
+                .await?;
+            let has_cap = recv_cap_bool.await?;
+            if !has_cap {
+                return Err(SqliteError::NoCap {
+                    error: request.action.to_string(),
+                });
+            }
+            Ok(())
+        }
+        SqliteAction::New => {
+            if src_package_id != request.db.package_id {
+                return Err(SqliteError::NoCap {
+                    error: request.action.to_string(),
+                });
+            }
+
+            add_capability(
+                "read",
+                &request.db.to_string(),
+                &our_node,
+                &source,
+                &mut send_to_caps_oracle,
+            )
+            .await?;
+            add_capability(
+                "write",
+                &request.db.to_string(),
+                &our_node,
+                &source,
+                &mut send_to_caps_oracle,
+            )
+            .await?;
+
+            let db_path = format!("{}{}", sqlite_path, request.db.to_string());
+
+            fs::create_dir_all(&db_path).await?;
+
+            // `db_path` is a directory, so the database itself lives in a file inside it.
+            let db = Connection::open(format!("{}/db.sqlite", db_path))?;
+            // `PRAGMA journal_mode` returns a row, which makes `execute` fail
+            // with `ExecuteReturnedResults`; `pragma_update` discards the row.
+            db.pragma_update(None, "journal_mode", "WAL")?;
+
+            open_dbs.insert(request.db.clone(), Mutex::new(db));
+            Ok(())
+        }
+        SqliteAction::Backup => {
+            if source.process != *STATE_PROCESS_ID {
+                return Err(SqliteError::NoCap {
+                    error: request.action.to_string(),
+                });
+            }
+            Ok(())
+        }
+    }
+}
+
+/// Asks the capabilities oracle to add a `kind` capability on `db` to `source`'s process.
+async fn add_capability(
+    kind: &str,
+    db: &str,
+    our_node: &str,
+    source: &Address,
+    send_to_caps_oracle: &mut CapMessageSender,
+) -> Result<(), SqliteError> {
+    let cap = Capability {
+        issuer: Address {
+            node: our_node.to_string(),
+            process: SQLITE_PROCESS_ID.clone(),
+        },
+        params: serde_json::to_string(&serde_json::json!({ "kind": kind, "db": db })).unwrap(),
+    };
+    let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
+    send_to_caps_oracle
+        .send(CapMessage::Add {
+            on: source.process.clone(),
+            cap,
+            responder: send_cap_bool,
+        })
+        .await?;
+    let _ = recv_cap_bool.await?;
+    Ok(())
+}
+
+/// Converts one JSON parameter into the matching [`SqlValue`].
+///
+/// NOTE(review): every string that happens to be valid base64 (e.g. "abcd")
+/// is bound as a blob rather than text; callers cannot opt out of this.
+fn json_to_sqlite(value: &serde_json::Value) -> Result<SqlValue, SqliteError> {
+    match value {
+        serde_json::Value::Number(n) => {
+            if let Some(int_val) = n.as_i64() {
+                Ok(SqlValue::Integer(int_val))
+            } else if let Some(float_val) = n.as_f64() {
+                Ok(SqlValue::Real(float_val))
+            } else {
+                Err(SqliteError::InvalidParameters)
+            }
+        },
+        serde_json::Value::String(s) => {
+            match base64::decode(&s) {
+                Ok(decoded_bytes) => {
+                    // convert to SQLite Blob if it's a valid base64 string
+                    Ok(SqlValue::Blob(decoded_bytes))
+                },
+                Err(_) => {
+                    // if it's not base64, just use the string itself
+                    Ok(SqlValue::Text(s.clone()))
+                }
+            }
+        },
+        serde_json::Value::Bool(b) => {
+            Ok(SqlValue::Boolean(*b))
+        },
+        serde_json::Value::Null => {
+            Ok(SqlValue::Null)
+        },
+        _ => {
+            Err(SqliteError::InvalidParameters)
+        }
+    }
+}
+
+/// Parses the optional payload as a JSON array of statement parameters; a
+/// missing payload means no parameters.
+fn get_json_params(payload: Option<Payload>) -> Result<Vec<SqlValue>, SqliteError> {
+    match payload {
+        None => Ok(vec![]),
+        Some(payload) => {
+            match serde_json::from_slice::<serde_json::Value>(&payload.bytes) {
+                Ok(serde_json::Value::Array(vec)) => {
+                    vec.iter().map(|value| json_to_sqlite(value)).collect::<Result<Vec<_>, _>>()
+                },
+                _ => Err(SqliteError::InvalidParameters),
+            }
+        }
+    }
+}
+
+/// Builds the error response sent back to the requester (or its rsvp) on failure.
+fn make_error_message(our_name: String, km: &KernelMessage, error: SqliteError) -> KernelMessage {
+    KernelMessage {
+        id: km.id,
+        source: Address {
+            node: our_name.clone(),
+            process: SQLITE_PROCESS_ID.clone(),
+        },
+        target: match &km.rsvp {
+            None => km.source.clone(),
+            Some(rsvp) => rsvp.clone(),
+        },
+        rsvp: None,
+        message: Message::Response((
+            Response {
+                inherit: false,
+                ipc: serde_json::to_vec(&SqliteResponse::Err { error }).unwrap(),
+                metadata: None,
+            },
+            None,
+        )),
+        payload: None,
+        signed_capabilities: None,
+    }
+}
+impl ToSql for SqlValue {
+    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
+        match self {
+            SqlValue::Integer(i) => i.to_sql(),
+            SqlValue::Real(f) => f.to_sql(),
+            SqlValue::Text(ref s) => s.to_sql(),
+            SqlValue::Blob(ref b) => b.to_sql(),
+            SqlValue::Boolean(b) => b.to_sql(),
+            SqlValue::Null => Ok(rusqlite::types::ToSqlOutput::Owned(rusqlite::types::Value::Null)),
+        }
+    }
+}
+
+impl FromSql for SqlValue {
+    fn column_result(value: ValueRef<'_>) -> Result<Self, FromSqlError> {
+        match value {
+            ValueRef::Integer(i) => Ok(SqlValue::Integer(i)),
+            ValueRef::Real(f) => Ok(SqlValue::Real(f)),
+            ValueRef::Text(t) => {
+                let text_str = std::str::from_utf8(t).map_err(|_| FromSqlError::InvalidType)?;
+                Ok(SqlValue::Text(text_str.to_string()))
+            },
+            ValueRef::Blob(b) => Ok(SqlValue::Blob(b.to_vec())),
+            ValueRef::Null => Ok(SqlValue::Null),
+        }
+    }
+}
+
+impl std::fmt::Display for SqliteAction {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl From<std::io::Error> for SqliteError {
+    fn from(err: std::io::Error) -> Self {
+        SqliteError::IOError {
+            error: err.to_string(),
+        }
+    }
+}
+
+impl From<rusqlite::Error> for SqliteError {
+    fn from(err: rusqlite::Error) -> Self {
+        SqliteError::RusqliteError {
+            error: err.to_string(),
+        }
+    }
+}
+
+impl From<tokio::sync::oneshot::error::RecvError> for SqliteError {
+    fn from(err: tokio::sync::oneshot::error::RecvError) -> Self {
+        SqliteError::NoCap {
+            error: err.to_string(),
+        }
+    }
+}
+
+impl<T> From<tokio::sync::mpsc::error::SendError<T>> for SqliteError {
+    fn from(err: tokio::sync::mpsc::error::SendError<T>) -> Self {
+        SqliteError::NoCap {
+            error: err.to_string(),
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/types.rs b/src/types.rs
index 6ad5a479..93a0b72a 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -15,6 +15,7 @@ lazy_static::lazy_static! {
     pub static ref VFS_PROCESS_ID: ProcessId = ProcessId::new(Some("vfs"), "sys", "uqbar");
     pub static ref STATE_PROCESS_ID: ProcessId = ProcessId::new(Some("state"), "sys", "uqbar");
     pub static ref KV_PROCESS_ID: ProcessId = ProcessId::new(Some("kv"), "sys", "uqbar");
+    pub static ref SQLITE_PROCESS_ID: ProcessId = ProcessId::new(Some("sqlite"), "sys", "uqbar");
 }
 
 //
@@ -1067,10 +1068,8 @@ pub enum KvError {
     KeyNotFound,
     #[error("kv: no Tx found")]
     NoTx,
-    #[error("kv: No capability error: {error}")]
+    #[error("kv: No capability: {error}")]
     NoCap { error: String },
-    #[error("kv: RejectForeign")]
-    RejectForeign,
     #[error("kv: rocksdb internal error: {error}")]
     RocksDBError { action: String, error: String },
     #[error("kv: input bytes/json/key error: {error}")]
@@ -1078,3 +1077,64 @@ pub enum KvError {
     #[error("kv: IO error: {error}")]
     IOError { error: String },
 }
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SqliteRequest {
+    pub db: DBKey,
+    pub action: SqliteAction,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum SqliteAction {
+    New,
+    Write { statement: String, tx_id: Option<u64> },
+    Read { query: String },
+    BeginTx,
+    Commit { tx_id: u64 },
+    Backup,
+}
+
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum SqliteResponse {
+    Ok,
+    Read,
+    BeginTx { tx_id: u64 },
+    Err { error: SqliteError },
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub enum SqlValue {
+    Integer(i64),
+    Real(f64),
+    Text(String),
+    Blob(Vec<u8>),
+    Boolean(bool),
+    Null,
+}
+
+#[derive(Debug, Serialize, Deserialize, thiserror::Error)]
+pub enum SqliteError {
+    #[error("sqlite: DbDoesNotExist")]
+    NoDb,
+    #[error("sqlite: DbAlreadyExists")]
+    DbAlreadyExists,
+    #[error("sqlite: NoTx")]
+    NoTx,
+    #[error("sqlite: No capability: {error}")]
+    NoCap { error: String },
+    #[error("sqlite: UnexpectedResponse")]
+    UnexpectedResponse,
+    #[error("sqlite: NotAWriteKeyword")]
+    NotAWriteKeyword,
+    #[error("sqlite: NotAReadKeyword")]
+    NotAReadKeyword,
+    #[error("sqlite: Invalid Parameters")]
+    InvalidParameters,
+    #[error("sqlite: IO error: {error}")]
+    IOError { error: String },
+    #[error("sqlite: rusqlite error: {error}")]
+    RusqliteError { error: String },
+    #[error("sqlite: input bytes/json/key error: {error}")]
+    InputError { error: String },
+}
\ No newline at end of file