mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-29 19:41:39 +03:00
commit
a6ba2cd819
@ -5,7 +5,7 @@ use crate::kinode::process::settings::{
|
||||
};
|
||||
use kinode_process_lib::{
|
||||
await_message, call_init, eth, get_blob, get_capability, homepage, http, kernel_types, kimap,
|
||||
net, println, Address, Capability, LazyLoadBlob, Message, NodeId, ProcessId, Request, Response,
|
||||
net, println, Address, Capability, LazyLoadBlob, Message, ProcessId, Request, Response,
|
||||
SendError, SendErrorKind,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
259
kinode/src/kv.rs
259
kinode/src/kv.rs
@ -2,9 +2,9 @@ use crate::vfs::UniqueQueue;
|
||||
use dashmap::DashMap;
|
||||
use lib::types::core::{
|
||||
Address, CapMessage, CapMessageSender, Capability, FdManagerRequest, KernelMessage, KvAction,
|
||||
KvError, KvRequest, KvResponse, LazyLoadBlob, Message, MessageReceiver, MessageSender,
|
||||
PackageId, PrintSender, Printout, ProcessId, Request, Response, FD_MANAGER_PROCESS_ID,
|
||||
KV_PROCESS_ID,
|
||||
KvCapabilityKind, KvCapabilityParams, KvError, KvRequest, KvResponse, LazyLoadBlob, Message,
|
||||
MessageReceiver, MessageSender, PackageId, PrintSender, Printout, ProcessId, Request, Response,
|
||||
FD_MANAGER_PROCESS_ID, KV_PROCESS_ID,
|
||||
};
|
||||
use rocksdb::OptimisticTransactionDB;
|
||||
use std::{
|
||||
@ -46,48 +46,43 @@ impl KvState {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn open_db(&mut self, package_id: PackageId, db: String) -> Result<(), KvError> {
|
||||
let key = (package_id.clone(), db.clone());
|
||||
if self.open_kvs.contains_key(&key) {
|
||||
pub async fn open_db(&mut self, key: &(PackageId, String)) -> Result<(), KvError> {
|
||||
if self.open_kvs.contains_key(key) {
|
||||
let mut access_order = self.access_order.lock().await;
|
||||
access_order.remove(&key);
|
||||
access_order.push_back(key);
|
||||
access_order.remove(key);
|
||||
access_order.push_back(key.clone());
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if self.open_kvs.len() as u64 >= self.fds_limit {
|
||||
// close least recently used db
|
||||
let key = self.access_order.lock().await.pop_front().unwrap();
|
||||
self.remove_db(key.0, key.1).await;
|
||||
let to_close = self.access_order.lock().await.pop_front().unwrap();
|
||||
self.remove_db(&to_close).await;
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
let db_path = self.kv_path.join(format!("{package_id}")).join(&db);
|
||||
let db_path = self.kv_path.join(format!("{}", key.0)).join(&key.1);
|
||||
#[cfg(target_os = "windows")]
|
||||
let db_path = self
|
||||
.kv_path
|
||||
.join(format!(
|
||||
"{}_{}",
|
||||
package_id._package(),
|
||||
package_id._publisher()
|
||||
))
|
||||
.join(&db);
|
||||
.join(format!("{}_{}", key.0._package(), key.0._publisher()))
|
||||
.join(&key.1);
|
||||
|
||||
fs::create_dir_all(&db_path).await?;
|
||||
|
||||
self.open_kvs.insert(
|
||||
key,
|
||||
key.clone(),
|
||||
OptimisticTransactionDB::open_default(&db_path).map_err(rocks_to_kv_err)?,
|
||||
);
|
||||
let mut access_order = self.access_order.lock().await;
|
||||
access_order.push_back((package_id, db));
|
||||
access_order.push_back(key.clone());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn remove_db(&mut self, package_id: PackageId, db: String) {
|
||||
self.open_kvs.remove(&(package_id.clone(), db.to_string()));
|
||||
pub async fn remove_db(&mut self, key: &(PackageId, String)) {
|
||||
self.open_kvs.remove(key);
|
||||
let mut access_order = self.access_order.lock().await;
|
||||
access_order.remove(&(package_id, db));
|
||||
access_order.remove(key);
|
||||
}
|
||||
|
||||
pub async fn remove_least_recently_used_dbs(&mut self, n: u64) {
|
||||
@ -95,7 +90,7 @@ impl KvState {
|
||||
let mut lock = self.access_order.lock().await;
|
||||
let key = lock.pop_front().unwrap();
|
||||
drop(lock);
|
||||
self.remove_db(key.0, key.1).await;
|
||||
self.remove_db(&key).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -215,29 +210,33 @@ async fn handle_request(
|
||||
..
|
||||
}) = message
|
||||
else {
|
||||
return Err(KvError::InputError {
|
||||
error: "not a request".into(),
|
||||
});
|
||||
// we got a response -- safe to ignore
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let request: KvRequest = match serde_json::from_slice(&body) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
println!("kv: got invalid Request: {}", e);
|
||||
return Err(KvError::InputError {
|
||||
error: "didn't serialize to KvAction.".into(),
|
||||
});
|
||||
println!("kv: got invalid request: {e}");
|
||||
return Err(KvError::MalformedRequest);
|
||||
}
|
||||
};
|
||||
|
||||
check_caps(&source, state, send_to_caps_oracle, &request).await?;
|
||||
let db_key = (request.package_id, request.db);
|
||||
|
||||
check_caps(
|
||||
&source,
|
||||
state,
|
||||
send_to_caps_oracle,
|
||||
&request.action,
|
||||
&db_key,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// always open to ensure db exists
|
||||
state
|
||||
.open_db(request.package_id.clone(), request.db.clone())
|
||||
.await?;
|
||||
state.open_db(&db_key).await?;
|
||||
|
||||
let (body, bytes) = match &request.action {
|
||||
let (body, bytes) = match request.action {
|
||||
KvAction::Open => {
|
||||
// handled in check_caps.
|
||||
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
|
||||
@ -246,27 +245,24 @@ async fn handle_request(
|
||||
// handled in check_caps.
|
||||
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
|
||||
}
|
||||
KvAction::Get { key } => {
|
||||
let db = match state.open_kvs.get(&(request.package_id, request.db)) {
|
||||
KvAction::Get(key) => {
|
||||
let db = match state.open_kvs.get(&db_key) {
|
||||
None => {
|
||||
return Err(KvError::NoDb);
|
||||
return Err(KvError::NoDb(db_key.0, db_key.1));
|
||||
}
|
||||
Some(db) => db,
|
||||
};
|
||||
|
||||
match db.get(key) {
|
||||
match db.get(&key) {
|
||||
Ok(Some(value)) => (
|
||||
serde_json::to_vec(&KvResponse::Get { key: key.to_vec() }).unwrap(),
|
||||
serde_json::to_vec(&KvResponse::Get(key)).unwrap(),
|
||||
Some(value),
|
||||
),
|
||||
Ok(None) => {
|
||||
return Err(KvError::KeyNotFound);
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(KvError::RocksDBError {
|
||||
action: request.action.to_string(),
|
||||
error: e.to_string(),
|
||||
})
|
||||
return Err(rocks_to_kv_err(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -278,17 +274,15 @@ async fn handle_request(
|
||||
None,
|
||||
)
|
||||
}
|
||||
KvAction::Set { key, tx_id } => {
|
||||
let db = match state.open_kvs.get(&(request.package_id, request.db)) {
|
||||
KvAction::Set { ref key, tx_id } => {
|
||||
let db = match state.open_kvs.get(&db_key) {
|
||||
None => {
|
||||
return Err(KvError::NoDb);
|
||||
return Err(KvError::NoDb(db_key.0, db_key.1));
|
||||
}
|
||||
Some(db) => db,
|
||||
};
|
||||
let Some(blob) = blob else {
|
||||
return Err(KvError::InputError {
|
||||
error: "no blob".into(),
|
||||
});
|
||||
return Err(KvError::MalformedRequest);
|
||||
};
|
||||
|
||||
match tx_id {
|
||||
@ -296,22 +290,22 @@ async fn handle_request(
|
||||
db.put(key, blob.bytes).map_err(rocks_to_kv_err)?;
|
||||
}
|
||||
Some(tx_id) => {
|
||||
let mut tx = match state.txs.get_mut(tx_id) {
|
||||
let mut tx = match state.txs.get_mut(&tx_id) {
|
||||
None => {
|
||||
return Err(KvError::NoTx);
|
||||
return Err(KvError::NoTx(tx_id));
|
||||
}
|
||||
Some(tx) => tx,
|
||||
};
|
||||
tx.push((request.action.clone(), Some(blob.bytes)));
|
||||
tx.push((request.action, Some(blob.bytes)));
|
||||
}
|
||||
}
|
||||
|
||||
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
|
||||
}
|
||||
KvAction::Delete { key, tx_id } => {
|
||||
let db = match state.open_kvs.get(&(request.package_id, request.db)) {
|
||||
KvAction::Delete { ref key, tx_id } => {
|
||||
let db = match state.open_kvs.get(&db_key) {
|
||||
None => {
|
||||
return Err(KvError::NoDb);
|
||||
return Err(KvError::NoDb(db_key.0, db_key.1));
|
||||
}
|
||||
Some(db) => db,
|
||||
};
|
||||
@ -320,28 +314,28 @@ async fn handle_request(
|
||||
db.delete(key).map_err(rocks_to_kv_err)?;
|
||||
}
|
||||
Some(tx_id) => {
|
||||
let mut tx = match state.txs.get_mut(tx_id) {
|
||||
let mut tx = match state.txs.get_mut(&tx_id) {
|
||||
None => {
|
||||
return Err(KvError::NoTx);
|
||||
return Err(KvError::NoTx(tx_id));
|
||||
}
|
||||
Some(tx) => tx,
|
||||
};
|
||||
tx.push((request.action.clone(), None));
|
||||
tx.push((request.action, None));
|
||||
}
|
||||
}
|
||||
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
|
||||
}
|
||||
KvAction::Commit { tx_id } => {
|
||||
let db = match state.open_kvs.get(&(request.package_id, request.db)) {
|
||||
let db = match state.open_kvs.get(&db_key) {
|
||||
None => {
|
||||
return Err(KvError::NoDb);
|
||||
return Err(KvError::NoDb(db_key.0, db_key.1));
|
||||
}
|
||||
Some(db) => db,
|
||||
};
|
||||
|
||||
let txs = match state.txs.remove(tx_id).map(|(_, tx)| tx) {
|
||||
let txs = match state.txs.remove(&tx_id).map(|(_, tx)| tx) {
|
||||
None => {
|
||||
return Err(KvError::NoTx);
|
||||
return Err(KvError::NoTx(tx_id));
|
||||
}
|
||||
Some(tx) => tx,
|
||||
};
|
||||
@ -364,21 +358,10 @@ async fn handle_request(
|
||||
match tx.commit() {
|
||||
Ok(_) => (serde_json::to_vec(&KvResponse::Ok).unwrap(), None),
|
||||
Err(e) => {
|
||||
return Err(KvError::RocksDBError {
|
||||
action: request.action.to_string(),
|
||||
error: e.to_string(),
|
||||
})
|
||||
return Err(rocks_to_kv_err(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
KvAction::Backup => {
|
||||
// looping through open dbs and flushing their memtables
|
||||
for db_ref in state.open_kvs.iter() {
|
||||
let db = db_ref.value();
|
||||
db.flush().map_err(rocks_to_kv_err)?;
|
||||
}
|
||||
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| source)) {
|
||||
@ -412,128 +395,110 @@ async fn check_caps(
|
||||
source: &Address,
|
||||
state: &mut KvState,
|
||||
send_to_caps_oracle: &CapMessageSender,
|
||||
request: &KvRequest,
|
||||
action: &KvAction,
|
||||
db_key: &(PackageId, String),
|
||||
) -> Result<(), KvError> {
|
||||
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
|
||||
let src_package_id = PackageId::new(source.process.package(), source.process.publisher());
|
||||
|
||||
match &request.action {
|
||||
match &action {
|
||||
KvAction::Delete { .. }
|
||||
| KvAction::Set { .. }
|
||||
| KvAction::BeginTx
|
||||
| KvAction::Commit { .. } => {
|
||||
send_to_caps_oracle
|
||||
let Ok(()) = send_to_caps_oracle
|
||||
.send(CapMessage::Has {
|
||||
on: source.process.clone(),
|
||||
cap: Capability::new(
|
||||
state.our.as_ref().clone(),
|
||||
serde_json::json!({
|
||||
"kind": "write",
|
||||
"db": request.db.to_string(),
|
||||
serde_json::to_string(&KvCapabilityParams {
|
||||
kind: KvCapabilityKind::Write,
|
||||
db_key: db_key.clone(),
|
||||
})
|
||||
.to_string(),
|
||||
.unwrap(),
|
||||
),
|
||||
responder: send_cap_bool,
|
||||
})
|
||||
.await?;
|
||||
let has_cap = recv_cap_bool.await?;
|
||||
if !has_cap {
|
||||
return Err(KvError::NoCap {
|
||||
error: request.action.to_string(),
|
||||
});
|
||||
}
|
||||
.await
|
||||
else {
|
||||
return Err(KvError::NoWriteCap);
|
||||
};
|
||||
let Ok(true) = recv_cap_bool.await else {
|
||||
return Err(KvError::NoWriteCap);
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
KvAction::Get { .. } => {
|
||||
send_to_caps_oracle
|
||||
let Ok(()) = send_to_caps_oracle
|
||||
.send(CapMessage::Has {
|
||||
on: source.process.clone(),
|
||||
cap: Capability::new(
|
||||
state.our.as_ref().clone(),
|
||||
serde_json::json!({
|
||||
"kind": "read",
|
||||
"db": request.db.to_string(),
|
||||
serde_json::to_string(&KvCapabilityParams {
|
||||
kind: KvCapabilityKind::Read,
|
||||
db_key: db_key.clone(),
|
||||
})
|
||||
.to_string(),
|
||||
.unwrap(),
|
||||
),
|
||||
responder: send_cap_bool,
|
||||
})
|
||||
.await?;
|
||||
let has_cap = recv_cap_bool.await?;
|
||||
if !has_cap {
|
||||
return Err(KvError::NoCap {
|
||||
error: request.action.to_string(),
|
||||
});
|
||||
}
|
||||
.await
|
||||
else {
|
||||
return Err(KvError::NoReadCap);
|
||||
};
|
||||
let Ok(true) = recv_cap_bool.await else {
|
||||
return Err(KvError::NoReadCap);
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
KvAction::Open { .. } => {
|
||||
if src_package_id != request.package_id {
|
||||
return Err(KvError::NoCap {
|
||||
error: request.action.to_string(),
|
||||
});
|
||||
if src_package_id != db_key.0 {
|
||||
return Err(KvError::MismatchingPackageId);
|
||||
}
|
||||
|
||||
add_capability(
|
||||
"read",
|
||||
&request.db.to_string(),
|
||||
KvCapabilityKind::Read,
|
||||
&db_key,
|
||||
&state.our,
|
||||
&source,
|
||||
send_to_caps_oracle,
|
||||
)
|
||||
.await?;
|
||||
add_capability(
|
||||
"write",
|
||||
&request.db.to_string(),
|
||||
KvCapabilityKind::Write,
|
||||
&db_key,
|
||||
&state.our,
|
||||
&source,
|
||||
send_to_caps_oracle,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if state
|
||||
.open_kvs
|
||||
.contains_key(&(request.package_id.clone(), request.db.clone()))
|
||||
{
|
||||
if state.open_kvs.contains_key(&db_key) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
state
|
||||
.open_db(request.package_id.clone(), request.db.clone())
|
||||
.await?;
|
||||
state.open_db(&db_key).await?;
|
||||
Ok(())
|
||||
}
|
||||
KvAction::RemoveDb { .. } => {
|
||||
if src_package_id != request.package_id {
|
||||
return Err(KvError::NoCap {
|
||||
error: request.action.to_string(),
|
||||
});
|
||||
if src_package_id != db_key.0 {
|
||||
return Err(KvError::MismatchingPackageId);
|
||||
}
|
||||
|
||||
state
|
||||
.remove_db(request.package_id.clone(), request.db.clone())
|
||||
.await;
|
||||
state.remove_db(&db_key).await;
|
||||
|
||||
#[cfg(unix)]
|
||||
let db_path = state
|
||||
.kv_path
|
||||
.join(format!("{}", request.package_id))
|
||||
.join(&request.db);
|
||||
let db_path = state.kv_path.join(format!("{}", db_key.0)).join(&db_key.1);
|
||||
#[cfg(target_os = "windows")]
|
||||
let db_path = state
|
||||
.kv_path
|
||||
.join(format!(
|
||||
"{}_{}",
|
||||
request.package_id._package(),
|
||||
request.package_id._publisher()
|
||||
))
|
||||
.join(&request.db);
|
||||
.join(format!("{}_{}", db_key.0._package(), db_key.0._publisher()))
|
||||
.join(&db_key.1);
|
||||
|
||||
fs::remove_dir_all(&db_path).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
KvAction::Backup { .. } => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
@ -564,31 +529,37 @@ async fn handle_fd_request(km: KernelMessage, state: &mut KvState) -> anyhow::Re
|
||||
}
|
||||
|
||||
async fn add_capability(
|
||||
kind: &str,
|
||||
db: &str,
|
||||
kind: KvCapabilityKind,
|
||||
db_key: &(PackageId, String),
|
||||
our: &Address,
|
||||
source: &Address,
|
||||
send_to_caps_oracle: &CapMessageSender,
|
||||
) -> Result<(), KvError> {
|
||||
let cap = Capability {
|
||||
issuer: our.clone(),
|
||||
params: serde_json::json!({ "kind": kind, "db": db }).to_string(),
|
||||
params: serde_json::to_string(&KvCapabilityParams {
|
||||
kind,
|
||||
db_key: db_key.clone(),
|
||||
})
|
||||
.unwrap(),
|
||||
};
|
||||
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
|
||||
send_to_caps_oracle
|
||||
let Ok(()) = send_to_caps_oracle
|
||||
.send(CapMessage::Add {
|
||||
on: source.process.clone(),
|
||||
caps: vec![cap],
|
||||
responder: Some(send_cap_bool),
|
||||
})
|
||||
.await?;
|
||||
let _ = recv_cap_bool.await?;
|
||||
.await
|
||||
else {
|
||||
return Err(KvError::AddCapFailed);
|
||||
};
|
||||
let Ok(_) = recv_cap_bool.await else {
|
||||
return Err(KvError::AddCapFailed);
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn rocks_to_kv_err(error: rocksdb::Error) -> KvError {
|
||||
KvError::RocksDBError {
|
||||
action: "".into(),
|
||||
error: error.to_string(),
|
||||
}
|
||||
KvError::RocksDBError(error.to_string())
|
||||
}
|
||||
|
160
lib/src/kv.rs
160
lib/src/kv.rs
@ -1,79 +1,153 @@
|
||||
use crate::types::core::{CapMessage, PackageId};
|
||||
use crate::types::core::PackageId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror::Error;
|
||||
|
||||
/// IPC Request format for the kv:distro:sys runtime module.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
/// Actions are sent to a specific key value database. `db` is the name,
|
||||
/// `package_id` is the [`PackageId`] that created the database. Capabilities
|
||||
/// are checked: you can access another process's database if it has given
|
||||
/// you the read and/or write capability to do so.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct KvRequest {
|
||||
pub package_id: PackageId,
|
||||
pub db: String,
|
||||
pub action: KvAction,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
/// IPC Action format representing operations that can be performed on the
|
||||
/// key-value runtime module. These actions are included in a [`KvRequest`]
|
||||
/// sent to the `kv:distro:sys` runtime module.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum KvAction {
|
||||
/// Opens an existing key-value database or creates a new one if it doesn't exist.
|
||||
/// Requires `package_id` in [`KvRequest`] to match the package ID of the sender.
|
||||
/// The sender will own the database and can remove it with [`KvAction::RemoveDb`].
|
||||
///
|
||||
/// A successful open will respond with [`KvResponse::Ok`]. Any error will be
|
||||
/// contained in the [`KvResponse::Err`] variant.
|
||||
Open,
|
||||
/// Permanently deletes the entire key-value database.
|
||||
/// Requires `package_id` in [`KvRequest`] to match the package ID of the sender.
|
||||
/// Only the owner can remove the database.
|
||||
///
|
||||
/// A successful remove will respond with [`KvResponse::Ok`]. Any error will be
|
||||
/// contained in the [`KvResponse::Err`] variant.
|
||||
RemoveDb,
|
||||
/// Sets a value for the specified key in the database.
|
||||
///
|
||||
/// # Parameters
|
||||
/// * `key` - The key as a byte vector
|
||||
/// * `tx_id` - Optional transaction ID if this operation is part of a transaction
|
||||
/// * blob: [`Vec<u8>`] - Byte vector to store for the key
|
||||
///
|
||||
/// Using this action requires the sender to have the write capability
|
||||
/// for the database.
|
||||
///
|
||||
/// A successful set will respond with [`KvResponse::Ok`]. Any error will be
|
||||
/// contained in the [`KvResponse::Err`] variant.
|
||||
Set { key: Vec<u8>, tx_id: Option<u64> },
|
||||
/// Deletes a key-value pair from the database.
|
||||
///
|
||||
/// # Parameters
|
||||
/// * `key` - The key to delete as a byte vector
|
||||
/// * `tx_id` - Optional transaction ID if this operation is part of a transaction
|
||||
///
|
||||
/// Using this action requires the sender to have the write capability
|
||||
/// for the database.
|
||||
///
|
||||
/// A successful delete will respond with [`KvResponse::Ok`]. Any error will be
|
||||
/// contained in the [`KvResponse::Err`] variant.
|
||||
Delete { key: Vec<u8>, tx_id: Option<u64> },
|
||||
Get { key: Vec<u8> },
|
||||
/// Retrieves the value associated with the specified key.
|
||||
///
|
||||
/// # Parameters
|
||||
/// * The key to look up as a byte vector
|
||||
///
|
||||
/// Using this action requires the sender to have the read capability
|
||||
/// for the database.
|
||||
///
|
||||
/// A successful get will respond with [`KvResponse::Get`], where the response blob
|
||||
/// contains the value associated with the key if any. Any error will be
|
||||
/// contained in the [`KvResponse::Err`] variant.
|
||||
Get(Vec<u8>),
|
||||
/// Begins a new transaction for atomic operations.
|
||||
///
|
||||
/// Sending this will prompt a [`KvResponse::BeginTx`] response with the
|
||||
/// transaction ID. Any error will be contained in the [`KvResponse::Err`] variant.
|
||||
BeginTx,
|
||||
/// Commits all operations in the specified transaction.
|
||||
///
|
||||
/// # Parameters
|
||||
/// * `tx_id` - The ID of the transaction to commit
|
||||
///
|
||||
/// A successful commit will respond with [`KvResponse::Ok`]. Any error will be
|
||||
/// contained in the [`KvResponse::Err`] variant.
|
||||
Commit { tx_id: u64 },
|
||||
Backup,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum KvResponse {
|
||||
/// Indicates successful completion of an operation.
|
||||
/// Sent in response to actions Open, RemoveDb, Set, Delete, and Commit.
|
||||
Ok,
|
||||
/// Returns the transaction ID for a newly created transaction.
|
||||
///
|
||||
/// # Fields
|
||||
/// * `tx_id` - The ID of the newly created transaction
|
||||
BeginTx { tx_id: u64 },
|
||||
Get { key: Vec<u8> },
|
||||
/// Returns the value for the key that was retrieved from the database.
|
||||
///
|
||||
/// # Parameters
|
||||
/// * The retrieved key as a byte vector
|
||||
/// * blob: [`Vec<u8>`] - Byte vector associated with the key
|
||||
Get(Vec<u8>),
|
||||
/// Indicates an error occurred during the operation.
|
||||
Err(KvError),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Error)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Error)]
|
||||
pub enum KvError {
|
||||
#[error("DbDoesNotExist")]
|
||||
NoDb,
|
||||
#[error("KeyNotFound")]
|
||||
#[error("db [{0}, {1}] does not exist")]
|
||||
NoDb(PackageId, String),
|
||||
#[error("key not found")]
|
||||
KeyNotFound,
|
||||
#[error("no Tx found")]
|
||||
NoTx,
|
||||
#[error("No capability: {error}")]
|
||||
NoCap { error: String },
|
||||
#[error("rocksdb internal error: {error}")]
|
||||
RocksDBError { action: String, error: String },
|
||||
#[error("input bytes/json/key error: {error}")]
|
||||
InputError { error: String },
|
||||
#[error("IO error: {error}")]
|
||||
IOError { error: String },
|
||||
#[error("no transaction {0} found")]
|
||||
NoTx(u64),
|
||||
#[error("no write capability for requested DB")]
|
||||
NoWriteCap,
|
||||
#[error("no read capability for requested DB")]
|
||||
NoReadCap,
|
||||
#[error("request to open or remove DB with mismatching package ID")]
|
||||
MismatchingPackageId,
|
||||
#[error("failed to generate capability for new DB")]
|
||||
AddCapFailed,
|
||||
#[error("kv got a malformed request that either failed to deserialize or was missing a required blob")]
|
||||
MalformedRequest,
|
||||
#[error("RocksDB internal error: {0}")]
|
||||
RocksDBError(String),
|
||||
#[error("IO error: {0}")]
|
||||
IOError(String),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for KvAction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
}
|
||||
/// The JSON parameters contained in all capabilities issued by `kv:distro:sys`.
|
||||
///
|
||||
/// # Fields
|
||||
/// * `kind` - The kind of capability, either [`KvCapabilityKind::Read`] or [`KvCapabilityKind::Write`]
|
||||
/// * `db_key` - The database key, a tuple of the [`PackageId`] that created the database and the database name
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct KvCapabilityParams {
|
||||
pub kind: KvCapabilityKind,
|
||||
pub db_key: (PackageId, String),
|
||||
}
|
||||
|
||||
impl From<tokio::sync::oneshot::error::RecvError> for KvError {
|
||||
fn from(err: tokio::sync::oneshot::error::RecvError) -> Self {
|
||||
KvError::NoCap {
|
||||
error: err.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tokio::sync::mpsc::error::SendError<CapMessage>> for KvError {
|
||||
fn from(err: tokio::sync::mpsc::error::SendError<CapMessage>) -> Self {
|
||||
KvError::NoCap {
|
||||
error: err.to_string(),
|
||||
}
|
||||
}
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum KvCapabilityKind {
|
||||
Read,
|
||||
Write,
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for KvError {
|
||||
fn from(err: std::io::Error) -> Self {
|
||||
KvError::IOError {
|
||||
error: err.to_string(),
|
||||
}
|
||||
KvError::IOError(err.to_string())
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user