From 45fc56b3793347b09c5c6e8824762640ccf06cca Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Fri, 20 Dec 2024 02:39:07 +0200 Subject: [PATCH] kv: iterator tweaks.. --- kinode/src/kv.rs | 123 ++++++++++++++++++++--------------------------- 1 file changed, 52 insertions(+), 71 deletions(-) diff --git a/kinode/src/kv.rs b/kinode/src/kv.rs index faed2b93..25a83de5 100644 --- a/kinode/src/kv.rs +++ b/kinode/src/kv.rs @@ -25,13 +25,8 @@ struct KvState { /// access order of dbs, used to cull if we hit the fds limit access_order: Arc>>, txs: Arc>)>>>, - /// track active iterators: (package_id, db_name) -> (iterator_id -> current position) - iterators: Arc< - DashMap< - (PackageId, String), - DashMap>, // Store last seen key instead of iterator - >, - >, + /// track active iterators: (package_id, db_name, iterator_id) -> (prefix, current_key) + iterators: Arc, Vec)>>, fds_limit: u64, } @@ -114,22 +109,26 @@ impl KvState { db: String, prefix: Option>, ) -> Result { + // Ensure DB exists let db_key = (package_id.clone(), db.clone()); - let _db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?; - - // Generate a random iterator ID and ensure it's unique - let iterators = self - .iterators - .entry(db_key.clone()) - .or_insert_with(|| DashMap::new()); + if !self.open_kvs.contains_key(&db_key) { + return Err(KvError::NoDb); + } + // Generate unique iterator ID let mut iterator_id = random::(); - while iterators.contains_key(&iterator_id) { + while self + .iterators + .contains_key(&(package_id.clone(), db.clone(), iterator_id)) + { iterator_id = random::(); } - // Store the starting position (prefix or empty vec for start) - iterators.insert(iterator_id, prefix.unwrap_or_default()); + // Store initial state: (prefix, current_key) + self.iterators.insert( + (package_id, db, iterator_id), + (prefix.unwrap_or_default(), Vec::new()), + ); Ok(iterator_id) } @@ -141,67 +140,54 @@ impl KvState { iterator_id: u64, count: u64, ) -> Result<(Vec<(Vec, Vec)>, bool), KvError> { - let db_key = (package_id.clone(), db.clone()); - let db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?; + let iter_key = (package_id.clone(), db.clone(), iterator_id); + let db_key = (package_id, db); - let db_iters = self.iterators.get(&db_key).ok_or(KvError::NoDb)?; - let last_key = db_iters - .get(&iterator_id) + // Get DB and iterator state + let db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?; + let (prefix, current_key) = self + .iterators + .get(&iter_key) .ok_or(KvError::NoIterator)? .clone(); let mut entries = Vec::new(); - let mut done = true; - // Create a fresh iterator starting from our last position - let mode = if last_key.is_empty() { - rocksdb::IteratorMode::Start + // Create the appropriate iterator + let mut iter = if !prefix.is_empty() { + if !current_key.is_empty() { + db.iterator(rocksdb::IteratorMode::From( + ¤t_key, + rocksdb::Direction::Forward, + )) + } else { + db.prefix_iterator(&prefix) + } } else { - rocksdb::IteratorMode::From(&last_key, rocksdb::Direction::Forward) + db.iterator(rocksdb::IteratorMode::Start) }; - let mut iter = db.iterator(mode); - let mut count_remaining = count; + let mut items_collected = 0; + let mut last_key = None; - while let Some(item) = iter.next() { - if count_remaining == 0 { - done = false; - break; - } + while let Some(Ok((key, value))) = iter.next() { + let key_vec = key.to_vec(); + entries.push((key_vec.clone(), value.to_vec())); + last_key = Some(key_vec); + items_collected += 1; - match item { - Ok((key, value)) => { - let key_vec = key.to_vec(); - if !key_vec.starts_with(&last_key) && !last_key.is_empty() { - // We've moved past our prefix - break; - } - entries.push((key_vec.clone(), value.to_vec())); - if let Some(mut last_key_entry) = db_iters.get_mut(&iterator_id) { - *last_key_entry = key_vec; - } - count_remaining -= 1; - } - Err(e) => { - return Err(KvError::RocksDBError { - action: "iter_next".to_string(), - error: e.to_string(), - }); + if items_collected >= count { + // Not done, save the last key + if let Some(last_key) = last_key { + self.iterators.insert(iter_key, (prefix, last_key)); } + return Ok((entries, false)); } } - // if we're done, automatically close the iterator - if done { - if let Some(db_iters) = self.iterators.get_mut(&db_key) { - db_iters.remove(&iterator_id); - if db_iters.is_empty() { - self.iterators.remove(&db_key); - } - } - } - - Ok((entries, done)) + // We've exhausted the iterator + self.iterators.remove(&iter_key); + Ok((entries, true)) } async fn handle_iter_close( @@ -210,13 +196,8 @@ impl KvState { db: String, iterator_id: u64, ) -> Result<(), KvError> { - let db_key = (package_id, db); - if let Some(db_iters) = self.iterators.get_mut(&db_key) { - db_iters.remove(&iterator_id); - if db_iters.is_empty() { - self.iterators.remove(&db_key); - } - } + let iter_key = (package_id, db, iterator_id); + self.iterators.remove(&iter_key); Ok(()) } } @@ -344,7 +325,7 @@ async fn handle_request( let request: KvRequest = match serde_json::from_slice(&body) { Ok(r) => r, Err(e) => { - println!("kv: got invalid Request: {}", e); + // println!("kv: got invalid Request: {}", e); return Err(KvError::InputError { error: "didn't serialize to KvAction.".into(), });