changeset 118:c2539cd2d021

Write proper comparators for key formats and use them in filters/table
author Lewin Bormann <lbo@spheniscida.de>
date Mon, 26 Dec 2016 09:23:16 +0000
parents 6147b3a3eeea
children b5cdae35b1a4
files src/filter.rs src/filter_block.rs src/key_types.rs src/memtable.rs src/options.rs src/skipmap.rs src/table_reader.rs src/types.rs
diffstat 8 files changed, 243 insertions(+), 94 deletions(-) [+]
line wrap: on
line diff
--- a/src/filter.rs	Sun Dec 25 10:47:08 2016 +0000
+++ b/src/filter.rs	Mon Dec 26 09:23:16 2016 +0000
@@ -162,6 +162,12 @@
     internal: FP,
 }
 
+impl<FP: FilterPolicy> InternalFilterPolicy<FP> {
+    pub fn new(inner: FP) -> InternalFilterPolicy<FP> {
+        InternalFilterPolicy { internal: inner }
+    }
+}
+
 impl<FP: FilterPolicy> FilterPolicy for InternalFilterPolicy<FP> {
     fn name(&self) -> &'static str {
         self.internal.name()
@@ -187,6 +193,7 @@
 #[cfg(test)]
 mod tests {
     use super::*;
+    use key_types::LookupKey;
 
     const _BITS_PER_KEY: u32 = 12;
 
@@ -197,6 +204,7 @@
                         "908070605040302010".as_bytes()];
         data
     }
+
     fn create_filter() -> Vec<u8> {
         let fpol = BloomPolicy::new(_BITS_PER_KEY);
         let filter = fpol.create_filter(&input_data());
@@ -205,6 +213,20 @@
         filter
     }
 
+
+    fn create_internalkey_filter() -> Vec<u8> {
+        let fpol = InternalFilterPolicy::new(BloomPolicy::new(_BITS_PER_KEY));
+        let input: Vec<Vec<u8>> = input_data()
+            .into_iter()
+            .map(|k| LookupKey::new(k, 123).internal_key().to_vec())
+            .collect();
+        let input_ = input.iter().map(|k| k.as_slice()).collect();
+
+        let filter = fpol.create_filter(&input_);
+
+        filter
+    }
+
     #[test]
     fn test_filter() {
         let f = create_filter();
@@ -215,6 +237,12 @@
         }
     }
 
+    // This test verifies that InternalFilterPolicy works correctly.
+    #[test]
+    fn test_filter_internal_keys_identical() {
+        assert_eq!(create_filter(), create_internalkey_filter());
+    }
+
     #[test]
     fn hash_test() {
         let d1 = vec![0x62];
--- a/src/filter_block.rs	Sun Dec 25 10:47:08 2016 +0000
+++ b/src/filter_block.rs	Mon Dec 26 09:23:16 2016 +0000
@@ -144,6 +144,8 @@
             return true;
         }
 
+        println!("{:?}", key);
+
         let filter_begin = self.offset_of(get_filter_index(blk_offset, self.filter_base_lg2));
         let filter_end = self.offset_of(get_filter_index(blk_offset, self.filter_base_lg2) + 1);
 
--- a/src/key_types.rs	Sun Dec 25 10:47:08 2016 +0000
+++ b/src/key_types.rs	Mon Dec 26 09:23:16 2016 +0000
@@ -1,11 +1,16 @@
+use options::{CompressionType, int_to_compressiontype};
+use types::{ValueType, SequenceNumber, cmp};
 
-use types::{ValueType, SequenceNumber};
+use std::cmp::Ordering;
 
 use integer_encoding::{FixedInt, VarInt};
 
 // The following typedefs are used to distinguish between the different key formats used internally
 // by different modules.
 
+// TODO: At some point, convert those into actual types with conversions between them. That's a lot
+// of boilerplate, but increases type safety.
+
 /// A MemtableKey consists of the following elements: [keylen, key, tag, (vallen, value)] where
 /// keylen is a varint32 encoding the length of key+tag. tag is a fixed 8 bytes segment encoding
 /// the entry type and the sequence number. vallen and value are optional components at the end.
@@ -141,6 +146,52 @@
     }
 }
 
