mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-18 22:21:50 +03:00
dbs: move back into own folders
This commit is contained in:
parent
7a42f3c1d2
commit
1327bbcb56
24
src/kv.rs
24
src/kv.rs
@ -17,9 +17,9 @@ pub async fn kv(
|
||||
send_to_caps_oracle: CapMessageSender,
|
||||
home_directory_path: String,
|
||||
) -> anyhow::Result<()> {
|
||||
let vfs_path = format!("{}/vfs", &home_directory_path);
|
||||
let kv_path = format!("{}/kv", &home_directory_path);
|
||||
|
||||
if let Err(e) = fs::create_dir_all(&vfs_path).await {
|
||||
if let Err(e) = fs::create_dir_all(&kv_path).await {
|
||||
panic!("failed creating kv dir! {:?}", e);
|
||||
}
|
||||
|
||||
@ -59,7 +59,7 @@ pub async fn kv(
|
||||
let send_to_loop = send_to_loop.clone();
|
||||
let open_kvs = open_kvs.clone();
|
||||
let txs = txs.clone();
|
||||
let vfs_path = vfs_path.clone();
|
||||
let kv_path = kv_path.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut queue_lock = queue.lock().await;
|
||||
@ -72,7 +72,7 @@ pub async fn kv(
|
||||
send_to_loop.clone(),
|
||||
send_to_terminal.clone(),
|
||||
send_to_caps_oracle.clone(),
|
||||
vfs_path.clone(),
|
||||
kv_path.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@ -95,7 +95,7 @@ async fn handle_request(
|
||||
send_to_loop: MessageSender,
|
||||
send_to_terminal: PrintSender,
|
||||
send_to_caps_oracle: CapMessageSender,
|
||||
vfs_path: String,
|
||||
kv_path: String,
|
||||
) -> Result<(), KvError> {
|
||||
let KernelMessage {
|
||||
id,
|
||||
@ -132,7 +132,7 @@ async fn handle_request(
|
||||
open_kvs.clone(),
|
||||
send_to_caps_oracle.clone(),
|
||||
&request,
|
||||
vfs_path.clone(),
|
||||
kv_path.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@ -328,7 +328,7 @@ async fn check_caps(
|
||||
open_kvs: Arc<DashMap<(PackageId, String), OptimisticTransactionDB>>,
|
||||
mut send_to_caps_oracle: CapMessageSender,
|
||||
request: &KvRequest,
|
||||
vfs_path: String,
|
||||
kv_path: String,
|
||||
) -> Result<(), KvError> {
|
||||
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
|
||||
let src_package_id = PackageId::new(source.process.package(), source.process.publisher());
|
||||
@ -418,8 +418,8 @@ async fn check_caps(
|
||||
}
|
||||
|
||||
let db_path = format!(
|
||||
"{}/{}/kv/{}",
|
||||
vfs_path,
|
||||
"{}/{}/{}",
|
||||
kv_path,
|
||||
request.package_id.to_string(),
|
||||
request.db.to_string()
|
||||
);
|
||||
@ -431,11 +431,7 @@ async fn check_caps(
|
||||
Ok(())
|
||||
}
|
||||
KvAction::Backup { .. } => {
|
||||
if source.process != *STATE_PROCESS_ID {
|
||||
return Err(KvError::NoCap {
|
||||
error: request.action.to_string(),
|
||||
});
|
||||
}
|
||||
// caps
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -37,9 +37,9 @@ pub async fn sqlite(
|
||||
send_to_caps_oracle: CapMessageSender,
|
||||
home_directory_path: String,
|
||||
) -> anyhow::Result<()> {
|
||||
let vfs_path = format!("{}/vfs", &home_directory_path);
|
||||
let sqlite_path = format!("{}/sqlite", &home_directory_path);
|
||||
|
||||
if let Err(e) = fs::create_dir_all(&vfs_path).await {
|
||||
if let Err(e) = fs::create_dir_all(&sqlite_path).await {
|
||||
panic!("failed creating sqlite dir! {:?}", e);
|
||||
}
|
||||
|
||||
@ -79,7 +79,7 @@ pub async fn sqlite(
|
||||
let open_dbs = open_dbs.clone();
|
||||
|
||||
let txs = txs.clone();
|
||||
let vfs_path = vfs_path.clone();
|
||||
let sqlite_path = sqlite_path.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut queue_lock = queue.lock().await;
|
||||
@ -92,7 +92,7 @@ pub async fn sqlite(
|
||||
send_to_loop.clone(),
|
||||
send_to_terminal.clone(),
|
||||
send_to_caps_oracle.clone(),
|
||||
vfs_path.clone(),
|
||||
sqlite_path.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@ -115,7 +115,7 @@ async fn handle_request(
|
||||
send_to_loop: MessageSender,
|
||||
send_to_terminal: PrintSender,
|
||||
send_to_caps_oracle: CapMessageSender,
|
||||
vfs_path: String,
|
||||
sqlite_path: String,
|
||||
) -> Result<(), SqliteError> {
|
||||
let KernelMessage {
|
||||
id,
|
||||
@ -152,7 +152,7 @@ async fn handle_request(
|
||||
open_dbs.clone(),
|
||||
send_to_caps_oracle.clone(),
|
||||
&request,
|
||||
vfs_path.clone(),
|
||||
sqlite_path.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@ -284,8 +284,13 @@ async fn handle_request(
|
||||
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
|
||||
}
|
||||
SqliteAction::Backup => {
|
||||
// execute WAL flush.
|
||||
//
|
||||
for db_ref in open_dbs.iter() {
|
||||
let db = db_ref.value().lock().await;
|
||||
let result: rusqlite::Result<()> = db.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |_| Ok(())).map(|_| ());
|
||||
if let Err(e) = result {
|
||||
return Err(SqliteError::RusqliteError { error: e.to_string() });
|
||||
}
|
||||
}
|
||||
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
|
||||
}
|
||||
};
|
||||
@ -342,7 +347,7 @@ async fn check_caps(
|
||||
open_dbs: Arc<DashMap<(PackageId, String), Mutex<Connection>>>,
|
||||
mut send_to_caps_oracle: CapMessageSender,
|
||||
request: &SqliteRequest,
|
||||
vfs_path: String,
|
||||
sqlite_path: String,
|
||||
) -> Result<(), SqliteError> {
|
||||
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
|
||||
let src_package_id = PackageId::new(source.process.package(), source.process.publisher());
|
||||
@ -429,8 +434,8 @@ async fn check_caps(
|
||||
}
|
||||
|
||||
let db_path = format!(
|
||||
"{}/{}/sqlite/{}",
|
||||
vfs_path,
|
||||
"{}/{}/{}",
|
||||
sqlite_path,
|
||||
request.package_id.to_string(),
|
||||
request.db.to_string()
|
||||
);
|
||||
@ -449,11 +454,6 @@ async fn check_caps(
|
||||
}
|
||||
SqliteAction::Backup => {
|
||||
// flushing WALs for backup
|
||||
// check caps.
|
||||
for db_ref in open_dbs.iter() {
|
||||
let db = db_ref.value().lock().await;
|
||||
db.execute("pragma wal_checkpoint", [])?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -532,7 +532,7 @@ fn make_error_message(our_name: String, km: &KernelMessage, error: SqliteError)
|
||||
id: km.id,
|
||||
source: Address {
|
||||
node: our_name.clone(),
|
||||
process: KV_PROCESS_ID.clone(),
|
||||
process: SQLITE_PROCESS_ID.clone(),
|
||||
},
|
||||
target: match &km.rsvp {
|
||||
None => km.source.clone(),
|
||||
|
@ -218,7 +218,7 @@ async fn handle_request(
|
||||
}
|
||||
}
|
||||
StateAction::Backup => {
|
||||
let checkpoint_dir = format!("{}/vfs/kernel_backup", &home_directory_path);
|
||||
let checkpoint_dir = format!("{}/kernel/backup", &home_directory_path);
|
||||
|
||||
if Path::new(&checkpoint_dir).exists() {
|
||||
fs::remove_dir_all(&checkpoint_dir).await?;
|
||||
|
Loading…
Reference in New Issue
Block a user