Mercurial > lbo > hg > leveldb-rs
changeset 125:7c13a7d4f795
Move comparators to dedicated module and add better tests
This also allows for other "root" comparators than DefaultComparator
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 31 Dec 2016 17:41:25 +0100 |
parents | 778b1fa2063e |
children | 66c13972d977 |
files | src/cmp.rs src/key_types.rs src/lib.rs src/memtable.rs src/merging_iter.rs src/options.rs src/skipmap.rs src/table_builder.rs src/table_reader.rs src/test_util.rs src/types.rs |
diffstat | 11 files changed, 267 insertions(+), 145 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/cmp.rs Sat Dec 31 17:41:25 2016 +0100 @@ -0,0 +1,233 @@ +use key_types::{self, LookupKey}; +use types; + +use std::cmp::Ordering; +use std::sync::Arc; + +/// Comparator trait, supporting types that can be nested (i.e., add additional functionality on +/// top of an inner comparator) +pub trait Cmp { + fn cmp(&self, &[u8], &[u8]) -> Ordering; + fn find_shortest_sep(&self, &[u8], &[u8]) -> Vec<u8>; + fn find_short_succ(&self, &[u8]) -> Vec<u8>; +} + +/// Lexical comparator. +#[derive(Clone)] +pub struct DefaultCmp; + +impl Cmp for DefaultCmp { + fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering { + a.cmp(b) + } + + fn find_shortest_sep(&self, a: &[u8], b: &[u8]) -> Vec<u8> { + if a == b { + return a.to_vec(); + } + + let min = if a.len() < b.len() { a.len() } else { b.len() }; + let mut diff_at = 0; + + while diff_at < min && a[diff_at] == b[diff_at] { + diff_at += 1; + } + + while diff_at < min { + let diff = a[diff_at]; + if diff < 0xff && diff + 1 < b[diff_at] { + let mut sep = Vec::from(&a[0..diff_at + 1]); + sep[diff_at] += 1; + assert!(self.cmp(&sep, b) == Ordering::Less); + return sep; + } + + diff_at += 1; + } + // extend one slice by a 0 byte + // e.g. abc/abd => abc abc\0 abd + let mut r = Vec::from(a); + r.extend_from_slice(&[0x00]); + return r; + } + + fn find_short_succ(&self, a: &[u8]) -> Vec<u8> { + let mut result = a.to_vec(); + for i in 0..a.len() { + if a[i] != 0xff { + result[i] += 1; + result.resize(i + 1, 0); + return result; + } + } + // Rare path + result.push(255); + return result; + } +} + +/// Same as memtable_key_cmp, but for InternalKeys. +#[derive(Clone)] +pub struct InternalKeyCmp(pub Arc<Box<Cmp>>); + +impl Cmp for InternalKeyCmp { + fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering { + let (_, seqa, keya) = key_types::parse_internal_key(a); + let (_, seqb, keyb) = key_types::parse_internal_key(b); + + match self.0.cmp(keya, keyb) { + Ordering::Less => Ordering::Less, + Ordering::Greater => Ordering::Greater, + // reverse comparison! + Ordering::Equal => seqb.cmp(&seqa), + } + } + + fn find_shortest_sep(&self, a: &[u8], b: &[u8]) -> Vec<u8> { + let (_, seqa, keya) = key_types::parse_internal_key(a); + let (_, _, keyb) = key_types::parse_internal_key(b); + + let sep: Vec<u8> = self.0.find_shortest_sep(keya, keyb); + + if sep.len() < keya.len() && self.0.cmp(keya, &sep) == Ordering::Less { + return LookupKey::new(&sep, types::MAX_SEQUENCE_NUMBER).internal_key().to_vec(); + } + + return LookupKey::new(&sep, seqa).internal_key().to_vec(); + } + + fn find_short_succ(&self, a: &[u8]) -> Vec<u8> { + let (_, seq, key) = key_types::parse_internal_key(a); + let succ: Vec<u8> = self.0.find_short_succ(key); + return LookupKey::new(&succ, seq).internal_key().to_vec(); + } +} + +/// An internal comparator wrapping a user-supplied comparator. This comparator is used to compare +/// memtable keys, which contain length prefixes and a sequence number. +/// The ordering is determined by asking the wrapped comparator; ties are broken by *reverse* +/// ordering the sequence numbers. (This means that when having an entry abx/4 and searching for +/// abx/5, then abx/4 is counted as "greater-or-equal", making snapshot functionality work at all) +#[derive(Clone)] +pub struct MemtableKeyCmp(pub Arc<Box<Cmp>>); + +impl Cmp for MemtableKeyCmp { + fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering { + let (akeylen, akeyoff, atag, _, _) = key_types::parse_memtable_key(a); + let (bkeylen, bkeyoff, btag, _, _) = key_types::parse_memtable_key(b); + + let userkey_a = &a[akeyoff..akeyoff + akeylen]; + let userkey_b = &b[bkeyoff..bkeyoff + bkeylen]; + + match self.0.cmp(userkey_a, userkey_b) { + Ordering::Less => Ordering::Less, + Ordering::Greater => Ordering::Greater, + Ordering::Equal => { + let (_, aseq) = key_types::parse_tag(atag); + let (_, bseq) = key_types::parse_tag(btag); + + // reverse! + bseq.cmp(&aseq) + } + } + } + + // The following two impls should not be used (by principle) although they should be correct. + // They will crash the program. + fn find_shortest_sep(&self, _: &[u8], _: &[u8]) -> Vec<u8> { + panic!("find* functions are invalid on MemtableKeyCmp"); + + // let (akeylen, akeyoff, atag, _, _) = key_types::parse_memtable_key(a); + // let (bkeylen, bkeyoff, _, _, _) = key_types::parse_memtable_key(a); + // let (atyp, aseq) = key_types::parse_tag(atag); + // + // let sep: Vec<u8> = self.0.find_shortest_sep(&a[akeyoff..akeyoff + akeylen], + // &b[bkeyoff..bkeyoff + bkeylen]); + // + // if sep.len() < akeylen && + // self.0.cmp(&a[akeyoff..akeyoff + akeylen], &sep) == Ordering::Less { + // return key_types::build_memtable_key(&sep, &[0; 0], atyp, types::MAX_SEQUENCE_NUMBER); + // } + // return key_types::build_memtable_key(&sep, &[0; 0], atyp, aseq); + // + } + + fn find_short_succ(&self, _: &[u8]) -> Vec<u8> { + panic!("find* functions are invalid on MemtableKeyCmp"); + + // let (keylen, keyoff, tag, _, _) = key_types::parse_memtable_key(a); + // let (typ, seq) = key_types::parse_tag(tag); + // + // let succ: Vec<u8> = self.0.find_short_succ(&a[keyoff..keyoff + keylen]); + // return key_types::build_memtable_key(&succ, &[0; 0], typ, seq); + // + } +} + +#[cfg(test)] +mod tests { + use super::*; + use key_types::LookupKey; + use types; + + use std::sync::Arc; + + #[test] + fn test_cmp_defaultcmp_shortest_sep() { + assert_eq!(DefaultCmp.find_shortest_sep("abcd".as_bytes(), "abcf".as_bytes()), + "abce".as_bytes()); + assert_eq!(DefaultCmp.find_shortest_sep("abc".as_bytes(), "acd".as_bytes()), + "abc\0".as_bytes()); + assert_eq!(DefaultCmp.find_shortest_sep("abcdefghi".as_bytes(), "abcffghi".as_bytes()), + "abce".as_bytes()); + assert_eq!(DefaultCmp.find_shortest_sep("a".as_bytes(), "a".as_bytes()), + "a".as_bytes()); + assert_eq!(DefaultCmp.find_shortest_sep("a".as_bytes(), "b".as_bytes()), + "a\0".as_bytes()); + assert_eq!(DefaultCmp.find_shortest_sep("abc".as_bytes(), "zzz".as_bytes()), + "b".as_bytes()); + assert_eq!(DefaultCmp.find_shortest_sep("".as_bytes(), "".as_bytes()), + "".as_bytes()); + } + + #[test] + fn test_cmp_defaultcmp_short_succ() { + assert_eq!(DefaultCmp.find_short_succ("abcd".as_bytes()), + "b".as_bytes()); + assert_eq!(DefaultCmp.find_short_succ("zzzz".as_bytes()), + "{".as_bytes()); + assert_eq!(DefaultCmp.find_short_succ(&[]), &[0xff]); + assert_eq!(DefaultCmp.find_short_succ(&[0xff, 0xff, 0xff]), + &[0xff, 0xff, 0xff, 0xff]); + } + + #[test] + fn test_cmp_internalkeycmp_shortest_sep() { + let cmp = InternalKeyCmp(Arc::new(Box::new(DefaultCmp))); + assert_eq!(cmp.find_shortest_sep(LookupKey::new("abcd".as_bytes(), 1).internal_key(), + LookupKey::new("abcf".as_bytes(), 2).internal_key()), + LookupKey::new("abce".as_bytes(), 1).internal_key()); + assert_eq!(cmp.find_shortest_sep(LookupKey::new("abc".as_bytes(), 1).internal_key(), + LookupKey::new("zzz".as_bytes(), 2).internal_key()), + LookupKey::new("b".as_bytes(), types::MAX_SEQUENCE_NUMBER).internal_key()); + assert_eq!(cmp.find_shortest_sep(LookupKey::new("abc".as_bytes(), 1).internal_key(), + LookupKey::new("acd".as_bytes(), 2).internal_key()), + LookupKey::new("abc\0".as_bytes(), 1).internal_key()); + assert_eq!(cmp.find_shortest_sep(LookupKey::new("abc".as_bytes(), 1).internal_key(), + LookupKey::new("abe".as_bytes(), 2).internal_key()), + LookupKey::new("abd".as_bytes(), 1).internal_key()); + assert_eq!(cmp.find_shortest_sep(LookupKey::new("".as_bytes(), 1).internal_key(), + LookupKey::new("".as_bytes(), 2).internal_key()), + LookupKey::new("".as_bytes(), 1).internal_key()); + assert_eq!(cmp.find_shortest_sep(LookupKey::new("abc".as_bytes(), 2).internal_key(), + LookupKey::new("abc".as_bytes(), 2).internal_key()), + LookupKey::new("abc".as_bytes(), 2).internal_key()); + } + + #[test] + #[should_panic] + fn test_cmp_memtablekeycmp_panics() { + let cmp = MemtableKeyCmp(Arc::new(Box::new(DefaultCmp))); + cmp.cmp(&[1, 2, 3], &[4, 5, 6]); + } +}
--- a/src/key_types.rs Sat Dec 31 15:39:59 2016 +0100 +++ b/src/key_types.rs Sat Dec 31 17:41:25 2016 +0100 @@ -1,8 +1,5 @@ use options::{CompressionType, int_to_compressiontype}; -use types::{ValueType, SequenceNumber, Cmp}; - -use std::cmp::Ordering; -use std::sync::Arc; +use types::{ValueType, SequenceNumber}; use integer_encoding::{FixedInt, VarInt}; @@ -78,10 +75,15 @@ } /// Parses a tag into (type, sequence number) -pub fn parse_tag(tag: u64) -> (u8, u64) { +pub fn parse_tag(tag: u64) -> (ValueType, u64) { let seq = tag >> 8; let typ = tag & 0xff; - (typ as u8, seq) + + match typ { + 0 => (ValueType::TypeDeletion, seq), + 1 => (ValueType::TypeValue, seq), + _ => (ValueType::TypeValue, seq), + } } /// A memtable key is a bytestring containing (keylen, key, tag, vallen, val). This function @@ -158,54 +160,6 @@ return (ctype, seq, &ikey[0..ikey.len() - 8]); } -/// Same as memtable_key_cmp, but for InternalKeys. -#[derive(Clone)] -pub struct InternalKeyCmp(pub Arc<Box<Cmp>>); - -impl Cmp for InternalKeyCmp { - fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering { - let (_, seqa, keya) = parse_internal_key(a); - let (_, seqb, keyb) = parse_internal_key(b); - - match self.0.cmp(keya, keyb) { - Ordering::Less => Ordering::Less, - Ordering::Greater => Ordering::Greater, - // reverse comparison! - Ordering::Equal => seqb.cmp(&seqa), - } - } -} - -/// An internal comparator wrapping a user-supplied comparator. This comparator is used to compare -/// memtable keys, which contain length prefixes and a sequence number. -/// The ordering is determined by asking the wrapped comparator; ties are broken by *reverse* -/// ordering the sequence numbers. (This means that when having an entry abx/4 and searching for -/// abx/5, then abx/4 is counted as "greater-or-equal", making snapshot functionality work at all) -#[derive(Clone)] -pub struct MemtableKeyCmp(pub Arc<Box<Cmp>>); - -impl Cmp for MemtableKeyCmp { - fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering { - let (akeylen, akeyoff, atag, _, _) = parse_memtable_key(a); - let (bkeylen, bkeyoff, btag, _, _) = parse_memtable_key(b); - - let userkey_a = &a[akeyoff..akeyoff + akeylen]; - let userkey_b = &b[bkeyoff..bkeyoff + bkeylen]; - - match self.0.cmp(userkey_a, userkey_b) { - Ordering::Less => Ordering::Less, - Ordering::Greater => Ordering::Greater, - Ordering::Equal => { - let (_, aseq) = parse_tag(atag); - let (_, bseq) = parse_tag(btag); - - // reverse! - bseq.cmp(&aseq) - } - } - } -} - #[cfg(test)] mod tests { use super::*;
--- a/src/lib.rs Sat Dec 31 15:39:59 2016 +0100 +++ b/src/lib.rs Sat Dec 31 17:41:25 2016 +0100 @@ -8,6 +8,7 @@ mod block; mod blockhandle; mod cache; +mod cmp; mod disk_env; mod env; mod filter;
--- a/src/memtable.rs Sat Dec 31 15:39:59 2016 +0100 +++ b/src/memtable.rs Sat Dec 31 17:41:25 2016 +0100 @@ -1,5 +1,6 @@ -use key_types::{LookupKey, UserKey, InternalKey, MemtableKey, MemtableKeyCmp, parse_memtable_key, - build_memtable_key}; +use key_types::{LookupKey, UserKey, InternalKey, MemtableKey}; +use cmp::MemtableKeyCmp; +use key_types::{parse_memtable_key, build_memtable_key}; use types::{ValueType, SequenceNumber, Status, LdbIterator}; use skipmap::{SkipMap, SkipMapIter}; use options::Options; @@ -49,8 +50,7 @@ let (fkeylen, fkeyoff, tag, vallen, valoff) = parse_memtable_key(foundkey); // Compare user key -- if equal, proceed - println!("{:?}", (key, foundkey)); - // equality doesn't need custom comparator + // We only care about user key equality here if key.user_key() == &foundkey[fkeyoff..fkeyoff + fkeylen] { if tag & 0xff == ValueType::TypeValue as u64 { return Result::Ok(foundkey[valoff..valoff + vallen].to_vec()); @@ -166,8 +166,8 @@ #[test] fn test_memtable_parse_tag() { - let tag = (12345 << 8) | 67; - assert_eq!(parse_tag(tag), (67, 12345)); + let tag = (12345 << 8) | 1; + assert_eq!(parse_tag(tag), (ValueType::TypeValue, 12345)); } #[test]
--- a/src/merging_iter.rs Sat Dec 31 15:39:59 2016 +0100 +++ b/src/merging_iter.rs Sat Dec 31 17:41:25 2016 +0100 @@ -1,5 +1,6 @@ +use cmp::Cmp; use options::Options; -use types::{Cmp, LdbIterator}; +use types::LdbIterator; use std::cmp::Ordering; use std::sync::Arc;
--- a/src/options.rs Sat Dec 31 15:39:59 2016 +0100 +++ b/src/options.rs Sat Dec 31 17:41:25 2016 +0100 @@ -1,6 +1,7 @@ use block::Block; use cache::Cache; -use types::{Cmp, DefaultCmp, SequenceNumber}; +use cmp::{Cmp, DefaultCmp}; +use types::SequenceNumber; use std::default::Default; use std::sync::{Arc, Mutex};
--- a/src/skipmap.rs Sat Dec 31 15:39:59 2016 +0100 +++ b/src/skipmap.rs Sat Dec 31 17:41:25 2016 +0100 @@ -1,5 +1,5 @@ -use key_types::MemtableKeyCmp; +use cmp::MemtableKeyCmp; use options::Options; use types::LdbIterator; use rand::{Rng, SeedableRng, StdRng};
--- a/src/table_builder.rs Sat Dec 31 15:39:59 2016 +0100 +++ b/src/table_builder.rs Sat Dec 31 17:41:25 2016 +0100 @@ -1,10 +1,10 @@ use block::{BlockBuilder, BlockContents}; use blockhandle::BlockHandle; +use cmp::InternalKeyCmp; use filter::{FilterPolicy, NoFilterPolicy}; use filter_block::FilterBlockBuilder; +use key_types::InternalKey; use options::{CompressionType, Options}; -use key_types::{InternalKey, InternalKeyCmp}; -use types::Cmp; use std::io::Write; use std::cmp::Ordering; @@ -22,35 +22,6 @@ pub const TABLE_BLOCK_COMPRESS_LEN: usize = 1; pub const TABLE_BLOCK_CKSUM_LEN: usize = 4; -fn find_shortest_sep<'a>(cmp: &Arc<Box<Cmp>>, lo: InternalKey<'a>, hi: InternalKey<'a>) -> Vec<u8> { - let min; - - if lo.len() < hi.len() { - min = lo.len(); - } else { - min = hi.len(); - } - - let mut diff_at = 0; - - while diff_at < min && lo[diff_at] == hi[diff_at] { - diff_at += 1; - } - - if diff_at == min { - return Vec::from(lo); - } else { - if lo[diff_at] < 0xff && lo[diff_at] + 1 < hi[diff_at] { - let mut result = lo.to_vec(); - result[diff_at] += 1; - println!("{:?}", (&result, hi)); - assert_eq!(cmp.cmp(&result, hi), Ordering::Less); - return result; - } - return Vec::from(lo); - } -} - /// Footer is a helper for encoding/decoding a table footer. #[derive(Debug)] pub struct Footer { @@ -180,7 +151,7 @@ assert!(self.data_block.is_some()); let block = self.data_block.take().unwrap(); - let sep = find_shortest_sep(&self.opt.cmp, &block.last_key(), next_key); + let sep = self.opt.cmp.find_shortest_sep(&block.last_key(), next_key); self.prev_block_last_key = Vec::from(block.last_key()); let contents = block.finish(); @@ -231,15 +202,9 @@ // If there's a pending data block, write it if self.data_block.as_ref().unwrap().entries() > 0 { // Find a key reliably past the last key - // NOTE: This only works if the basic comparator is DefaultCmp. (not a problem as long - // as we don't accept comparators from users) - let mut past_block = - Vec::with_capacity(self.data_block.as_ref().unwrap().last_key().len() + 1); - // Push 255 to the beginning - past_block.extend_from_slice(&[0xff; 1]); - past_block.extend_from_slice(self.data_block.as_ref().unwrap().last_key()); - - self.write_data_block(&past_block); + let key_past_last = + self.opt.cmp.find_short_succ(self.data_block.as_ref().unwrap().last_key()); + self.write_data_block(&key_past_last); } // Create metaindex block @@ -277,30 +242,10 @@ #[cfg(test)] mod tests { - use super::{find_shortest_sep, Footer, TableBuilder}; + use super::{Footer, TableBuilder}; use blockhandle::BlockHandle; use filter::BloomPolicy; use options::Options; - use types::{DefaultCmp, Cmp}; - - use std::sync::Arc; - - #[test] - fn test_shortest_sep() { - let cmp = Arc::new(Box::new(DefaultCmp) as Box<Cmp>); - assert_eq!(find_shortest_sep(&cmp, "abcd".as_bytes(), "abcf".as_bytes()), - "abce".as_bytes()); - assert_eq!(find_shortest_sep(&cmp, "abcdefghi".as_bytes(), "abcffghi".as_bytes()), - "abceefghi".as_bytes()); - assert_eq!(find_shortest_sep(&cmp, "a".as_bytes(), "a".as_bytes()), - "a".as_bytes()); - assert_eq!(find_shortest_sep(&cmp, "a".as_bytes(), "b".as_bytes()), - "a".as_bytes()); - assert_eq!(find_shortest_sep(&cmp, "abc".as_bytes(), "zzz".as_bytes()), - "bbc".as_bytes()); - assert_eq!(find_shortest_sep(&cmp, "".as_bytes(), "".as_bytes()), - "".as_bytes()); - } #[test] fn test_footer() {
--- a/src/table_reader.rs Sat Dec 31 15:39:59 2016 +0100 +++ b/src/table_reader.rs Sat Dec 31 17:41:25 2016 +0100 @@ -3,7 +3,8 @@ use cache::CacheID; use filter::{InternalFilterPolicy, FilterPolicy}; use filter_block::FilterBlockReader; -use key_types::{InternalKeyCmp, InternalKey}; +use key_types::InternalKey; +use cmp::InternalKeyCmp; use options::{self, CompressionType, Options}; use table_builder::{self, Footer}; use types::LdbIterator;
--- a/src/test_util.rs Sat Dec 31 15:39:59 2016 +0100 +++ b/src/test_util.rs Sat Dec 31 17:41:25 2016 +0100 @@ -1,4 +1,5 @@ -use types::{Cmp, DefaultCmp, LdbIterator}; +use types::LdbIterator; +use cmp::{Cmp, DefaultCmp}; use std::cmp::Ordering; pub struct TestLdbIter<'a> {
--- a/src/types.rs Sat Dec 31 15:39:59 2016 +0100 +++ b/src/types.rs Sat Dec 31 17:41:25 2016 +0100 @@ -1,7 +1,6 @@ //! A collection of fundamental and/or simple types used by other modules -use std::cmp::Ordering; - +#[derive(Debug, PartialOrd, PartialEq)] pub enum ValueType { TypeDeletion = 0, TypeValue = 1, @@ -10,6 +9,8 @@ /// Represents a sequence number of a single entry. pub type SequenceNumber = u64; +pub const MAX_SEQUENCE_NUMBER: SequenceNumber = (1 << 56) - 1; + #[derive(Clone, Debug)] #[allow(dead_code)] pub enum Status { @@ -21,22 +22,6 @@ IOError(String), } -/// Comparator trait, supporting types that can be nested (i.e., add additional functionality on -/// top of an inner comparator) -pub trait Cmp { - fn cmp(&self, &[u8], &[u8]) -> Ordering; -} - -/// Lexical comparator. -#[derive(Clone)] -pub struct DefaultCmp; - -impl Cmp for DefaultCmp { - fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering { - a.cmp(b) - } -} - /// Denotes a key range pub struct Range<'a> { pub start: &'a [u8],