kv & sqlite type semantics

This commit is contained in:
bitful-pannul 2023-12-19 13:31:18 -03:00
parent 1a3fba7a70
commit 85abd4afb0
5 changed files with 24 additions and 34 deletions

View File

@ -423,7 +423,7 @@ checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "uqbar_process_lib"
version = "0.4.0"
source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=7e96a12#7e96a120145f3813bde39e3bad47e73407ccbd9a"
source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=8342b1a#8342b1a131401fb5d141dab8c90e79aa6d2bc909"
dependencies = [
"anyhow",
"bincode",

View File

@ -17,7 +17,7 @@ rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10.8"
uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "7e96a12" }
uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "8342b1a" }
wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "efcc759" }
[lib]

View File

@ -23,7 +23,7 @@ pub async fn kv(
panic!("failed creating kv dir! {:?}", e);
}
let open_kvs: Arc<DashMap<DBKey, OptimisticTransactionDB>> = Arc::new(DashMap::new());
let open_kvs: Arc<DashMap<(PackageId, String), OptimisticTransactionDB>> = Arc::new(DashMap::new());
let txs: Arc<DashMap<u64, Vec<(KvAction, Option<Vec<u8>>)>>> = Arc::new(DashMap::new());
let mut process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
@ -89,7 +89,7 @@ pub async fn kv(
async fn handle_request(
our_node: String,
km: KernelMessage,
open_kvs: Arc<DashMap<DBKey, OptimisticTransactionDB>>,
open_kvs: Arc<DashMap<(PackageId, String), OptimisticTransactionDB>>,
txs: Arc<DashMap<u64, Vec<(KvAction, Option<Vec<u8>>)>>>,
send_to_loop: MessageSender,
send_to_terminal: PrintSender,
@ -141,7 +141,7 @@ async fn handle_request(
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
}
KvAction::Get { key } => {
let db = match open_kvs.get(&request.db) {
let db = match open_kvs.get(&(request.package_id, request.db)) {
None => {
return Err(KvError::NoDb);
}
@ -173,7 +173,7 @@ async fn handle_request(
)
}
KvAction::Set { key, tx_id } => {
let db = match open_kvs.get(&request.db) {
let db = match open_kvs.get(&(request.package_id, request.db)) {
None => {
return Err(KvError::NoDb);
}
@ -203,7 +203,7 @@ async fn handle_request(
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
}
KvAction::Delete { key, tx_id } => {
let db = match open_kvs.get(&request.db) {
let db = match open_kvs.get(&(request.package_id, request.db)) {
None => {
return Err(KvError::NoDb);
}
@ -226,7 +226,7 @@ async fn handle_request(
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
}
KvAction::Commit { tx_id } => {
let db = match open_kvs.get(&request.db) {
let db = match open_kvs.get(&(request.package_id, request.db)) {
None => {
return Err(KvError::NoDb);
}
@ -321,7 +321,7 @@ async fn handle_request(
async fn check_caps(
our_node: String,
source: Address,
open_kvs: Arc<DashMap<DBKey, OptimisticTransactionDB>>,
open_kvs: Arc<DashMap<(PackageId, String), OptimisticTransactionDB>>,
mut send_to_caps_oracle: CapMessageSender,
request: &KvRequest,
kv_path: String,
@ -386,7 +386,7 @@ async fn check_caps(
Ok(())
}
KvAction::New { .. } => {
if src_package_id != request.db.package_id {
if src_package_id != request.package_id {
return Err(KvError::NoCap {
error: request.action.to_string(),
});
@ -415,7 +415,7 @@ async fn check_caps(
let db = OptimisticTransactionDB::open_default(&db_path)?;
open_kvs.insert(request.db.clone(), db);
open_kvs.insert((request.package_id.clone(), request.db.clone()), db);
Ok(())
}
KvAction::Backup { .. } => {
@ -486,12 +486,6 @@ impl std::fmt::Display for KvAction {
}
}
impl std::fmt::Display for DBKey {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "/{}/{}", self.package_id, self.db)
}
}
impl From<tokio::sync::oneshot::error::RecvError> for KvError {
fn from(err: tokio::sync::oneshot::error::RecvError) -> Self {
KvError::NoCap {

View File

@ -23,7 +23,7 @@ pub async fn sqlite(
panic!("failed creating sqlite dir! {:?}", e);
}
let open_dbs: Arc<DashMap<DBKey, Mutex<Connection>>> = Arc::new(DashMap::new());
let open_dbs: Arc<DashMap<(PackageId, String), Mutex<Connection>>> = Arc::new(DashMap::new());
let txs: Arc<DashMap<u64, Vec<(String, Vec<SqlValue>)>>> = Arc::new(DashMap::new());
let mut process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
@ -89,7 +89,7 @@ pub async fn sqlite(
async fn handle_request(
our_node: String,
km: KernelMessage,
open_dbs: Arc<DashMap<DBKey, Mutex<Connection>>>,
open_dbs: Arc<DashMap<(PackageId, String), Mutex<Connection>>>,
txs: Arc<DashMap<u64, Vec<(String, Vec<SqlValue>)>>>,
send_to_loop: MessageSender,
send_to_terminal: PrintSender,
@ -142,7 +142,7 @@ async fn handle_request(
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
}
SqliteAction::Read { query } => {
let db = match open_dbs.get(&request.db) {
let db = match open_dbs.get(&(request.package_id, request.db)) {
Some(db) => db,
None => {
return Err(SqliteError::NoDb);
@ -188,7 +188,7 @@ async fn handle_request(
)
}
SqliteAction::Write { statement, tx_id } => {
let db = match open_dbs.get(&request.db) {
let db = match open_dbs.get(&(request.package_id, request.db)) {
Some(db) => db,
None => {
return Err(SqliteError::NoDb);
@ -221,7 +221,7 @@ async fn handle_request(
)
}
SqliteAction::Commit { tx_id } => {
let db = match open_dbs.get(&request.db) {
let db = match open_dbs.get(&(request.package_id, request.db)) {
Some(db) => db,
None => {
return Err(SqliteError::NoDb);
@ -300,7 +300,7 @@ async fn handle_request(
async fn check_caps(
our_node: String,
source: Address,
open_dbs: Arc<DashMap<DBKey, Mutex<Connection>>>,
open_dbs: Arc<DashMap<(PackageId, String), Mutex<Connection>>>,
mut send_to_caps_oracle: CapMessageSender,
request: &SqliteRequest,
sqlite_path: String,
@ -362,7 +362,7 @@ async fn check_caps(
Ok(())
}
SqliteAction::New => {
if src_package_id != request.db.package_id {
if src_package_id != request.package_id {
return Err(SqliteError::NoCap {
error: request.action.to_string(),
});
@ -392,7 +392,7 @@ async fn check_caps(
let db = Connection::open(&db_path)?;
db.execute("PRAGMA journal_mode=WAL;", [])?;
open_dbs.insert(request.db.clone(), Mutex::new(db));
open_dbs.insert((request.package_id.clone(), request.db.clone()), Mutex::new(db));
Ok(())
}
SqliteAction::Backup => {

View File

@ -929,7 +929,7 @@ pub enum StateError {
RocksDBError { action: String, error: String },
#[error("kernel_state: startup error")]
StartupError { action: String },
#[error("vfs: Bytes payload required for {action}")]
#[error("kernel_state: bytes payload required for {action}")]
BadBytes { action: String },
#[error("kernel_state: bad request error: {error}")]
BadRequest { error: String },
@ -1049,15 +1049,10 @@ impl VfsError {
}
}
#[derive(Debug, Serialize, Clone, Deserialize, PartialEq, Eq, Hash)]
pub struct DBKey {
pub package_id: PackageId,
pub db: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct KvRequest {
pub db: DBKey,
pub package_id: PackageId,
pub db: String,
pub action: KvAction,
}
@ -1102,7 +1097,8 @@ pub enum KvError {
#[derive(Debug, Serialize, Deserialize)]
pub struct SqliteRequest {
pub db: DBKey,
pub package_id: PackageId,
pub db: String,
pub action: SqliteAction,
}