sqlite: move to runtime

This commit is contained in:
bitful-pannul 2023-12-18 20:16:54 -03:00
parent 9b536effe0
commit e5c5d58d28
5 changed files with 698 additions and 4 deletions

41
Cargo.lock generated
View File

@ -1842,6 +1842,12 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fastrand"
version = "2.0.0"
@ -2226,6 +2232,15 @@ dependencies = [
"fxhash",
]
[[package]]
name = "hashlink"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7"
dependencies = [
"hashbrown 0.14.0",
]
[[package]]
name = "headers"
version = "0.3.9"
@ -2821,6 +2836,17 @@ dependencies = [
"zstd-sys",
]
[[package]]
name = "libsqlite3-sys"
version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "libz-sys"
version = "1.1.12"
@ -4081,6 +4107,20 @@ dependencies = [
"tokio",
]
[[package]]
name = "rusqlite"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a78046161564f5e7cd9008aff3b2990b3850dc8e0349119b98e8f251e099f24d"
dependencies = [
"bitflags 2.4.0",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
"libsqlite3-sys",
"smallvec 1.11.0",
]
[[package]]
name = "rustc-demangle"
version = "0.1.23"
@ -5279,6 +5319,7 @@ dependencies = [
"rusoto_core",
"rusoto_credential",
"rusoto_s3",
"rusqlite",
"serde",
"serde_json",
"serde_urlencoded",

View File

@ -78,3 +78,4 @@ wasmtime = "15.0.1"
wasmtime-wasi = "15.0.1"
zip = "0.6"
rocksdb = { version = "0.21.0", features = ["multi-threaded-cf"] }
rusqlite = { version = "0.30.0", features = ["bundled"] }

View File

@ -19,6 +19,7 @@ mod kv;
mod net;
mod register;
mod state;
mod sqlite;
mod terminal;
mod timer;
mod types;
@ -34,6 +35,8 @@ const ETH_RPC_CHANNEL_CAPACITY: usize = 32;
const VFS_CHANNEL_CAPACITY: usize = 1_000;
const CAP_CHANNEL_CAPACITY: usize = 1_000;
const KV_CHANNEL_CAPACITY: usize = 1_000;
const SQLITE_CHANNEL_CAPACITY: usize = 1_000;
const VERSION: &str = env!("CARGO_PKG_VERSION");
@ -159,6 +162,9 @@ async fn main() {
// kv sender and receiver
let (kv_sender, kv_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(KV_CHANNEL_CAPACITY);
// sqlite sender and receiver
let (sqlite_sender, sqlite_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(SQLITE_CHANNEL_CAPACITY);
// http server channel w/ websockets (eyre)
let (http_server_sender, http_server_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(HTTP_CHANNEL_CAPACITY);
@ -330,7 +336,16 @@ async fn main() {
state_sender,
true,
),
(ProcessId::new(Some("kv"), "sys", "uqbar"), kv_sender, true),
(
ProcessId::new(Some("kv"), "sys", "uqbar"),
kv_sender,
true,
),
(
ProcessId::new(Some("sqlite"), "sys", "uqbar"),
sqlite_sender,
true,
),
];
#[cfg(feature = "llm")]
@ -406,6 +421,14 @@ async fn main() {
caps_oracle_sender.clone(),
home_directory_path.clone(),
));
tasks.spawn(sqlite::sqlite(
our.name.clone(),
kernel_message_sender.clone(),
print_sender.clone(),
sqlite_receiver,
caps_oracle_sender.clone(),
home_directory_path.clone(),
));
tasks.spawn(http::server::http_server(
our.name.clone(),
http_server_port,

569
src/sqlite.rs Normal file
View File

@ -0,0 +1,569 @@
use anyhow::Result;
use dashmap::DashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::fs;
use tokio::sync::Mutex;
use rusqlite::Connection;
use rusqlite::types::{FromSql, FromSqlError, ToSql, ValueRef};
use crate::types::*;
pub async fn sqlite(
our_node: String,
send_to_loop: MessageSender,
send_to_terminal: PrintSender,
mut recv_from_loop: MessageReceiver,
send_to_caps_oracle: CapMessageSender,
home_directory_path: String,
) -> anyhow::Result<()> {
let sqlite_path = format!("{}/sqlite", &home_directory_path);
if let Err(e) = fs::create_dir_all(&sqlite_path).await {
panic!("failed creating sqlite dir! {:?}", e);
}
let open_dbs: Arc<DashMap<DBKey, Mutex<Connection>>> = Arc::new(DashMap::new());
let txs: Arc<DashMap<u64, Vec<(String, Vec<SqlValue>)>>> = Arc::new(DashMap::new());
let mut process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
HashMap::new();
loop {
tokio::select! {
Some(km) = recv_from_loop.recv() => {
if our_node.clone() != km.source.node {
println!(
"sqlite: request must come from our_node={}, got: {}",
our_node,
km.source.node,
);
continue;
}
let queue = process_queues
.entry(km.source.process.clone())
.or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
.clone();
{
let mut queue_lock = queue.lock().await;
queue_lock.push_back(km.clone());
}
// clone Arcs
let our_node = our_node.clone();
let send_to_caps_oracle = send_to_caps_oracle.clone();
let send_to_terminal = send_to_terminal.clone();
let send_to_loop = send_to_loop.clone();
let open_dbs = open_dbs.clone();
let txs = txs.clone();
let sqlite_path = sqlite_path.clone();
tokio::spawn(async move {
let mut queue_lock = queue.lock().await;
if let Some(km) = queue_lock.pop_front() {
if let Err(e) = handle_request(
our_node.clone(),
km.clone(),
open_dbs.clone(),
txs.clone(),
send_to_loop.clone(),
send_to_terminal.clone(),
send_to_caps_oracle.clone(),
sqlite_path.clone(),
)
.await
{
let _ = send_to_loop
.send(make_error_message(our_node.clone(), &km, e))
.await;
}
}
});
}
}
}
}
async fn handle_request(
our_node: String,
km: KernelMessage,
open_dbs: Arc<DashMap<DBKey, Mutex<Connection>>>,
txs: Arc<DashMap<u64, Vec<(String, Vec<SqlValue>)>>>,
send_to_loop: MessageSender,
send_to_terminal: PrintSender,
send_to_caps_oracle: CapMessageSender,
sqlite_path: String,
) -> Result<(), SqliteError> {
let KernelMessage {
id,
source,
message,
payload,
..
} = km.clone();
let Message::Request(Request {
ipc,
expects_response,
metadata,
..
}) = message.clone()
else {
return Err(SqliteError::InputError {
error: "not a request".into(),
});
};
let request: SqliteRequest = match serde_json::from_slice(&ipc) {
Ok(r) => r,
Err(e) => {
println!("sqlite: got invalid Request: {}", e);
return Err(SqliteError::InputError {
error: "didn't serialize to SqliteRequest.".into(),
});
}
};
check_caps(
our_node.clone(),
source.clone(),
open_dbs.clone(),
send_to_caps_oracle.clone(),
&request,
sqlite_path.clone(),
)
.await?;
let (ipc, bytes) = match request.action {
SqliteAction::New => {
// handled in check_caps
//
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
}
SqliteAction::Read { query } => {
let db = match open_dbs.get(&request.db) {
Some(db) => db,
None => {
return Err(SqliteError::NoDb);
}
};
let db = db.lock().await;
let parameters = get_json_params(payload)?;
let mut statement = db.prepare(&query)?;
let column_names: Vec<String> = statement
.column_names()
.iter()
.map(|c| c.to_string())
.collect();
let results: Vec<HashMap<String, serde_json::Value>> = statement
.query_map(rusqlite::params_from_iter(parameters.iter()), |row| {
let mut map = HashMap::new();
for (i, column_name) in column_names.iter().enumerate() {
let value: SqlValue = row.get(i)?;
let value_json = match value {
SqlValue::Integer(int) => serde_json::Value::Number(int.into()),
SqlValue::Real(real) => serde_json::Value::Number(serde_json::Number::from_f64(real).unwrap()),
SqlValue::Text(text) => serde_json::Value::String(text),
SqlValue::Blob(blob) => serde_json::Value::String(base64::encode(blob)), // or another representation if you prefer
_ => serde_json::Value::Null,
};
map.insert(column_name.clone(), value_json);
}
Ok(map)
})?
.collect::<Result<Vec<_>, _>>()?;
let results = serde_json::json!(results).to_string();
let results_bytes = results.as_bytes().to_vec();
(serde_json::to_vec(&SqliteResponse::Read).unwrap(), Some(results_bytes))
}
SqliteAction::Write { statement, tx_id } => {
let db = match open_dbs.get(&request.db) {
Some(db) => db,
None => {
return Err(SqliteError::NoDb);
}
};
let db = db.lock().await;
let parameters = get_json_params(payload)?;
match tx_id {
Some(tx_id) => {
txs.entry(tx_id)
.or_insert_with(Vec::new)
.push((statement.clone(), parameters));
},
None => {
let mut stmt = db.prepare(&statement)?;
stmt.execute(rusqlite::params_from_iter(parameters.iter()))?;
},
};
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
}
SqliteAction::BeginTx => {
let tx_id = rand::random::<u64>();
txs.insert(tx_id, Vec::new());
(
serde_json::to_vec(&SqliteResponse::BeginTx { tx_id }).unwrap(),
None,
)
}
SqliteAction::Commit { tx_id } => {
let db = match open_dbs.get(&request.db) {
Some(db) => db,
None => {
return Err(SqliteError::NoDb);
}
};
let mut db = db.lock().await;
let txs = match txs.remove(&tx_id).map(|(_, tx)| tx) {
None => {
return Err(SqliteError::NoTx);
}
Some(tx) => tx,
};
let tx = db.transaction()?;
for (query, params) in txs {
tx.execute(&query, rusqlite::params_from_iter(params.iter()))?;
}
tx.commit()?;
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
}
SqliteAction::Backup => {
// execute WAL flush.
//
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
}
};
if let Some(target) = km.rsvp.or_else(|| {
expects_response.map(|_| Address {
node: our_node.clone(),
process: source.process.clone(),
})
}) {
let response = KernelMessage {
id,
source: Address {
node: our_node.clone(),
process: SQLITE_PROCESS_ID.clone(),
},
target,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc,
metadata,
},
None,
)),
payload: bytes.map(|bytes| Payload {
mime: Some("application/octet-stream".into()),
bytes,
}),
signed_capabilities: None,
};
let _ = send_to_loop.send(response).await;
} else {
send_to_terminal
.send(Printout {
verbosity: 2,
content: format!(
"sqlite: not sending response: {:?}",
serde_json::from_slice::<SqliteResponse>(&ipc)
),
})
.await
.unwrap();
}
Ok(())
}
async fn check_caps(
our_node: String,
source: Address,
open_dbs: Arc<DashMap<DBKey, Mutex<Connection>>>,
mut send_to_caps_oracle: CapMessageSender,
request: &SqliteRequest,
sqlite_path: String,
) -> Result<(), SqliteError> {
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
let src_package_id = PackageId::new(source.process.package(), source.process.publisher());
match &request.action {
SqliteAction::Write { .. }
| SqliteAction::BeginTx
| SqliteAction::Commit { .. } => {
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
process: SQLITE_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
"kind": "write",
"db": request.db.to_string(),
}))
.unwrap(),
},
responder: send_cap_bool,
})
.await?;
let has_cap = recv_cap_bool.await?;
if !has_cap {
return Err(SqliteError::NoCap {
error: request.action.to_string(),
});
}
Ok(())
}
SqliteAction::Read { .. } => {
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
process: SQLITE_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
"kind": "read",
"db": request.db.to_string(),
}))
.unwrap(),
},
responder: send_cap_bool,
})
.await?;
let has_cap = recv_cap_bool.await?;
if !has_cap {
return Err(SqliteError::NoCap {
error: request.action.to_string(),
});
}
Ok(())
}
SqliteAction::New => {
if src_package_id != request.db.package_id {
return Err(SqliteError::NoCap {
error: request.action.to_string(),
});
}
add_capability(
"read",
&request.db.to_string(),
&our_node,
&source,
&mut send_to_caps_oracle,
)
.await?;
add_capability(
"write",
&request.db.to_string(),
&our_node,
&source,
&mut send_to_caps_oracle,
)
.await?;
let db_path = format!("{}{}", sqlite_path, request.db.to_string());
fs::create_dir_all(&db_path).await?;
let db = Connection::open(&db_path)?;
db.execute("PRAGMA journal_mode=WAL;", [])?;
open_dbs.insert(request.db.clone(), Mutex::new(db));
Ok(())
}
SqliteAction::Backup => {
if source.process != *STATE_PROCESS_ID {
return Err(SqliteError::NoCap {
error: request.action.to_string(),
});
}
Ok(())
}
}
}
async fn add_capability(
kind: &str,
db: &str,
our_node: &str,
source: &Address,
send_to_caps_oracle: &mut CapMessageSender,
) -> Result<(), SqliteError> {
let cap = Capability {
issuer: Address {
node: our_node.to_string(),
process: SQLITE_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({ "kind": kind, "db": db })).unwrap(),
};
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
send_to_caps_oracle
.send(CapMessage::Add {
on: source.process.clone(),
cap,
responder: send_cap_bool,
})
.await?;
let _ = recv_cap_bool.await?;
Ok(())
}
fn json_to_sqlite(value: &serde_json::Value) -> Result<SqlValue, SqliteError> {
match value {
serde_json::Value::Number(n) => {
if let Some(int_val) = n.as_i64() {
Ok(SqlValue::Integer(int_val))
} else if let Some(float_val) = n.as_f64() {
Ok(SqlValue::Real(float_val))
} else {
Err(SqliteError::InvalidParameters)
}
},
serde_json::Value::String(s) => {
match base64::decode(&s) {
Ok(decoded_bytes) => {
// convert to SQLite Blob if it's a valid base64 string
Ok(SqlValue::Blob(decoded_bytes))
},
Err(_) => {
// if it's not base64, just use the string itself
Ok(SqlValue::Text(s.clone()))
}
}
},
serde_json::Value::Bool(b) => {
Ok(SqlValue::Boolean(*b))
},
serde_json::Value::Null => {
Ok(SqlValue::Null)
},
_ => {
Err(SqliteError::InvalidParameters)
}
}
}
fn get_json_params(payload: Option<Payload>) -> Result<Vec<SqlValue>, SqliteError> {
match payload {
None => Ok(vec![]),
Some(payload) => {
match serde_json::from_slice::<serde_json::Value>(&payload.bytes) {
Ok(serde_json::Value::Array(vec)) => {
vec.iter().map(|value| json_to_sqlite(value)).collect::<Result<Vec<_>, _>>()
},
_ => Err(SqliteError::InvalidParameters),
}
}
}
}
fn make_error_message(our_name: String, km: &KernelMessage, error: SqliteError) -> KernelMessage {
KernelMessage {
id: km.id,
source: Address {
node: our_name.clone(),
process: KV_PROCESS_ID.clone(),
},
target: match &km.rsvp {
None => km.source.clone(),
Some(rsvp) => rsvp.clone(),
},
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec(&SqliteResponse::Err { error: error }).unwrap(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
}
}
impl ToSql for SqlValue {
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput> {
match self {
SqlValue::Integer(i) => i.to_sql(),
SqlValue::Real(f) => f.to_sql(),
SqlValue::Text(ref s) => s.to_sql(),
SqlValue::Blob(ref b) => b.to_sql(),
SqlValue::Boolean(b) => b.to_sql(),
SqlValue::Null => Ok(rusqlite::types::ToSqlOutput::Owned(rusqlite::types::Value::Null)),
}
}
}
impl FromSql for SqlValue {
fn column_result(value: ValueRef<'_>) -> Result<Self, FromSqlError> {
match value {
ValueRef::Integer(i) => Ok(SqlValue::Integer(i)),
ValueRef::Real(f) => Ok(SqlValue::Real(f)),
ValueRef::Text(t) => {
let text_str = std::str::from_utf8(t).map_err(|_| FromSqlError::InvalidType)?;
Ok(SqlValue::Text(text_str.to_string()))
},
ValueRef::Blob(b) => Ok(SqlValue::Blob(b.to_vec())),
_ => Err(FromSqlError::InvalidType),
}
}
}
impl std::fmt::Display for SqliteAction {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl From<std::io::Error> for SqliteError {
fn from(err: std::io::Error) -> Self {
SqliteError::IOError {
error: err.to_string(),
}
}
}
impl From<rusqlite::Error> for SqliteError {
fn from(err: rusqlite::Error) -> Self {
SqliteError::RusqliteError {
error: err.to_string(),
}
}
}
impl From<tokio::sync::oneshot::error::RecvError> for SqliteError {
fn from(err: tokio::sync::oneshot::error::RecvError) -> Self {
SqliteError::NoCap {
error: err.to_string(),
}
}
}
impl From<tokio::sync::mpsc::error::SendError<CapMessage>> for SqliteError {
fn from(err: tokio::sync::mpsc::error::SendError<CapMessage>) -> Self {
SqliteError::NoCap {
error: err.to_string(),
}
}
}

View File

@ -15,6 +15,7 @@ lazy_static::lazy_static! {
pub static ref VFS_PROCESS_ID: ProcessId = ProcessId::new(Some("vfs"), "sys", "uqbar");
pub static ref STATE_PROCESS_ID: ProcessId = ProcessId::new(Some("state"), "sys", "uqbar");
pub static ref KV_PROCESS_ID: ProcessId = ProcessId::new(Some("kv"), "sys", "uqbar");
pub static ref SQLITE_PROCESS_ID: ProcessId = ProcessId::new(Some("sqlite"), "sys", "uqbar");
}
//
@ -1067,10 +1068,8 @@ pub enum KvError {
KeyNotFound,
#[error("kv: no Tx found")]
NoTx,
#[error("kv: No capability error: {error}")]
#[error("kv: No capability: {error}")]
NoCap { error: String },
#[error("kv: RejectForeign")]
RejectForeign,
#[error("kv: rocksdb internal error: {error}")]
RocksDBError { action: String, error: String },
#[error("kv: input bytes/json/key error: {error}")]
@ -1078,3 +1077,64 @@ pub enum KvError {
#[error("kv: IO error: {error}")]
IOError { error: String },
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SqliteRequest {
pub db: DBKey,
pub action: SqliteAction,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum SqliteAction {
New,
Write { statement: String, tx_id: Option<u64> },
Read { query: String },
BeginTx,
Commit { tx_id: u64 },
Backup,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum SqliteResponse {
Ok,
Read,
BeginTx { tx_id: u64 },
Err { error: SqliteError },
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SqlValue {
Integer(i64),
Real(f64),
Text(String),
Blob(Vec<u8>),
Boolean(bool),
Null,
}
#[derive(Debug, Serialize, Deserialize, thiserror::Error)]
pub enum SqliteError {
#[error("sqlite: DbDoesNotExist")]
NoDb,
#[error("sqlite: DbAlreadyExists")]
DbAlreadyExists,
#[error("sqlite: NoTx")]
NoTx,
#[error("sqlite: No capability: {error}")]
NoCap { error: String },
#[error("sqlite: UnexpectedResponse")]
UnexpectedResponse,
#[error("sqlite: NotAWriteKeyword")]
NotAWriteKeyword,
#[error("sqlite: NotAReadKeyword")]
NotAReadKeyword,
#[error("sqlite: Invalid Parameters")]
InvalidParameters,
#[error("sqlite: IO error: {error}")]
IOError { error: String },
#[error("sqlite: rusqlite error: {error}")]
RusqliteError { error: String },
#[error("sqlite: input bytes/json/key error: {error}")]
InputError { error: String },
}