kv: iterator tweaks..

This commit is contained in:
bitful-pannul 2024-12-20 02:39:07 +02:00
parent 4e170fb0bf
commit 45fc56b379

View File

@ -25,13 +25,8 @@ struct KvState {
/// access order of dbs, used to cull if we hit the fds limit
access_order: Arc<Mutex<UniqueQueue<(PackageId, String)>>>,
txs: Arc<DashMap<u64, Vec<(KvAction, Option<Vec<u8>>)>>>,
/// track active iterators: (package_id, db_name) -> (iterator_id -> current position)
iterators: Arc<
DashMap<
(PackageId, String),
DashMap<u64, Vec<u8>>, // Store last seen key instead of iterator
>,
>,
/// track active iterators: (package_id, db_name, iterator_id) -> (prefix, current_key)
iterators: Arc<DashMap<(PackageId, String, u64), (Vec<u8>, Vec<u8>)>>,
fds_limit: u64,
}
@ -114,22 +109,26 @@ impl KvState {
db: String,
prefix: Option<Vec<u8>>,
) -> Result<u64, KvError> {
// Ensure DB exists
let db_key = (package_id.clone(), db.clone());
let _db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?;
// Generate a random iterator ID and ensure it's unique
let iterators = self
.iterators
.entry(db_key.clone())
.or_insert_with(|| DashMap::new());
if !self.open_kvs.contains_key(&db_key) {
return Err(KvError::NoDb);
}
// Generate unique iterator ID
let mut iterator_id = random::<u64>();
while iterators.contains_key(&iterator_id) {
while self
.iterators
.contains_key(&(package_id.clone(), db.clone(), iterator_id))
{
iterator_id = random::<u64>();
}
// Store the starting position (prefix or empty vec for start)
iterators.insert(iterator_id, prefix.unwrap_or_default());
// Store initial state: (prefix, current_key)
self.iterators.insert(
(package_id, db, iterator_id),
(prefix.unwrap_or_default(), Vec::new()),
);
Ok(iterator_id)
}
@ -141,67 +140,54 @@ impl KvState {
iterator_id: u64,
count: u64,
) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, bool), KvError> {
let db_key = (package_id.clone(), db.clone());
let db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?;
let iter_key = (package_id.clone(), db.clone(), iterator_id);
let db_key = (package_id, db);
let db_iters = self.iterators.get(&db_key).ok_or(KvError::NoDb)?;
let last_key = db_iters
.get(&iterator_id)
// Get DB and iterator state
let db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?;
let (prefix, current_key) = self
.iterators
.get(&iter_key)
.ok_or(KvError::NoIterator)?
.clone();
let mut entries = Vec::new();
let mut done = true;
// Create a fresh iterator starting from our last position
let mode = if last_key.is_empty() {
rocksdb::IteratorMode::Start
// Create the appropriate iterator
let mut iter = if !prefix.is_empty() {
if !current_key.is_empty() {
db.iterator(rocksdb::IteratorMode::From(
&current_key,
rocksdb::Direction::Forward,
))
} else {
db.prefix_iterator(&prefix)
}
} else {
rocksdb::IteratorMode::From(&last_key, rocksdb::Direction::Forward)
db.iterator(rocksdb::IteratorMode::Start)
};
let mut iter = db.iterator(mode);
let mut count_remaining = count;
let mut items_collected = 0;
let mut last_key = None;
while let Some(item) = iter.next() {
if count_remaining == 0 {
done = false;
break;
}
while let Some(Ok((key, value))) = iter.next() {
let key_vec = key.to_vec();
entries.push((key_vec.clone(), value.to_vec()));
last_key = Some(key_vec);
items_collected += 1;
match item {
Ok((key, value)) => {
let key_vec = key.to_vec();
if !key_vec.starts_with(&last_key) && !last_key.is_empty() {
// We've moved past our prefix
break;
}
entries.push((key_vec.clone(), value.to_vec()));
if let Some(mut last_key_entry) = db_iters.get_mut(&iterator_id) {
*last_key_entry = key_vec;
}
count_remaining -= 1;
}
Err(e) => {
return Err(KvError::RocksDBError {
action: "iter_next".to_string(),
error: e.to_string(),
});
if items_collected >= count {
// Not done, save the last key
if let Some(last_key) = last_key {
self.iterators.insert(iter_key, (prefix, last_key));
}
return Ok((entries, false));
}
}
// if we're done, automatically close the iterator
if done {
if let Some(db_iters) = self.iterators.get_mut(&db_key) {
db_iters.remove(&iterator_id);
if db_iters.is_empty() {
self.iterators.remove(&db_key);
}
}
}
Ok((entries, done))
// We've exhausted the iterator
self.iterators.remove(&iter_key);
Ok((entries, true))
}
async fn handle_iter_close(
@ -210,13 +196,8 @@ impl KvState {
db: String,
iterator_id: u64,
) -> Result<(), KvError> {
let db_key = (package_id, db);
if let Some(db_iters) = self.iterators.get_mut(&db_key) {
db_iters.remove(&iterator_id);
if db_iters.is_empty() {
self.iterators.remove(&db_key);
}
}
let iter_key = (package_id, db, iterator_id);
self.iterators.remove(&iter_key);
Ok(())
}
}
@ -344,7 +325,7 @@ async fn handle_request(
let request: KvRequest = match serde_json::from_slice(&body) {
Ok(r) => r,
Err(e) => {
println!("kv: got invalid Request: {}", e);
// println!("kv: got invalid Request: {}", e);
return Err(KvError::InputError {
error: "didn't serialize to KvAction.".into(),
});