indexedlog: add methods on Log to do prefix lookups

Summary:
This exposes the underlying lookup functions from `Index`.

Alternatively we can allow access to `Index` and provide an `iter_started_from`
method on `Log` which takes a raw offset. I have been trying to avoid exposing
raw offsets in public interfaces, as they would change after `flush()` and cause
problems.

Reviewed By: markbt

Differential Revision: D13498303

fbshipit-source-id: 8b00a2a36a9383e3edb6fd7495a005bc985fd461
This commit is contained in:
Jun Wu 2018-12-20 15:48:20 -08:00 committed by Facebook Github Bot
parent 3237b77e4c
commit b3893b3d3c

View File

@ -33,7 +33,7 @@
use atomicwrites::{AllowOverwrite, AtomicFile};
use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
use index::{self, Index, InsertKey, LeafValueIter};
use index::{self, Index, InsertKey, LeafValueIter, PrefixIter};
use lock::ScopedFileLock;
use memmap::Mmap;
use std::borrow::Cow;
@ -183,6 +183,16 @@ pub struct LogLookupIter<'a> {
log: &'a Log,
}
/// Iterator over keys and [LogLookupIter], filtered by an index prefix.
///
/// It is a wrapper around [index::PrefixIter].
///
/// Each item is an `io::Result` of `(key, iter)`: the key produced by the
/// wrapped prefix iterator, paired with a [LogLookupIter] over that key's
/// entries. After the first error is yielded, iteration stops.
pub struct LogPrefixIter<'a> {
// Underlying index iterator; yields `(key, link_offset)` results.
inner_iter: PrefixIter<'a>,
// Set once `inner_iter` has yielded an error; `next()` then returns `None`.
errored: bool,
// The log handed to every [LogLookupIter] this iterator creates.
log: &'a Log,
// The index that was scanned; used to resolve each key's link offset
// into its values.
index: &'a Index,
}
/// Metadata about index names, logical [Log] and [Index] file lengths.
/// Used internally.
#[derive(PartialEq, Eq, Debug)]
@ -409,6 +419,48 @@ impl Log {
}
}
/// Look up keys and entries using the given prefix.
/// The `index_id` is the index of `index_defs` passed to [Log::open].
///
/// Return an iterator that yields `(key, iter)`, where `key` is the full
/// key, `iter` is [LogLookupIter] that allows iteration through matched
/// entries.
///
/// # Errors
///
/// Returns an `InvalidInput` error if `index_id` does not refer to an
/// existing index (rather than panicking), and propagates any error
/// reported by the index while starting the prefix scan.
pub fn lookup_prefix<K: AsRef<[u8]>>(
    &self,
    index_id: usize,
    prefix: K,
) -> io::Result<LogPrefixIter> {
    // An out-of-range `index_id` is a caller mistake that can be reported
    // through the existing `io::Result` instead of aborting the process.
    let index = self.indexes.get(index_id).ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!(
                "invalid index_id {} (len={})",
                index_id,
                self.indexes.len()
            ),
        )
    })?;
    let inner_iter = index.scan_prefix(prefix)?;
    Ok(LogPrefixIter {
        inner_iter,
        errored: false,
        log: self,
        index,
    })
}
/// Look up keys and entries using the given hex prefix.
/// The length of the hex string can be odd.
/// The `index_id` is the index of `index_defs` passed to [Log::open].
///
/// Return an iterator that yields `(key, iter)`, where `key` is the full
/// key, `iter` is [LogLookupIter] that allows iteration through matched
/// entries.
///
/// # Errors
///
/// Returns an `InvalidInput` error if `index_id` does not refer to an
/// existing index (rather than panicking), and propagates any error
/// reported by the index while starting the hex prefix scan.
pub fn lookup_prefix_hex<K: AsRef<[u8]>>(
    &self,
    index_id: usize,
    hex_prefix: K,
) -> io::Result<LogPrefixIter> {
    // An out-of-range `index_id` is a caller mistake that can be reported
    // through the existing `io::Result` instead of aborting the process.
    let index = self.indexes.get(index_id).ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!(
                "invalid index_id {} (len={})",
                index_id,
                self.indexes.len()
            ),
        )
    })?;
    let inner_iter = index.scan_prefix_hex(hex_prefix)?;
    Ok(LogPrefixIter {
        inner_iter,
        errored: false,
        log: self,
        index,
    })
}
/// Return an iterator for all entries.
pub fn iter(&self) -> LogIter {
LogIter {
@ -762,6 +814,31 @@ impl<'a> Iterator for LogIter<'a> {
}
}
impl<'a> Iterator for LogPrefixIter<'a> {
    type Item = io::Result<(Cow<'a, [u8]>, LogLookupIter<'a>)>;

    /// Advance the wrapped prefix iterator by one key.
    ///
    /// Yields `(key, iter)` where `iter` walks the entries linked to `key`.
    /// The first error from the wrapped iterator is yielded once; every
    /// later call returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        // A previous call already surfaced an error: stay exhausted.
        if self.errored {
            return None;
        }

        // `?` ends iteration when the wrapped iterator is exhausted.
        let (key, link_offset) = match self.inner_iter.next()? {
            Ok(found) => found,
            Err(err) => {
                self.errored = true;
                return Some(Err(err));
            }
        };

        let entries = LogLookupIter {
            inner_iter: link_offset.values(self.index),
            errored: false,
            log: self.log,
        };
        Some(Ok((key, entries)))
    }
}
impl LogMetadata {
const HEADER: &'static [u8] = b"meta\0";
@ -1049,6 +1126,47 @@ mod tests {
assert!(log.lookup(1, b"23").is_err());
}
#[test]
fn test_lookup_prefix() {
    let dir = TempDir::new("log").unwrap();

    // The index key of an entry is the entry without its last byte.
    let index_func = |data: &[u8]| vec![IndexOutput::Reference(0..(data.len() - 1) as u64)];
    let index_defs = vec![
        IndexDef {
            func: Box::new(index_func),
            name: "simple",
            lag_threshold: 0,
        },
    ];
    let mut log = Log::open(dir.path(), index_defs).unwrap();

    let entries = vec![&b"aaa"[..], b"bb", b"bb"];
    for entry in &entries {
        log.append(entry).unwrap();
    }

    // 0x61 == b'a'. 0x6 will match both keys: "aa" and "b".
    // "aa" matches the value "aaa", "b" matches the entries ["bb", "bb"]
    let mut hex_iter = log.lookup_prefix_hex(0, b"6").unwrap();

    let (first_key, _) = hex_iter.next().unwrap().unwrap();
    assert_eq!(first_key.as_ref(), b"aa");

    let (_, second_entries) = hex_iter.next().unwrap().unwrap();
    let second_entries = second_entries.collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(second_entries, vec![b"bb", b"bb"]);

    assert!(hex_iter.next().is_none());

    // A plain (non-hex) prefix "b" matches only the key "b".
    let mut prefix_iter = log.lookup_prefix(0, b"b").unwrap();
    let (only_key, _) = prefix_iter.next().unwrap().unwrap();
    assert_eq!(only_key.as_ref(), b"b");
    assert!(prefix_iter.next().is_none());
}
#[test]
fn test_index_func() {
let dir = TempDir::new("log").unwrap();