+pub fn parse_internal_key<'a>(ikey: InternalKey<'a>) -> (CompressionType, u64, UserKey<'a>) {
+    assert!(ikey.len() >= 8);
+
+    let (ctype, seq) = parse_tag(FixedInt::decode_fixed(&ikey[ikey.len() - 8..]));
+    let ctype = int_to_compressiontype(ctype as u32).unwrap_or(CompressionType::CompressionNone);
+
+    return (ctype, seq, &ikey[0..ikey.len() - 8]);
+}
+
+/// An internal comparator wrapping a user-supplied comparator. This comparator is used to compare
+/// memtable keys, which contain length prefixes and a sequence number.
+/// The ordering is determined by asking the wrapped comparator; ties are broken by *reverse*
+/// ordering the sequence numbers. (This means that when having an entry abx/4 and searching for
+/// abx/5, then abx/4 is counted as "greater-or-equal", making snapshot functionality work at all)
+pub fn memtable_key_cmp(a: &[u8], b: &[u8]) -> Ordering {
+    let (akeylen, akeyoff, atag, _, _) = parse_memtable_key(a);
+    let (bkeylen, bkeyoff, btag, _, _) = parse_memtable_key(b);
+
+    let userkey_a = &a[akeyoff..akeyoff + akeylen];
+    let userkey_b = &b[bkeyoff..bkeyoff + bkeylen];
+
+    match cmp(userkey_a, userkey_b) {
+        Ordering::Less => Ordering::Less,
+        Ordering::Greater => Ordering::Greater,
+        Ordering::Equal => {
+            let (_, aseq) = parse_tag(atag);
+            let (_, bseq) = parse_tag(btag);
+
+            // reverse!
+            bseq.cmp(&aseq)
+        }
+    }
+}
+
+/// Same as memtable_key_cmp, but for InternalKeys.
+pub fn internal_key_cmp(a: &[u8], b: &[u8]) -> Ordering {
+    let (_, seqa, keya) = parse_internal_key(a);
+    let (_, seqb, keyb) = parse_internal_key(b);
+
+    match cmp(keya, keyb) {
+        Ordering::Less => Ordering::Less,
+        Ordering::Greater => Ordering::Greater,
+        Ordering::Equal => seqb.cmp(&seqa),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
--- a/src/memtable.rs	Sun Dec 25 10:47:08 2016 +0000
+++ b/src/memtable.rs	Mon Dec 26 09:23:16 2016 +0000
@@ -11,7 +11,7 @@
 
 impl MemTable {
     pub fn new() -> MemTable {
-        MemTable { map: SkipMap::new() }
+        MemTable { map: SkipMap::new_memtable_map() }
     }
     pub fn approx_mem_usage(&self) -> usize {
         self.map.approx_memory()
--- a/src/options.rs	Sun Dec 25 10:47:08 2016 +0000
+++ b/src/options.rs	Mon Dec 26 09:23:16 2016 +0000
@@ -26,6 +26,7 @@
     // pub logger: Logger,
     pub write_buffer_size: usize,
     pub max_open_files: usize,
+    // pub block_cache: Cache,
     pub block_size: usize,
     pub block_restart_interval: usize,
     pub compression_type: CompressionType,
--- a/src/skipmap.rs	Sun Dec 25 10:47:08 2016 +0000
+++ b/src/skipmap.rs	Mon Dec 26 09:23:16 2016 +0000
@@ -1,6 +1,6 @@
-use types::{cmp, LdbIterator};
+use types::{cmp, CmpFn, LdbIterator};
 use rand::{Rng, SeedableRng, StdRng};
-use key_types::{parse_tag, parse_memtable_key};
+use key_types::memtable_key_cmp;
 
 use std::cmp::Ordering;
 use std::mem::{replace, size_of};
@@ -8,33 +8,6 @@
 const MAX_HEIGHT: usize = 12;
 const BRANCHING_FACTOR: u32 = 4;
 
-/// An internal comparator wrapping a user-supplied comparator. This comparator is used to compare
-/// memtable keys, which contain length prefixes and a sequence number.
-/// The ordering is determined by asking the wrapped comparator; ties are broken by *reverse*
-/// ordering the sequence numbers. (This means that when having an entry abx/4 and searching for
-/// abx/5, then abx/4 is counted as "greater-or-equal", making snapshot functionality work at all)
-fn memtable_key_cmp(a: &[u8], b: &[u8]) -> Ordering {
-    let (akeylen, akeyoff, atag, _, _) = parse_memtable_key(a);
-    let (bkeylen, bkeyoff, btag, _, _) = parse_memtable_key(b);
-
-    let userkey_a = &a[akeyoff..akeyoff + akeylen];
-    let userkey_b = &b[bkeyoff..bkeyoff + bkeylen];
-
-    let userkey_order = cmp(userkey_a, userkey_b);
-    println!("{:?}", userkey_order);
-
-    if userkey_order != Ordering::Equal {
-        userkey_order
-    } else {
-        // look at sequence number, in reverse order
-        let (_, aseq) = parse_tag(atag);
-        let (_, bseq) = parse_tag(btag);
-
-        // reverse!
-        bseq.cmp(&aseq)
-    }
-}
-
 /// A node in a skipmap contains links to the next node and others that are further away (skips);
 /// `skips[0]` is the immediate element after, that is, the element contained in `next`.
 struct Node {
@@ -54,17 +27,19 @@
     len: usize,
     // approximation of memory used.
     approx_mem: usize,
-    cmp: Box<Fn(&[u8], &[u8]) -> Ordering>,
+    cmp: Box<CmpFn>,
 }
 
 impl SkipMap {
-    fn new_standard() -> SkipMap {
+    /// Used for testing: Uses the standard comparator.
+    pub fn new_memtable_map() -> SkipMap {
         let mut skm = SkipMap::new();
-        skm.cmp = Box::new(cmp);
+        skm.cmp = Box::new(memtable_key_cmp);
         skm
     }
 
-    pub fn new() -> SkipMap {
+    /// Returns a SkipMap that uses the memtable comparator (see above).
+    fn new() -> SkipMap {
         let mut s = Vec::new();
         s.resize(MAX_HEIGHT, None);
 
@@ -78,7 +53,7 @@
             rand: StdRng::from_seed(&[0xde, 0xad, 0xbe, 0xef]),
             len: 0,
             approx_mem: size_of::<Self>() + MAX_HEIGHT * size_of::<Option<*mut Node>>(),
-            cmp: Box::new(memtable_key_cmp),
+            cmp: Box::new(cmp),
         }
     }
 
@@ -351,7 +326,7 @@
     use types::*;
 
     pub fn make_skipmap() -> SkipMap {
-        let mut skm = SkipMap::new_standard();
+        let mut skm = SkipMap::new();
         let keys = vec!["aba", "abb", "abc", "abd", "abe", "abf", "abg", "abh", "abi", "abj",
                         "abk", "abl", "abm", "abn", "abo", "abp", "abq", "abr", "abs", "abt",
                         "abu", "abv", "abw", "abx", "aby", "abz"];
@@ -411,7 +386,7 @@
 
     #[test]
     fn test_iterator_0() {
-        let skm = SkipMap::new_standard();
+        let skm = SkipMap::new();
         let mut i = 0;
 
         for (_, _) in skm.iter() {
--- a/src/table_reader.rs	Sun Dec 25 10:47:08 2016 +0000
+++ b/src/table_reader.rs	Mon Dec 26 09:23:16 2016 +0000
@@ -1,11 +1,11 @@
 use block::{Block, BlockIter};
 use blockhandle::BlockHandle;
-use filter::FilterPolicy;
+use filter::{InternalFilterPolicy, FilterPolicy};
 use filter_block::FilterBlockReader;
 use options::{self, CompressionType, Options};
 use table_builder::{self, Footer};
-use types::{cmp, LdbIterator};
-use key_types::InternalKey;
+use types::{cmp, CmpFn, LdbIterator};
+use key_types::{internal_key_cmp, InternalKey};
 
 use std::io::{self, Read, Seek, SeekFrom, Result};
 use std::cmp::Ordering;
@@ -32,26 +32,6 @@
     Ok(buf)
 }
 
-/// Reads a block at location.
-fn read_block<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<TableBlock> {
-    // The block is denoted by offset and length in BlockHandle. A block in an encoded
-    // table is followed by 1B compression type and 4B checksum.
-    let buf = try!(read_bytes(f, location));
-    let compress = try!(read_bytes(f,
-                                   &BlockHandle::new(location.offset() + location.size(),
-                                                     table_builder::TABLE_BLOCK_COMPRESS_LEN)));
-    let cksum = try!(read_bytes(f,
-                                &BlockHandle::new(location.offset() + location.size() +
-                                                  table_builder::TABLE_BLOCK_COMPRESS_LEN,
-                                                  table_builder::TABLE_BLOCK_CKSUM_LEN)));
-    Ok(TableBlock {
-        block: Block::new(buf),
-        checksum: u32::decode_fixed(&cksum),
-        compression: options::int_to_compressiontype(compress[0] as u32)
-            .unwrap_or(CompressionType::CompressionNone),
-    })
-}
-
 struct TableBlock {
     block: Block,
     checksum: u32,
@@ -59,6 +39,26 @@
 }
 
 impl TableBlock {
+    /// Reads a block at location.
+    fn read_block<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<TableBlock> {
+        // The block is denoted by offset and length in BlockHandle. A block in an encoded
+        // table is followed by 1B compression type and 4B checksum.
+        let buf = try!(read_bytes(f, location));
+        let compress = try!(read_bytes(f,
+                                       &BlockHandle::new(location.offset() + location.size(),
+                                                         table_builder::TABLE_BLOCK_COMPRESS_LEN)));
+        let cksum = try!(read_bytes(f,
+                                    &BlockHandle::new(location.offset() + location.size() +
+                                                      table_builder::TABLE_BLOCK_COMPRESS_LEN,
+                                                      table_builder::TABLE_BLOCK_CKSUM_LEN)));
+        Ok(TableBlock {
+            block: Block::new(buf),
+            checksum: u32::decode_fixed(&cksum),
+            compression: options::int_to_compressiontype(compress[0] as u32)
+                .unwrap_or(CompressionType::CompressionNone),
+        })
+    }
+
     /// Verify checksum of block
     fn verify(&self) -> bool {
         let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
@@ -74,6 +74,7 @@
     file_size: usize,
 
     opt: Options,
+    cmp: Box<CmpFn>,
 
     footer: Footer,
     indexblock: Block,
@@ -81,11 +82,12 @@
 }
 
 impl<R: Read + Seek, FP: FilterPolicy> Table<R, FP> {
-    pub fn new(mut file: R, size: usize, fp: FP, opt: Options) -> Result<Table<R, FP>> {
+    /// Creates a new table reader operating on unformatted keys (i.e., UserKey).
+    fn new_raw(mut file: R, size: usize, fp: FP, opt: Options) -> Result<Table<R, FP>> {
         let footer = try!(read_footer(&mut file, size));
 
-        let indexblock = try!(read_block(&mut file, &footer.index));
-        let metaindexblock = try!(read_block(&mut file, &footer.meta_index));
+        let indexblock = try!(TableBlock::read_block(&mut file, &footer.index));
+        let metaindexblock = try!(TableBlock::read_block(&mut file, &footer.meta_index));
 
         if !indexblock.verify() || !metaindexblock.verify() {
             return Err(io::Error::new(io::ErrorKind::InvalidData,
@@ -115,14 +117,28 @@
             file: file,
             file_size: size,
             opt: opt,
+            cmp: Box::new(cmp),
             footer: footer,
             filters: filter_block_reader,
             indexblock: indexblock.block,
         })
     }
 
+    /// Creates a new table reader operating on internal keys (i.e., InternalKey). This means that
+    /// a different comparator (internal_key_cmp) and a different filter policy
+    /// (InternalFilterPolicy) are used.
+    pub fn new(file: R,
+               size: usize,
+               fp: FP,
+               opt: Options)
+               -> Result<Table<R, InternalFilterPolicy<FP>>> {
+        let mut t = try!(Table::new_raw(file, size, InternalFilterPolicy::new(fp), opt));
+        t.cmp = Box::new(internal_key_cmp);
+        Ok(t)
+    }
+
     fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock> {
-        let b = try!(read_block(&mut self.file, location));
+        let b = try!(TableBlock::read_block(&mut self.file, location));
 
         if !b.verify() {
             Err(io::Error::new(io::ErrorKind::InvalidData, "Data block failed verification"))
@@ -263,7 +279,7 @@
         self.index_block.seek(to);
 
         if let Some((past_block, handle)) = self.index_block.current() {
-            if cmp(to, &past_block) == Ordering::Less {
+            if (self.table.cmp)(to, &past_block) == Ordering::Less {
                 // ok, found right block: continue
                 if let Ok(()) = self.load_block(&handle) {
                     self.current_block.seek(to);
@@ -324,9 +340,11 @@
 #[cfg(test)]
 mod tests {
     use filter::BloomPolicy;
+    use filter::InternalFilterPolicy;
     use options::Options;
     use table_builder::TableBuilder;
     use types::LdbIterator;
+    use key_types::LookupKey;
 
     use std::io::Cursor;
 
@@ -366,6 +384,39 @@
         (d, size)
     }
 
+    fn build_internal_table() -> (Vec<u8>, usize) {
+
+        let mut d = Vec::with_capacity(512);
+        let mut opt = Options::default();
+        opt.block_restart_interval = 2;
+        opt.block_size = 32;
+
+        let mut i = 0 as u64;
+        let data: Vec<(Vec<u8>, &'static str)> = build_data()
+            .into_iter()
+            .map(|(k, v)| {
+                i += 1;
+                (LookupKey::new(k.as_bytes(), i).internal_key().to_vec(), v)
+            })
+            .collect();
+
+        {
+            let mut b =
+                TableBuilder::new(opt, &mut d, InternalFilterPolicy::new(BloomPolicy::new(4)));
+
+            for &(ref k, ref v) in data.iter() {
+                b.add(k.as_slice(), v.as_bytes());
+            }
+
+            b.finish();
+
+        }
+
+        let size = d.len();
+
+        (d, size)
+    }
+
     #[test]
     fn test_table_reader_checksum() {
         let (mut src, size) = build_table();
@@ -373,10 +424,10 @@
 
         src[45] = 0;
 
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4),
-                                   Options::default())
+        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4),
+                                       Options::default())
             .unwrap();
 
         assert!(table.filters.is_some());
@@ -407,10 +458,10 @@
         let (src, size) = build_table();
         let data = build_data();
 
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4),
-                                   Options::default())
+        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4),
+                                       Options::default())
             .unwrap();
         let iter = table.iter();
         let mut i = 0;
@@ -428,10 +479,10 @@
     fn test_table_iterator_filter() {
         let (src, size) = build_table();
 
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4),
-                                   Options::default())
+        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4),
+                                       Options::default())
             .unwrap();
         let filter_reader = table.filters.clone().unwrap();
         let mut iter = table.iter();
@@ -451,10 +502,10 @@
     fn test_table_iterator_state_behavior() {
         let (src, size) = build_table();
 
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4),
-                                   Options::default())
+        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4),
+                                       Options::default())
             .unwrap();
         let mut iter = table.iter();
 
@@ -484,10 +535,10 @@
         let (src, size) = build_table();
         let data = build_data();
 
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4),
-                                   Options::default())
+        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4),
+                                       Options::default())
             .unwrap();
         let mut iter = table.iter();
         let mut i = 0;
@@ -519,10 +570,10 @@
     fn test_table_iterator_seek() {
         let (src, size) = build_table();
 
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4),
-                                   Options::default())
+        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4),
+                                       Options::default())
             .unwrap();
         let mut iter = table.iter();
 
