Mercurial > lbo > hg > leveldb-rs
changeset 123:bab23aa835e0
Finally integrate comparators properly everywhere
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 31 Dec 2016 15:33:20 +0100 |
parents | 14dbd87dd144 |
children | 778b1fa2063e |
files | src/block.rs src/key_types.rs src/memtable.rs src/merging_iter.rs src/options.rs src/skipmap.rs src/table_builder.rs src/table_reader.rs src/test_util.rs src/types.rs |
diffstat | 10 files changed, 247 insertions(+), 150 deletions(-) [+] |
line wrap: on
line diff
--- a/src/block.rs Mon Dec 26 11:22:17 2016 +0000 +++ b/src/block.rs Sat Dec 31 15:33:20 2016 +0100 @@ -3,7 +3,7 @@ use std::rc::Rc; use options::Options; -use types::{LdbIterator, cmp}; +use types::LdbIterator; use integer_encoding::FixedInt; use integer_encoding::VarInt; @@ -28,6 +28,7 @@ /// N_RESTARTS contains the number of restarts. pub struct Block { block: Rc<BlockContents>, + opt: Options, } impl Block { @@ -37,6 +38,8 @@ BlockIter { block: self.block.clone(), + opt: self.opt.clone(), + offset: 0, restarts_off: restart_offset, current_entry_offset: 0, @@ -51,14 +54,18 @@ self.block.clone() } - pub fn new(contents: BlockContents) -> Block { + pub fn new(opt: Options, contents: BlockContents) -> Block { assert!(contents.len() > 4); - Block { block: Rc::new(contents) } + Block { + block: Rc::new(contents), + opt: opt, + } } } pub struct BlockIter { block: Rc<BlockContents>, + opt: Options, // start of next entry offset: usize, // offset of restarts area @@ -205,7 +212,7 @@ // At a restart, the shared part is supposed to be 0. assert_eq!(shared, 0); - let c = cmp(to, &self.block[self.offset..self.offset + non_shared]); + let c = self.opt.cmp.cmp(to, &self.block[self.offset..self.offset + non_shared]); if c == Ordering::Less { right = middle - 1; @@ -220,7 +227,7 @@ // Linear search from here on while let Some((k, _)) = self.next() { - if cmp(k.as_slice(), to) >= Ordering::Equal { + if self.opt.cmp.cmp(k.as_slice(), to) >= Ordering::Equal { return; } } @@ -283,7 +290,8 @@ pub fn add(&mut self, key: &[u8], val: &[u8]) { assert!(self.counter <= self.opt.block_restart_interval); - assert!(self.buffer.is_empty() || cmp(self.last_key.as_slice(), key) == Ordering::Less); + assert!(self.buffer.is_empty() || + self.opt.cmp.cmp(self.last_key.as_slice(), key) == Ordering::Less); let mut shared = 0; @@ -387,7 +395,7 @@ assert_eq!(blockc.len(), 8); assert_eq!(blockc, vec![0, 0, 0, 0, 1, 0, 0, 0]); - let block = Block::new(blockc); + let block = Block::new(Options::default(), blockc); for _ in block.iter() { panic!("expected 0 iterations"); @@ -404,7 +412,7 @@ } let block_contents = builder.finish(); - let block = Block::new(block_contents).iter(); + let block = Block::new(Options::default(), block_contents).iter(); let mut i = 0; assert!(!block.valid()); @@ -422,14 +430,14 @@ let mut o = Options::default(); o.block_restart_interval = 3; let data = get_data(); - let mut builder = BlockBuilder::new(o); + let mut builder = BlockBuilder::new(o.clone()); for &(k, v) in data.iter() { builder.add(k, v); } let block_contents = builder.finish(); - let mut block = Block::new(block_contents).iter(); + let mut block = Block::new(o.clone(), block_contents).iter(); assert!(!block.valid()); assert_eq!(block.next(), @@ -451,7 +459,7 @@ o.block_restart_interval = 3; let data = get_data(); - let mut builder = BlockBuilder::new(o); + let mut builder = BlockBuilder::new(o.clone()); for &(k, v) in data.iter() { builder.add(k, v); @@ -459,7 +467,7 @@ let block_contents = builder.finish(); - let mut block = Block::new(block_contents).iter(); + let mut block = Block::new(o.clone(), block_contents).iter(); block.seek(&"prefix_key2".as_bytes()); assert!(block.valid()); @@ -494,7 +502,7 @@ let block_contents = builder.finish(); - let mut block = Block::new(block_contents).iter(); + let mut block = Block::new(o.clone(), block_contents).iter(); block.seek_to_last(); assert!(block.valid());
--- a/src/key_types.rs Mon Dec 26 11:22:17 2016 +0000 +++ b/src/key_types.rs Sat Dec 31 15:33:20 2016 +0100 @@ -1,7 +1,8 @@ use options::{CompressionType, int_to_compressiontype}; -use types::{ValueType, SequenceNumber, cmp}; +use types::{ValueType, SequenceNumber, Cmp}; use std::cmp::Ordering; +use std::sync::Arc; use integer_encoding::{FixedInt, VarInt}; @@ -28,6 +29,7 @@ /// A LookupKey is the first part of a memtable key, consisting of [keylen: varint32, key: *u8, /// tag: u64] /// keylen is the length of key plus 8 (for the tag; this for LevelDB compatibility) +#[derive(Clone, Debug)] pub struct LookupKey { key: Vec<u8>, key_offset: usize, @@ -146,6 +148,7 @@ } } +/// Parse a key in InternalKey format. pub fn parse_internal_key<'a>(ikey: InternalKey<'a>) -> (CompressionType, u64, UserKey<'a>) { assert!(ikey.len() >= 8); @@ -155,40 +158,51 @@ return (ctype, seq, &ikey[0..ikey.len() - 8]); } +/// Same as memtable_key_cmp, but for InternalKeys. +#[derive(Clone)] +pub struct InternalKeyCmp(pub Arc<Box<Cmp>>); + +impl Cmp for InternalKeyCmp { + fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering { + let (_, seqa, keya) = parse_internal_key(a); + let (_, seqb, keyb) = parse_internal_key(b); + + match self.0.cmp(keya, keyb) { + Ordering::Less => Ordering::Less, + Ordering::Greater => Ordering::Greater, + // reverse comparison! + Ordering::Equal => seqb.cmp(&seqa), + } + } +} + /// An internal comparator wrapping a user-supplied comparator. This comparator is used to compare /// memtable keys, which contain length prefixes and a sequence number. /// The ordering is determined by asking the wrapped comparator; ties are broken by *reverse* /// ordering the sequence numbers. (This means that when having an entry abx/4 and searching for /// abx/5, then abx/4 is counted as "greater-or-equal", making snapshot functionality work at all) -pub fn memtable_key_cmp(a: &[u8], b: &[u8]) -> Ordering { - let (akeylen, akeyoff, atag, _, _) = parse_memtable_key(a); - let (bkeylen, bkeyoff, btag, _, _) = parse_memtable_key(b); - - let userkey_a = &a[akeyoff..akeyoff + akeylen]; - let userkey_b = &b[bkeyoff..bkeyoff + bkeylen]; +#[derive(Clone)] +pub struct MemtableKeyCmp(pub Arc<Box<Cmp>>); - match cmp(userkey_a, userkey_b) { - Ordering::Less => Ordering::Less, - Ordering::Greater => Ordering::Greater, - Ordering::Equal => { - let (_, aseq) = parse_tag(atag); - let (_, bseq) = parse_tag(btag); +impl Cmp for MemtableKeyCmp { + fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering { + let (akeylen, akeyoff, atag, _, _) = parse_memtable_key(a); + let (bkeylen, bkeyoff, btag, _, _) = parse_memtable_key(b); + + let userkey_a = &a[akeyoff..akeyoff + akeylen]; + let userkey_b = &b[bkeyoff..bkeyoff + bkeylen]; - // reverse! - bseq.cmp(&aseq) - } - } -} + match self.0.cmp(userkey_a, userkey_b) { + Ordering::Less => Ordering::Less, + Ordering::Greater => Ordering::Greater, + Ordering::Equal => { + let (_, aseq) = parse_tag(atag); + let (_, bseq) = parse_tag(btag); -/// Same as memtable_key_cmp, but for InternalKeys. -pub fn internal_key_cmp(a: &[u8], b: &[u8]) -> Ordering { - let (_, seqa, keya) = parse_internal_key(a); - let (_, seqb, keyb) = parse_internal_key(b); - - match cmp(keya, keyb) { - Ordering::Less => Ordering::Less, - Ordering::Greater => Ordering::Greater, - Ordering::Equal => seqb.cmp(&seqa), + // reverse! + bseq.cmp(&aseq) + } + } } }
--- a/src/memtable.rs Mon Dec 26 11:22:17 2016 +0000 +++ b/src/memtable.rs Sat Dec 31 15:33:20 2016 +0100 @@ -1,19 +1,34 @@ -use std::cmp::Ordering; +use key_types::{LookupKey, UserKey, InternalKey, MemtableKey, MemtableKeyCmp, parse_memtable_key, + build_memtable_key}; +use types::{ValueType, SequenceNumber, Status, LdbIterator}; +use skipmap::{SkipMap, SkipMapIter}; +use options::Options; -use key_types::{LookupKey, UserKey, InternalKey, MemtableKey, parse_memtable_key, - build_memtable_key}; -use types::{ValueType, SequenceNumber, Status, LdbIterator, cmp}; -use skipmap::{SkipMap, SkipMapIter}; +use std::sync::Arc; /// Provides Insert/Get/Iterate, based on the SkipMap implementation. /// MemTable uses MemtableKeys internally, that is, it stores key and value in the [Skipmap] key. pub struct MemTable { map: SkipMap, + opt: Options, } impl MemTable { - pub fn new() -> MemTable { - MemTable { map: SkipMap::new_memtable_map() } + /// Returns a new MemTable. + /// This wraps opt.cmp inside a MemtableKey-specific comparator. + pub fn new(mut opt: Options) -> MemTable { + opt.cmp = Arc::new(Box::new(MemtableKeyCmp(opt.cmp.clone()))); + MemTable::new_raw(opt) + } + + /// Doesn't wrap the comparator in a MemtableKeyCmp. + fn new_raw(opt: Options) -> MemTable { + // Not using SkipMap::new_memtable_map(), as opt.cmp will already be wrapped by + // MemTable::new() + MemTable { + map: SkipMap::new(opt.clone()), + opt: opt, + } } pub fn approx_mem_usage(&self) -> usize { self.map.approx_memory() @@ -30,12 +45,13 @@ if let Some(e) = iter.current() { let foundkey: MemtableKey = e.0; - let (lkeylen, lkeyoff, _, _, _) = parse_memtable_key(key.memtable_key()); + // let (lkeylen, lkeyoff, _, _, _) = parse_memtable_key(key.memtable_key()); let (fkeylen, fkeyoff, tag, vallen, valoff) = parse_memtable_key(foundkey); // Compare user key -- if equal, proceed - if cmp(&key.memtable_key()[lkeyoff..lkeyoff + lkeylen], - &foundkey[fkeyoff..fkeyoff + fkeylen]) == Ordering::Equal { + println!("{:?}", (key, foundkey)); + // equality doesn't need custom comparator + if key.user_key() == &foundkey[fkeyoff..fkeyoff + fkeylen] { if tag & 0xff == ValueType::TypeValue as u64 { return Result::Ok(foundkey[valoff..valoff + vallen].to_vec()); } else { @@ -132,9 +148,10 @@ use super::*; use key_types::*; use types::*; + use options::Options; fn get_memtable() -> MemTable { - let mut mt = MemTable::new(); + let mut mt = MemTable::new(Options::default()); let entries = vec![(115, "abc", "122"), (120, "abc", "123"), (121, "abd", "124"), @@ -155,7 +172,7 @@ #[test] fn test_memtable_add() { - let mut mt = MemTable::new(); + let mut mt = MemTable::new_raw(Options::default()); mt.add(123, ValueType::TypeValue, "abc".as_bytes(),
--- a/src/merging_iter.rs Mon Dec 26 11:22:17 2016 +0000 +++ b/src/merging_iter.rs Sat Dec 31 15:33:20 2016 +0100 @@ -1,6 +1,8 @@ -use types::{cmp, LdbIterator}; +use options::Options; +use types::{Cmp, LdbIterator}; use std::cmp::Ordering; +use std::sync::Arc; // Warning: This module is kinda messy. The original implementation is // not that much better though :-) @@ -24,16 +26,19 @@ iters: Vec<&'a mut LdbIterator<Item = (&'b [u8], &'b [u8])>>, current: Option<usize>, direction: Direction, + cmp: Arc<Box<Cmp>>, } impl<'a, 'b: 'a> MergingIter<'a, 'b> { /// Construct a new merging iterator. - pub fn new(iters: Vec<&'a mut LdbIterator<Item = (&'b [u8], &'b [u8])>>) + pub fn new(opt: Options, + iters: Vec<&'a mut LdbIterator<Item = (&'b [u8], &'b [u8])>>) -> MergingIter<'a, 'b> { let mi = MergingIter { iters: iters, current: None, direction: Direction::Fwd, + cmp: opt.cmp, }; mi } @@ -60,7 +65,7 @@ if i != current { self.iters[i].seek(key); if let Some((current_key, _)) = self.iters[i].current() { - if cmp(current_key, key) == Ordering::Equal { + if self.cmp.cmp(current_key, key) == Ordering::Equal { self.iters[i].next(); } } @@ -105,7 +110,7 @@ for i in 1..self.iters.len() { if let Some(current) = self.iters[i].current() { if let Some(smallest) = self.iters[next_ix].current() { - if cmp(current.0, smallest.0) == ord { + if self.cmp.cmp(current.0, smallest.0) == ord { next_ix = i; } } else { @@ -184,6 +189,7 @@ mod tests { use super::*; + use options::Options; use test_util::TestLdbIter; use types::LdbIterator; use skipmap::tests; @@ -194,7 +200,7 @@ let mut iter = skm.iter(); let mut iter2 = skm.iter(); - let mut miter = MergingIter::new(vec![&mut iter]); + let mut miter = MergingIter::new(Options::default(), vec![&mut iter]); loop { if let Some((k, v)) = miter.next() { @@ -216,7 +222,7 @@ let mut iter = skm.iter(); let mut iter2 = skm.iter(); - let mut miter = MergingIter::new(vec![&mut iter, &mut iter2]); + let mut miter = MergingIter::new(Options::default(), vec![&mut iter, &mut iter2]); loop { if let Some((k, v)) = miter.next() { @@ -238,7 +244,7 @@ let mut iter = skm.iter(); let mut iter2 = skm.iter(); - let mut miter = MergingIter::new(vec![&mut iter, &mut iter2]); + let mut miter = MergingIter::new(Options::default(), vec![&mut iter, &mut iter2]); let first = miter.next(); miter.next(); @@ -261,7 +267,7 @@ let mut it2 = TestLdbIter::new(vec![(b("abb"), val), (b("abd"), val)]); let expected = vec![b("aba"), b("abb"), b("abc"), b("abd"), b("abe")]; - let iter = MergingIter::new(vec![&mut it1, &mut it2]); + let iter = MergingIter::new(Options::default(), vec![&mut it1, &mut it2]); let mut i = 0; for (k, _) in iter { @@ -278,7 +284,7 @@ let mut it1 = TestLdbIter::new(vec![(b("aba"), val), (b("abc"), val), (b("abe"), val)]); let mut it2 = TestLdbIter::new(vec![(b("abb"), val), (b("abd"), val)]); - let mut iter = MergingIter::new(vec![&mut it1, &mut it2]); + let mut iter = MergingIter::new(Options::default(), vec![&mut it1, &mut it2]); assert!(!iter.valid()); iter.next(); @@ -305,7 +311,7 @@ let mut it1 = TestLdbIter::new(vec![(b("aba"), val), (b("abc"), val), (b("abe"), val)]); let mut it2 = TestLdbIter::new(vec![(b("abb"), val), (b("abd"), val)]); - let mut iter = MergingIter::new(vec![&mut it1, &mut it2]); + let mut iter = MergingIter::new(Options::default(), vec![&mut it1, &mut it2]); iter.next(); iter.next();
--- a/src/options.rs Mon Dec 26 11:22:17 2016 +0000 +++ b/src/options.rs Sat Dec 31 15:33:20 2016 +0100 @@ -1,10 +1,9 @@ use block::Block; use cache::Cache; -use types::SequenceNumber; +use types::{Cmp, DefaultCmp, SequenceNumber}; use std::default::Default; -use std::sync::Mutex; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; const KB: usize = 1 << 10; const MB: usize = KB * KB; @@ -31,6 +30,10 @@ /// #[derive(Clone)] pub struct Options { + // NOTE: do NOT set this to something different than DefaultCmp, otherwise some things will + // break (at the moment). Comparators would need extra functionality to fix this (e.g., string + // separator finding) + pub cmp: Arc<Box<Cmp>>, pub create_if_missing: bool, pub error_if_exists: bool, pub paranoid_checks: bool, @@ -47,6 +50,7 @@ impl Default for Options { fn default() -> Options { Options { + cmp: Arc::new(Box::new(DefaultCmp)), create_if_missing: true, error_if_exists: false, paranoid_checks: false,
--- a/src/skipmap.rs Mon Dec 26 11:22:17 2016 +0000 +++ b/src/skipmap.rs Sat Dec 31 15:33:20 2016 +0100 @@ -1,9 +1,12 @@ -use types::{cmp, CmpFn, LdbIterator}; + +use key_types::MemtableKeyCmp; +use options::Options; +use types::LdbIterator; use rand::{Rng, SeedableRng, StdRng}; -use key_types::memtable_key_cmp; use std::cmp::Ordering; use std::mem::{replace, size_of}; +use std::sync::Arc; const MAX_HEIGHT: usize = 12; const BRANCHING_FACTOR: u32 = 4; @@ -27,19 +30,19 @@ len: usize, // approximation of memory used. approx_mem: usize, - cmp: Box<CmpFn>, + opt: Options, } impl SkipMap { - /// Used for testing: Uses the standard comparator. - pub fn new_memtable_map() -> SkipMap { - let mut skm = SkipMap::new(); - skm.cmp = Box::new(memtable_key_cmp); + /// Returns a SkipMap that wraps the comparator from opt inside a MemtableKeyCmp + pub fn new_memtable_map(mut opt: Options) -> SkipMap { + opt.cmp = Arc::new(Box::new(MemtableKeyCmp(opt.cmp.clone()))); + let skm = SkipMap::new(opt); skm } - /// Returns a SkipMap that uses the memtable comparator (see above). - fn new() -> SkipMap { + /// Returns a SkipMap that uses the comparator from opt + pub fn new(opt: Options) -> SkipMap { let mut s = Vec::new(); s.resize(MAX_HEIGHT, None); @@ -53,7 +56,7 @@ rand: StdRng::from_seed(&[0xde, 0xad, 0xbe, 0xef]), len: 0, approx_mem: size_of::<Self>() + MAX_HEIGHT * size_of::<Option<*mut Node>>(), - cmp: Box::new(cmp), + opt: opt, } } @@ -92,7 +95,7 @@ loop { unsafe { if let Some(next) = (*current).skips[level] { - let ord = (self.cmp)((*next).key.as_slice(), key); + let ord = self.opt.cmp.cmp((*next).key.as_slice(), key); match ord { Ordering::Less => { @@ -117,7 +120,7 @@ unsafe { if current.is_null() { return None; - } else if (self.cmp)(&(*current).key, key) == Ordering::Less { + } else if self.opt.cmp.cmp(&(*current).key, key) == Ordering::Less { return None; } else { return Some(&(*current)); @@ -135,7 +138,7 @@ loop { unsafe { if let Some(next) = (*current).skips[level] { - let ord = (self.cmp)((*next).key.as_slice(), key); + let ord = self.opt.cmp.cmp((*next).key.as_slice(), key); match ord { Ordering::Less => { @@ -156,7 +159,7 @@ if current.is_null() || (*current).key.is_empty() { // If we're past the end for some reason or at the head return None; - } else if (self.cmp)(&(*current).key, key) != Ordering::Less { + } else if self.opt.cmp.cmp(&(*current).key, key) != Ordering::Less { return None; } else { return Some(&(*current)); @@ -182,7 +185,7 @@ unsafe { if let Some(next) = (*current).skips[level] { // If the wanted position is after the current node - let ord = (self.cmp)(&(*next).key, &key); + let ord = self.opt.cmp.cmp(&(*next).key, &key); assert!(ord != Ordering::Equal, "No duplicates allowed"); @@ -323,10 +326,11 @@ #[cfg(test)] pub mod tests { use super::*; + use options::Options; use types::*; pub fn make_skipmap() -> SkipMap { - let mut skm = SkipMap::new(); + let mut skm = SkipMap::new(Options::default()); let keys = vec!["aba", "abb", "abc", "abd", "abe", "abf", "abg", "abh", "abi", "abj", "abk", "abl", "abm", "abn", "abo", "abp", "abq", "abr", "abs", "abt", "abu", "abv", "abw", "abx", "aby", "abz"]; @@ -386,7 +390,7 @@ #[test] fn test_iterator_0() { - let skm = SkipMap::new(); + let skm = SkipMap::new(Options::default()); let mut i = 0; for (_, _) in skm.iter() {
--- a/src/table_builder.rs Mon Dec 26 11:22:17 2016 +0000 +++ b/src/table_builder.rs Sat Dec 31 15:33:20 2016 +0100 @@ -2,12 +2,13 @@ use blockhandle::BlockHandle; use filter::{FilterPolicy, NoFilterPolicy}; use filter_block::FilterBlockBuilder; -use key_types::InternalKey; use options::{CompressionType, Options}; -use types::cmp; +use key_types::{InternalKey, InternalKeyCmp}; +use types::Cmp; use std::io::Write; use std::cmp::Ordering; +use std::sync::Arc; use crc::crc32; use crc::Hasher32; @@ -21,7 +22,7 @@ pub const TABLE_BLOCK_COMPRESS_LEN: usize = 1; pub const TABLE_BLOCK_CKSUM_LEN: usize = 4; -fn find_shortest_sep<'a>(lo: InternalKey<'a>, hi: InternalKey<'a>) -> Vec<u8> { +fn find_shortest_sep<'a>(cmp: &Arc<Box<Cmp>>, lo: InternalKey<'a>, hi: InternalKey<'a>) -> Vec<u8> { let min; if lo.len() < hi.len() { @@ -40,9 +41,10 @@ return Vec::from(lo); } else { if lo[diff_at] < 0xff && lo[diff_at] + 1 < hi[diff_at] { - let mut result = Vec::from(&lo[0..diff_at + 1]); + let mut result = lo.to_vec(); result[diff_at] += 1; - assert_eq!(cmp(&result, hi), Ordering::Less); + println!("{:?}", (&result, hi)); + assert_eq!(cmp.cmp(&result, hi), Ordering::Less); return result; } return Vec::from(lo); @@ -124,7 +126,15 @@ /// It's recommended that you use InternalFilterPolicy as FilterPol, as that policy extracts the /// underlying user keys from the InternalKeys used as keys in the table. impl<'a, Dst: Write, FilterPol: FilterPolicy> TableBuilder<'a, Dst, FilterPol> { - pub fn new(opt: Options, dst: Dst, fpol: FilterPol) -> TableBuilder<'a, Dst, FilterPol> { + /// Create a new table builder. + /// The comparator in opt will be wrapped in a InternalKeyCmp. + pub fn new(mut opt: Options, dst: Dst, fpol: FilterPol) -> TableBuilder<'a, Dst, FilterPol> { + opt.cmp = Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone()))); + TableBuilder::new_raw(opt, dst, fpol) + } + + /// Like new(), but doesn't wrap the comparator in an InternalKeyCmp (for testing) + pub fn new_raw(opt: Options, dst: Dst, fpol: FilterPol) -> TableBuilder<'a, Dst, FilterPol> { TableBuilder { opt: opt.clone(), dst: dst, @@ -141,9 +151,13 @@ self.num_entries } + /// Add a key to the table. The key as to be lexically greater or equal to the last one added. pub fn add(&mut self, key: InternalKey<'a>, val: &[u8]) { assert!(self.data_block.is_some()); - assert!(self.num_entries == 0 || cmp(&self.prev_block_last_key, key) == Ordering::Less); + + if !self.prev_block_last_key.is_empty() { + assert!(self.opt.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less); + } if self.data_block.as_ref().unwrap().size_estimate() > self.opt.block_size { self.write_data_block(key); @@ -166,7 +180,7 @@ assert!(self.data_block.is_some()); let block = self.data_block.take().unwrap(); - let sep = find_shortest_sep(&block.last_key(), next_key); + let sep = find_shortest_sep(&self.opt.cmp, &block.last_key(), next_key); self.prev_block_last_key = Vec::from(block.last_key()); let contents = block.finish(); @@ -216,7 +230,16 @@ // If there's a pending data block, write it if self.data_block.as_ref().unwrap().entries() > 0 { - self.write_data_block(&[0xff as u8; 1]); + // Find a key reliably past the last key + // NOTE: This only works if the basic comparator is DefaultCmp. (not a problem as long + // as we don't accept comparators from users) + let mut past_block = + Vec::with_capacity(self.data_block.as_ref().unwrap().last_key().len() + 1); + // Push 255 to the beginning + past_block.extend_from_slice(&[0xff; 1]); + past_block.extend_from_slice(self.data_block.as_ref().unwrap().last_key()); + + self.write_data_block(&past_block); } // Create metaindex block @@ -258,20 +281,24 @@ use blockhandle::BlockHandle; use filter::BloomPolicy; use options::Options; + use types::{DefaultCmp, Cmp}; + + use std::sync::Arc; #[test] fn test_shortest_sep() { - assert_eq!(find_shortest_sep("abcd".as_bytes(), "abcf".as_bytes()), + let cmp = Arc::new(Box::new(DefaultCmp) as Box<Cmp>); + assert_eq!(find_shortest_sep(&cmp, "abcd".as_bytes(), "abcf".as_bytes()), "abce".as_bytes()); - assert_eq!(find_shortest_sep("abcdefghi".as_bytes(), "abcffghi".as_bytes()), - "abce".as_bytes()); - assert_eq!(find_shortest_sep("a".as_bytes(), "a".as_bytes()), + assert_eq!(find_shortest_sep(&cmp, "abcdefghi".as_bytes(), "abcffghi".as_bytes()), + "abceefghi".as_bytes()); + assert_eq!(find_shortest_sep(&cmp, "a".as_bytes(), "a".as_bytes()), "a".as_bytes()); - assert_eq!(find_shortest_sep("a".as_bytes(), "b".as_bytes()), + assert_eq!(find_shortest_sep(&cmp, "a".as_bytes(), "b".as_bytes()), "a".as_bytes()); - assert_eq!(find_shortest_sep("abc".as_bytes(), "zzz".as_bytes()), - "b".as_bytes()); - assert_eq!(find_shortest_sep("".as_bytes(), "".as_bytes()), + assert_eq!(find_shortest_sep(&cmp, "abc".as_bytes(), "zzz".as_bytes()), + "bbc".as_bytes()); + assert_eq!(find_shortest_sep(&cmp, "".as_bytes(), "".as_bytes()), "".as_bytes()); } @@ -294,7 +321,7 @@ let mut d = Vec::with_capacity(512); let mut opt = Options::default(); opt.block_restart_interval = 3; - let mut b = TableBuilder::new(opt, &mut d, BloomPolicy::new(4)); + let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4)); let data = vec![("abc", "def"), ("abd", "dee"), ("bcd", "asa"), ("bsr", "a00")]; @@ -312,7 +339,7 @@ let mut d = Vec::with_capacity(512); let mut opt = Options::default(); opt.block_restart_interval = 3; - let mut b = TableBuilder::new(opt, &mut d, BloomPolicy::new(4)); + let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4)); // Test two equal consecutive keys let data = vec![("abc", "def"), ("abc", "dee"), ("bcd", "asa"), ("bsr", "a00")];
--- a/src/table_reader.rs Mon Dec 26 11:22:17 2016 +0000 +++ b/src/table_reader.rs Sat Dec 31 15:33:20 2016 +0100 @@ -3,13 +3,14 @@ use cache::CacheID; use filter::{InternalFilterPolicy, FilterPolicy}; use filter_block::FilterBlockReader; +use key_types::{InternalKeyCmp, InternalKey}; use options::{self, CompressionType, Options}; use table_builder::{self, Footer}; -use types::{cmp, CmpFn, LdbIterator}; -use key_types::{internal_key_cmp, InternalKey}; +use types::LdbIterator; +use std::cmp::Ordering; use std::io::{self, Read, Seek, SeekFrom, Result}; -use std::cmp::Ordering; +use std::sync::Arc; use integer_encoding::FixedInt; use crc::crc32::{self, Hasher32}; @@ -41,7 +42,10 @@ impl TableBlock { /// Reads a block at location. - fn read_block<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<TableBlock> { + fn read_block<R: Read + Seek>(opt: Options, + f: &mut R, + location: &BlockHandle) + -> Result<TableBlock> { // The block is denoted by offset and length in BlockHandle. A block in an encoded // table is followed by 1B compression type and 4B checksum. let buf = try!(read_bytes(f, location)); @@ -53,7 +57,7 @@ table_builder::TABLE_BLOCK_COMPRESS_LEN, table_builder::TABLE_BLOCK_CKSUM_LEN))); Ok(TableBlock { - block: Block::new(buf), + block: Block::new(opt, buf), checksum: u32::decode_fixed(&cksum), compression: options::int_to_compressiontype(compress[0] as u32) .unwrap_or(CompressionType::CompressionNone), @@ -76,7 +80,6 @@ cache_id: CacheID, opt: Options, - cmp: Box<CmpFn>, footer: Footer, indexblock: Block, @@ -85,11 +88,12 @@ impl<R: Read + Seek, FP: FilterPolicy> Table<R, FP> { /// Creates a new table reader operating on unformatted keys (i.e., UserKey). - fn new_raw(mut file: R, size: usize, fp: FP, opt: Options) -> Result<Table<R, FP>> { + fn new_raw(opt: Options, mut file: R, size: usize, fp: FP) -> Result<Table<R, FP>> { let footer = try!(read_footer(&mut file, size)); - let indexblock = try!(TableBlock::read_block(&mut file, &footer.index)); - let metaindexblock = try!(TableBlock::read_block(&mut file, &footer.meta_index)); + let indexblock = try!(TableBlock::read_block(opt.clone(), &mut file, &footer.index)); + let metaindexblock = + try!(TableBlock::read_block(opt.clone(), &mut file, &footer.meta_index)); if !indexblock.verify() || !metaindexblock.verify() { return Err(io::Error::new(io::ErrorKind::InvalidData, @@ -122,7 +126,6 @@ file_size: size, cache_id: cache_id, opt: opt, - cmp: Box::new(cmp), footer: footer, filters: filter_block_reader, indexblock: indexblock.block, @@ -132,18 +135,18 @@ /// Creates a new table reader operating on internal keys (i.e., InternalKey). This means that /// a different comparator (internal_key_cmp) and a different filter policy /// (InternalFilterPolicy) are used. - pub fn new(file: R, + pub fn new(mut opt: Options, + file: R, size: usize, - fp: FP, - opt: Options) + fp: FP) -> Result<Table<R, InternalFilterPolicy<FP>>> { - let mut t = try!(Table::new_raw(file, size, InternalFilterPolicy::new(fp), opt)); - t.cmp = Box::new(internal_key_cmp); + opt.cmp = Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone()))); + let t = try!(Table::new_raw(opt, file, size, InternalFilterPolicy::new(fp))); Ok(t) } fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock> { - let b = try!(TableBlock::read_block(&mut self.file, location)); + let b = try!(TableBlock::read_block(self.opt.clone(), &mut self.file, location)); if !b.verify() { Err(io::Error::new(io::ErrorKind::InvalidData, "Data block failed verification")) @@ -172,6 +175,7 @@ current_block: self.indexblock.iter(), // just for filling in here current_block_off: 0, index_block: self.indexblock.iter(), + opt: self.opt.clone(), table: self, init: false, }; @@ -221,6 +225,7 @@ /// into the data blocks. pub struct TableIterator<'a, R: 'a + Read + Seek, FP: 'a + FilterPolicy> { table: &'a mut Table<R, FP>, + opt: Options, current_block: BlockIter, current_block_off: usize, index_block: BlockIter, @@ -284,7 +289,7 @@ self.index_block.seek(to); if let Some((past_block, handle)) = self.index_block.current() { - if (self.table.cmp)(to, &past_block) == Ordering::Less { + if self.opt.cmp.cmp(to, &past_block) == Ordering::Less { // ok, found right block: continue if let Ok(()) = self.load_block(&handle) { self.current_block.seek(to); @@ -365,7 +370,7 @@ ("zzz", "111")] } - + // Build a table containing raw keys (no format) fn build_table() -> (Vec<u8>, usize) { let mut d = Vec::with_capacity(512); let mut opt = Options::default(); @@ -373,7 +378,8 @@ opt.block_size = 32; { - let mut b = TableBuilder::new(opt, &mut d, BloomPolicy::new(4)); + // Uses the standard comparator in opt. + let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4)); let data = build_data(); for &(k, v) in data.iter() { @@ -389,8 +395,8 @@ (d, size) } + // Build a table containing keys in InternalKey format. fn build_internal_table() -> (Vec<u8>, usize) { - let mut d = Vec::with_capacity(512); let mut opt = Options::default(); opt.block_restart_interval = 2; @@ -406,6 +412,7 @@ .collect(); { + // Uses InternalKeyCmp let mut b = TableBuilder::new(opt, &mut d, InternalFilterPolicy::new(BloomPolicy::new(4))); @@ -429,10 +436,10 @@ src[45] = 0; - let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), size, - BloomPolicy::new(4), - Options::default()) + BloomPolicy::new(4)) .unwrap(); assert!(table.filters.is_some()); @@ -463,10 +470,10 @@ let (src, size) = build_table(); let data = build_data(); - let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), size, - BloomPolicy::new(4), - Options::default()) + BloomPolicy::new(4)) .unwrap(); let iter = table.iter(); let mut i = 0; @@ -484,10 +491,10 @@ fn test_table_iterator_filter() { let (src, size) = build_table(); - let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), size, - BloomPolicy::new(4), - Options::default()) + BloomPolicy::new(4)) .unwrap(); let filter_reader = table.filters.clone().unwrap(); let mut iter = table.iter(); @@ -507,10 +514,10 @@ fn test_table_iterator_state_behavior() { let (src, size) = build_table(); - let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), size, - BloomPolicy::new(4), - Options::default()) + BloomPolicy::new(4)) .unwrap(); let mut iter = table.iter(); @@ -540,10 +547,10 @@ let (src, size) = build_table(); let data = build_data(); - let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), size, - BloomPolicy::new(4), - Options::default()) + BloomPolicy::new(4)) .unwrap(); let mut iter = table.iter(); let mut i = 0; @@ -575,10 +582,10 @@ fn test_table_iterator_seek() { let (src, size) = build_table(); - let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), size, - BloomPolicy::new(4), - Options::default()) + BloomPolicy::new(4)) .unwrap(); let mut iter = table.iter(); @@ -596,10 +603,10 @@ fn test_table_get() { let (src, size) = build_table(); - let mut table = Table::new_raw(Cursor::new(&src as &[u8]), + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), size, - BloomPolicy::new(4), - Options::default()) + BloomPolicy::new(4)) .unwrap(); assert!(table.get("aaa".as_bytes()).is_none()); @@ -621,10 +628,10 @@ let (src, size) = build_internal_table(); - let mut table = Table::new(Cursor::new(&src as &[u8]), + let mut table = Table::new(Options::default(), + Cursor::new(&src as &[u8]), size, - BloomPolicy::new(4), - Options::default()) + BloomPolicy::new(4)) .unwrap(); let filter_reader = table.filters.clone().unwrap();
--- a/src/test_util.rs Mon Dec 26 11:22:17 2016 +0000 +++ b/src/test_util.rs Sat Dec 31 15:33:20 2016 +0100 @@ -1,4 +1,4 @@ -use types::{cmp, LdbIterator}; +use types::{Cmp, DefaultCmp, LdbIterator}; use std::cmp::Ordering; pub struct TestLdbIter<'a> { @@ -50,7 +50,7 @@ } fn seek(&mut self, k: &[u8]) { self.ix = 0; - while self.ix < self.v.len() && cmp(self.v[self.ix].0, k) == Ordering::Less { + while self.ix < self.v.len() && DefaultCmp.cmp(self.v[self.ix].0, k) == Ordering::Less { self.ix += 1; } }
--- a/src/types.rs Mon Dec 26 11:22:17 2016 +0000 +++ b/src/types.rs Sat Dec 31 15:33:20 2016 +0100 @@ -21,10 +21,20 @@ IOError(String), } -pub type CmpFn = Fn(&[u8], &[u8]) -> Ordering; +/// Comparator trait, supporting types that can be nested (i.e., add additional functionality on +/// top of an inner comparator) +pub trait Cmp { + fn cmp(&self, &[u8], &[u8]) -> Ordering; +} -pub fn cmp(a: &[u8], b: &[u8]) -> Ordering { - a.cmp(b) +/// Lexical comparator. +#[derive(Clone)] +pub struct DefaultCmp; + +impl Cmp for DefaultCmp { + fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering { + a.cmp(b) + } } pub struct Range<'a> {