Mercurial > lbo > hg > leveldb-rs
changeset 118:c2539cd2d021
Write proper comparators for key formats and use them in filters/table
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Mon, 26 Dec 2016 09:23:16 +0000 |
parents | 6147b3a3eeea |
children | b5cdae35b1a4 |
files | src/filter.rs src/filter_block.rs src/key_types.rs src/memtable.rs src/options.rs src/skipmap.rs src/table_reader.rs src/types.rs |
diffstat | 8 files changed, 243 insertions(+), 94 deletions(-) [+] |
line wrap: on
line diff
--- a/src/filter.rs Sun Dec 25 10:47:08 2016 +0000 +++ b/src/filter.rs Mon Dec 26 09:23:16 2016 +0000 @@ -162,6 +162,12 @@ internal: FP, } +impl<FP: FilterPolicy> InternalFilterPolicy<FP> { + pub fn new(inner: FP) -> InternalFilterPolicy<FP> { + InternalFilterPolicy { internal: inner } + } +} + impl<FP: FilterPolicy> FilterPolicy for InternalFilterPolicy<FP> { fn name(&self) -> &'static str { self.internal.name() @@ -187,6 +193,7 @@ #[cfg(test)] mod tests { use super::*; + use key_types::LookupKey; const _BITS_PER_KEY: u32 = 12; @@ -197,6 +204,7 @@ "908070605040302010".as_bytes()]; data } + fn create_filter() -> Vec<u8> { let fpol = BloomPolicy::new(_BITS_PER_KEY); let filter = fpol.create_filter(&input_data()); @@ -205,6 +213,20 @@ filter } + + fn create_internalkey_filter() -> Vec<u8> { + let fpol = InternalFilterPolicy::new(BloomPolicy::new(_BITS_PER_KEY)); + let input: Vec<Vec<u8>> = input_data() + .into_iter() + .map(|k| LookupKey::new(k, 123).internal_key().to_vec()) + .collect(); + let input_ = input.iter().map(|k| k.as_slice()).collect(); + + let filter = fpol.create_filter(&input_); + + filter + } + #[test] fn test_filter() { let f = create_filter(); @@ -215,6 +237,12 @@ } } + // This test verifies that InternalFilterPolicy works correctly. + #[test] + fn test_filter_internal_keys_identical() { + assert_eq!(create_filter(), create_internalkey_filter()); + } + #[test] fn hash_test() { let d1 = vec![0x62];
--- a/src/filter_block.rs Sun Dec 25 10:47:08 2016 +0000 +++ b/src/filter_block.rs Mon Dec 26 09:23:16 2016 +0000 @@ -144,6 +144,8 @@ return true; } + println!("{:?}", key); + let filter_begin = self.offset_of(get_filter_index(blk_offset, self.filter_base_lg2)); let filter_end = self.offset_of(get_filter_index(blk_offset, self.filter_base_lg2) + 1);
--- a/src/key_types.rs Sun Dec 25 10:47:08 2016 +0000 +++ b/src/key_types.rs Mon Dec 26 09:23:16 2016 +0000 @@ -1,11 +1,16 @@ +use options::{CompressionType, int_to_compressiontype}; +use types::{ValueType, SequenceNumber, cmp}; -use types::{ValueType, SequenceNumber}; +use std::cmp::Ordering; use integer_encoding::{FixedInt, VarInt}; // The following typedefs are used to distinguish between the different key formats used internally // by different modules. +// TODO: At some point, convert those into actual types with conversions between them. That's a lot +// of boilerplate, but increases type safety. + /// A MemtableKey consists of the following elements: [keylen, key, tag, (vallen, value)] where /// keylen is a varint32 encoding the length of key+tag. tag is a fixed 8 bytes segment encoding /// the entry type and the sequence number. vallen and value are optional components at the end. @@ -141,6 +146,52 @@ } } +pub fn parse_internal_key<'a>(ikey: InternalKey<'a>) -> (CompressionType, u64, UserKey<'a>) { + assert!(ikey.len() >= 8); + + let (ctype, seq) = parse_tag(FixedInt::decode_fixed(&ikey[ikey.len() - 8..])); + let ctype = int_to_compressiontype(ctype as u32).unwrap_or(CompressionType::CompressionNone); + + return (ctype, seq, &ikey[0..ikey.len() - 8]); +} + +/// An internal comparator wrapping a user-supplied comparator. This comparator is used to compare +/// memtable keys, which contain length prefixes and a sequence number. +/// The ordering is determined by asking the wrapped comparator; ties are broken by *reverse* +/// ordering the sequence numbers. 
(This means that when having an entry abx/4 and searching for +/// abx/5, then abx/4 is counted as "greater-or-equal", making snapshot functionality work at all) +pub fn memtable_key_cmp(a: &[u8], b: &[u8]) -> Ordering { + let (akeylen, akeyoff, atag, _, _) = parse_memtable_key(a); + let (bkeylen, bkeyoff, btag, _, _) = parse_memtable_key(b); + + let userkey_a = &a[akeyoff..akeyoff + akeylen]; + let userkey_b = &b[bkeyoff..bkeyoff + bkeylen]; + + match cmp(userkey_a, userkey_b) { + Ordering::Less => Ordering::Less, + Ordering::Greater => Ordering::Greater, + Ordering::Equal => { + let (_, aseq) = parse_tag(atag); + let (_, bseq) = parse_tag(btag); + + // reverse! + bseq.cmp(&aseq) + } + } +} + +/// Same as memtable_key_cmp, but for InternalKeys. +pub fn internal_key_cmp(a: &[u8], b: &[u8]) -> Ordering { + let (_, seqa, keya) = parse_internal_key(a); + let (_, seqb, keyb) = parse_internal_key(b); + + match cmp(keya, keyb) { + Ordering::Less => Ordering::Less, + Ordering::Greater => Ordering::Greater, + Ordering::Equal => seqb.cmp(&seqa), + } +} + #[cfg(test)] mod tests { use super::*;
--- a/src/memtable.rs Sun Dec 25 10:47:08 2016 +0000 +++ b/src/memtable.rs Mon Dec 26 09:23:16 2016 +0000 @@ -11,7 +11,7 @@ impl MemTable { pub fn new() -> MemTable { - MemTable { map: SkipMap::new() } + MemTable { map: SkipMap::new_memtable_map() } } pub fn approx_mem_usage(&self) -> usize { self.map.approx_memory()
--- a/src/options.rs Sun Dec 25 10:47:08 2016 +0000 +++ b/src/options.rs Mon Dec 26 09:23:16 2016 +0000 @@ -26,6 +26,7 @@ // pub logger: Logger, pub write_buffer_size: usize, pub max_open_files: usize, + // pub block_cache: Cache, pub block_size: usize, pub block_restart_interval: usize, pub compression_type: CompressionType,
--- a/src/skipmap.rs	Sun Dec 25 10:47:08 2016 +0000 +++ b/src/skipmap.rs	Mon Dec 26 09:23:16 2016 +0000 @@ -1,6 +1,6 @@ -use types::{cmp, LdbIterator}; +use types::{cmp, CmpFn, LdbIterator}; use rand::{Rng, SeedableRng, StdRng}; -use key_types::{parse_tag, parse_memtable_key}; +use key_types::memtable_key_cmp; use std::cmp::Ordering; use std::mem::{replace, size_of}; @@ -8,33 +8,6 @@ const MAX_HEIGHT: usize = 12; const BRANCHING_FACTOR: u32 = 4; -/// An internal comparator wrapping a user-supplied comparator. This comparator is used to compare -/// memtable keys, which contain length prefixes and a sequence number. -/// The ordering is determined by asking the wrapped comparator; ties are broken by *reverse* -/// ordering the sequence numbers. (This means that when having an entry abx/4 and searching for -/// abx/5, then abx/4 is counted as "greater-or-equal", making snapshot functionality work at all) -fn memtable_key_cmp(a: &[u8], b: &[u8]) -> Ordering { - let (akeylen, akeyoff, atag, _, _) = parse_memtable_key(a); - let (bkeylen, bkeyoff, btag, _, _) = parse_memtable_key(b); - - let userkey_a = &a[akeyoff..akeyoff + akeylen]; - let userkey_b = &b[bkeyoff..bkeyoff + bkeylen]; - - let userkey_order = cmp(userkey_a, userkey_b); - println!("{:?}", userkey_order); - - if userkey_order != Ordering::Equal { - userkey_order - } else { - // look at sequence number, in reverse order - let (_, aseq) = parse_tag(atag); - let (_, bseq) = parse_tag(btag); - - // reverse! - bseq.cmp(&aseq) - } -} - /// A node in a skipmap contains links to the next node and others that are further away (skips); /// `skips[0]` is the immediate element after, that is, the element contained in `next`. struct Node { @@ -54,17 +27,19 @@ len: usize, // approximation of memory used. approx_mem: usize, - cmp: Box<Fn(&[u8], &[u8]) -> Ordering>, + cmp: Box<CmpFn>, } impl SkipMap { - fn new_standard() -> SkipMap { + /// Returns a SkipMap that uses the memtable comparator (see above). 
+ pub fn new_memtable_map() -> SkipMap { let mut skm = SkipMap::new(); - skm.cmp = Box::new(cmp); + skm.cmp = Box::new(memtable_key_cmp); skm } - pub fn new() -> SkipMap { + /// Used for testing: Uses the standard comparator. + fn new() -> SkipMap { let mut s = Vec::new(); s.resize(MAX_HEIGHT, None); @@ -78,7 +53,7 @@ rand: StdRng::from_seed(&[0xde, 0xad, 0xbe, 0xef]), len: 0, approx_mem: size_of::<Self>() + MAX_HEIGHT * size_of::<Option<*mut Node>>(), - cmp: Box::new(memtable_key_cmp), + cmp: Box::new(cmp), } } @@ -351,7 +326,7 @@ use types::*; pub fn make_skipmap() -> SkipMap { - let mut skm = SkipMap::new_standard(); + let mut skm = SkipMap::new(); let keys = vec!["aba", "abb", "abc", "abd", "abe", "abf", "abg", "abh", "abi", "abj", "abk", "abl", "abm", "abn", "abo", "abp", "abq", "abr", "abs", "abt", "abu", "abv", "abw", "abx", "aby", "abz"]; @@ -411,7 +386,7 @@ #[test] fn test_iterator_0() { - let skm = SkipMap::new_standard(); + let skm = SkipMap::new(); let mut i = 0; for (_, _) in skm.iter() {
--- a/src/table_reader.rs Sun Dec 25 10:47:08 2016 +0000 +++ b/src/table_reader.rs Mon Dec 26 09:23:16 2016 +0000 @@ -1,11 +1,11 @@ use block::{Block, BlockIter}; use blockhandle::BlockHandle; -use filter::FilterPolicy; +use filter::{InternalFilterPolicy, FilterPolicy}; use filter_block::FilterBlockReader; use options::{self, CompressionType, Options}; use table_builder::{self, Footer}; -use types::{cmp, LdbIterator}; -use key_types::InternalKey; +use types::{cmp, CmpFn, LdbIterator}; +use key_types::{internal_key_cmp, InternalKey}; use std::io::{self, Read, Seek, SeekFrom, Result}; use std::cmp::Ordering; @@ -32,26 +32,6 @@ Ok(buf) } -/// Reads a block at location. -fn read_block<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<TableBlock> { - // The block is denoted by offset and length in BlockHandle. A block in an encoded - // table is followed by 1B compression type and 4B checksum. - let buf = try!(read_bytes(f, location)); - let compress = try!(read_bytes(f, - &BlockHandle::new(location.offset() + location.size(), - table_builder::TABLE_BLOCK_COMPRESS_LEN))); - let cksum = try!(read_bytes(f, - &BlockHandle::new(location.offset() + location.size() + - table_builder::TABLE_BLOCK_COMPRESS_LEN, - table_builder::TABLE_BLOCK_CKSUM_LEN))); - Ok(TableBlock { - block: Block::new(buf), - checksum: u32::decode_fixed(&cksum), - compression: options::int_to_compressiontype(compress[0] as u32) - .unwrap_or(CompressionType::CompressionNone), - }) -} - struct TableBlock { block: Block, checksum: u32, @@ -59,6 +39,26 @@ } impl TableBlock { + /// Reads a block at location. + fn read_block<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<TableBlock> { + // The block is denoted by offset and length in BlockHandle. A block in an encoded + // table is followed by 1B compression type and 4B checksum. 
+ let buf = try!(read_bytes(f, location)); + let compress = try!(read_bytes(f, + &BlockHandle::new(location.offset() + location.size(), + table_builder::TABLE_BLOCK_COMPRESS_LEN))); + let cksum = try!(read_bytes(f, + &BlockHandle::new(location.offset() + location.size() + + table_builder::TABLE_BLOCK_COMPRESS_LEN, + table_builder::TABLE_BLOCK_CKSUM_LEN))); + Ok(TableBlock { + block: Block::new(buf), + checksum: u32::decode_fixed(&cksum), + compression: options::int_to_compressiontype(compress[0] as u32) + .unwrap_or(CompressionType::CompressionNone), + }) + } + /// Verify checksum of block fn verify(&self) -> bool { let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); @@ -74,6 +74,7 @@ file_size: usize, opt: Options, + cmp: Box<CmpFn>, footer: Footer, indexblock: Block, @@ -81,11 +82,12 @@ } impl<R: Read + Seek, FP: FilterPolicy> Table<R, FP> { - pub fn new(mut file: R, size: usize, fp: FP, opt: Options) -> Result<Table<R, FP>> { + /// Creates a new table reader operating on unformatted keys (i.e., UserKey). + fn new_raw(mut file: R, size: usize, fp: FP, opt: Options) -> Result<Table<R, FP>> { let footer = try!(read_footer(&mut file, size)); - let indexblock = try!(read_block(&mut file, &footer.index)); - let metaindexblock = try!(read_block(&mut file, &footer.meta_index)); + let indexblock = try!(TableBlock::read_block(&mut file, &footer.index)); + let metaindexblock = try!(TableBlock::read_block(&mut file, &footer.meta_index)); if !indexblock.verify() || !metaindexblock.verify() { return Err(io::Error::new(io::ErrorKind::InvalidData, @@ -115,14 +117,28 @@ file: file, file_size: size, opt: opt, + cmp: Box::new(cmp), footer: footer, filters: filter_block_reader, indexblock: indexblock.block, }) } + /// Creates a new table reader operating on internal keys (i.e., InternalKey). This means that + /// a different comparator (internal_key_cmp) and a different filter policy + /// (InternalFilterPolicy) are used. 
+ pub fn new(file: R, + size: usize, + fp: FP, + opt: Options) + -> Result<Table<R, InternalFilterPolicy<FP>>> { + let mut t = try!(Table::new_raw(file, size, InternalFilterPolicy::new(fp), opt)); + t.cmp = Box::new(internal_key_cmp); + Ok(t) + } + fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock> { - let b = try!(read_block(&mut self.file, location)); + let b = try!(TableBlock::read_block(&mut self.file, location)); if !b.verify() { Err(io::Error::new(io::ErrorKind::InvalidData, "Data block failed verification")) @@ -263,7 +279,7 @@ self.index_block.seek(to); if let Some((past_block, handle)) = self.index_block.current() { - if cmp(to, &past_block) == Ordering::Less { + if (self.table.cmp)(to, &past_block) == Ordering::Less { // ok, found right block: continue if let Ok(()) = self.load_block(&handle) { self.current_block.seek(to); @@ -324,9 +340,11 @@ #[cfg(test)] mod tests { use filter::BloomPolicy; + use filter::InternalFilterPolicy; use options::Options; use table_builder::TableBuilder; use types::LdbIterator; + use key_types::LookupKey; use std::io::Cursor; @@ -366,6 +384,39 @@ (d, size) } + fn build_internal_table() -> (Vec<u8>, usize) { + + let mut d = Vec::with_capacity(512); + let mut opt = Options::default(); + opt.block_restart_interval = 2; + opt.block_size = 32; + + let mut i = 0 as u64; + let data: Vec<(Vec<u8>, &'static str)> = build_data() + .into_iter() + .map(|(k, v)| { + i += 1; + (LookupKey::new(k.as_bytes(), i).internal_key().to_vec(), v) + }) + .collect(); + + { + let mut b = + TableBuilder::new(opt, &mut d, InternalFilterPolicy::new(BloomPolicy::new(4))); + + for &(ref k, ref v) in data.iter() { + b.add(k.as_slice(), v.as_bytes()); + } + + b.finish(); + + } + + let size = d.len(); + + (d, size) + } + #[test] fn test_table_reader_checksum() { let (mut src, size) = build_table(); @@ -373,10 +424,10 @@ src[45] = 0; - let mut table = Table::new(Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4), - Options::default()) + 
let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4), + Options::default()) .unwrap(); assert!(table.filters.is_some()); @@ -407,10 +458,10 @@ let (src, size) = build_table(); let data = build_data(); - let mut table = Table::new(Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4), - Options::default()) + let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4), + Options::default()) .unwrap(); let iter = table.iter(); let mut i = 0; @@ -428,10 +479,10 @@ fn test_table_iterator_filter() { let (src, size) = build_table(); - let mut table = Table::new(Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4), - Options::default()) + let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4), + Options::default()) .unwrap(); let filter_reader = table.filters.clone().unwrap(); let mut iter = table.iter(); @@ -451,10 +502,10 @@ fn test_table_iterator_state_behavior() { let (src, size) = build_table(); - let mut table = Table::new(Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4), - Options::default()) + let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4), + Options::default()) .unwrap(); let mut iter = table.iter(); @@ -484,10 +535,10 @@ let (src, size) = build_table(); let data = build_data(); - let mut table = Table::new(Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4), - Options::default()) + let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4), + Options::default()) .unwrap(); let mut iter = table.iter(); let mut i = 0; @@ -519,10 +570,10 @@ fn test_table_iterator_seek() { let (src, size) = build_table(); - let mut table = Table::new(Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4), - Options::default()) + let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4), + Options::default()) .unwrap(); let mut iter = table.iter(); @@ -540,10 
+591,10 @@ fn test_table_get() { let (src, size) = build_table(); - let mut table = Table::new(Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4), - Options::default()) + let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4), + Options::default()) .unwrap(); assert!(table.get("aaa".as_bytes()).is_none()); @@ -553,4 +604,43 @@ assert_eq!(table.get("zzz".as_bytes()), Some("111".as_bytes().to_vec())); assert!(table.get("zz1".as_bytes()).is_none()); } + + // This test verifies that the table and filters work with internal keys. This means: + // The table contains keys in InternalKey format and it uses a filter wrapped by + // InternalFilterPolicy. + // All the other tests use raw keys that don't have any internal structure; this is fine in + // general, but here we want to see that the other infrastructure works too. + #[test] + fn test_table_internal_keys() { + use key_types::LookupKey; + + let (src, size) = build_internal_table(); + + let mut table = Table::new(Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4), + Options::default()) + .unwrap(); + let filter_reader = table.filters.clone().unwrap(); + + // Check that we're actually using internal keys + for (ref k, _) in table.iter() { + assert_eq!(k.len(), 3 + 8); + } + + let mut iter = table.iter(); + + loop { + if let Some((k, _)) = iter.next() { + let lk = LookupKey::new(&k, 123); + let userkey = lk.user_key(); + + assert!(filter_reader.key_may_match(iter.current_block_off, userkey)); + assert!(!filter_reader.key_may_match(iter.current_block_off, + "somerandomkey".as_bytes())); + } else { + break; + } + } + } }