@@ -540,10 +591,10 @@
     fn test_table_get() {
         let (src, size) = build_table();
 
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4),
-                                   Options::default())
+        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4),
+                                       Options::default())
             .unwrap();
 
         assert!(table.get("aaa".as_bytes()).is_none());
@@ -553,4 +604,43 @@
         assert_eq!(table.get("zzz".as_bytes()), Some("111".as_bytes().to_vec()));
         assert!(table.get("zz1".as_bytes()).is_none());
     }
+
+    // This test verifies that the table and filters work with internal keys. This means:
+    // The table contains keys in InternalKey format and it uses a filter wrapped by
+    // InternalFilterPolicy.
+    // All the other tests use raw keys that don't have any internal structure; this is fine in
+    // general, but here we want to see that the other infrastructure works too.
+    #[test]
+    fn test_table_internal_keys() {
+        use key_types::LookupKey;
+
+        let (src, size) = build_internal_table();
+
+        let mut table = Table::new(Cursor::new(&src as &[u8]),
+                                   size,
+                                   BloomPolicy::new(4),
+                                   Options::default())
+            .unwrap();
+        let filter_reader = table.filters.clone().unwrap();
+
+        // Check that we're actually using internal keys
+        for (ref k, _) in table.iter() {
+            assert_eq!(k.len(), 3 + 8);
+        }
+
+        let mut iter = table.iter();
+
+        loop {
+            if let Some((k, _)) = iter.next() {
+                let lk = LookupKey::new(&k, 123);
+                let userkey = lk.user_key();
+
+                assert!(filter_reader.key_may_match(iter.current_block_off, userkey));
+                assert!(!filter_reader.key_may_match(iter.current_block_off,
+                                                     "somerandomkey".as_bytes()));
+            } else {
+                break;
+            }
+        }
+    }
 }
--- a/src/types.rs	Sun Dec 25 10:47:08 2016 +0000
+++ b/src/types.rs	Mon Dec 26 09:23:16 2016 +0000
@@ -21,6 +21,8 @@
     IOError(String),
 }
 
+pub type CmpFn = Fn(&[u8], &[u8]) -> Ordering;
+
 pub fn cmp(a: &[u8], b: &[u8]) -> Ordering {
     a.cmp(b)
 }