changeset 90:ab7984e1e49c

Update memtable to exhibit correct snapshot/sequencing behavior That is: Given an entry X with sequence numbers 1 and 4, looking it up with a sequence number of 3 should yield the value of X at 1, and looking it up at 0 should give a "not found".
author Lewin Bormann <lbo@spheniscida.de>
date Fri, 02 Sep 2016 22:12:28 +0200
parents 89810ac42bd1
children f6b6a82fb7cd
files src/memtable.rs
diffstat 1 files changed, 171 insertions(+), 119 deletions(-) [+]
line wrap: on
line diff
--- a/src/memtable.rs	Fri Sep 02 22:10:51 2016 +0200
+++ b/src/memtable.rs	Fri Sep 02 22:12:28 2016 +0200
@@ -5,6 +5,40 @@
 
 use integer_encoding::{FixedInt, VarInt};
 
+/// An internal comparator wrapping a user-supplied comparator. This comparator is used to compare
+/// memtable keys, which contain length prefixes and a sequence number.
+/// The ordering is determined by asking the wrapped comparator; ties are broken by *reverse*
+/// ordering the sequence numbers. (This means that when having an entry abx/4 and searching for
+/// abx/5, then abx/4 is counted as "greater-or-equal", making snapshot functionality work at all)
+#[derive(Clone, Copy)]
+struct MemtableKeyComparator<C: Comparator> {
+    internal: C,
+}
+
+impl<C: Comparator> Comparator for MemtableKeyComparator<C> {
+    fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering {
+        let (akeylen, akeyoff, atag, _, _) = parse_memtable_key(a);
+        let (bkeylen, bkeyoff, btag, _, _) = parse_memtable_key(b);
+
+        let userkey_a = &a[akeyoff..akeyoff + akeylen];
+        let userkey_b = &b[bkeyoff..bkeyoff + bkeylen];
+
+        let userkey_order = self.internal.cmp(userkey_a, userkey_b);
+        println!("{:?}", userkey_order);
+
+        if userkey_order != Ordering::Equal {
+            userkey_order
+        } else {
+            // look at sequence number, in reverse order
+            let (_, aseq) = parse_tag(atag);
+            let (_, bseq) = parse_tag(btag);
+
+            // reverse!
+            bseq.cmp(&aseq)
+        }
+    }
+}
+
 pub struct LookupKey {
     key: Vec<u8>,
     key_offset: usize,
@@ -12,17 +46,20 @@
 
 /// Encapsulates a user key + sequence number, which is used for lookups in the internal map
 /// implementation of a MemTable.
+/// Format: [keylen: varint32, key: *u8, tag: u64]
+/// keylen is the length of key plus 8 (for the tag; this for LevelDB compatibility)
 impl LookupKey {
     #[allow(unused_assignments)]
     fn new(k: &[u8], s: SequenceNumber) -> LookupKey {
         let mut key = Vec::with_capacity(k.len() + k.len().required_space() +
                                          <u64 as FixedInt>::required_space());
+        let internal_keylen = k.len() + 8;
         let mut i = 0;
 
-        key.reserve(8 + k.len().required_space() + k.len());
+        key.reserve(internal_keylen.required_space() + internal_keylen);
 
-        key.resize(k.len().required_space(), 0);
-        i += k.len().encode_var(&mut key[i..]);
+        key.resize(internal_keylen.required_space(), 0);
+        i += internal_keylen.encode_var(&mut key[i..]);
 
         key.extend_from_slice(k);
         i += k.len();
@@ -53,9 +90,82 @@
     }
 }
 
+/// Parses a tag into (type, sequence number)
+fn parse_tag(tag: u64) -> (u8, u64) {
+    let seq = tag >> 8;
+    let typ = tag & 0xff;
+    (typ as u8, seq)
+}
+
+/// A memtable key is a bytestring containing (keylen, key, tag, vallen, val). This function
+/// builds such a key. It's called key because the underlying Map implementation will only be
+/// concerned with keys; the value field is not used (instead, the value is encoded in the key,
+/// and for lookups we just search for the next bigger entry).
+/// keylen is the length of key + 8 (to account for the tag)
+fn build_memtable_key(key: &[u8], value: &[u8], t: ValueType, seq: SequenceNumber) -> Vec<u8> {
+    // We are using the original LevelDB approach here -- encoding key and value into the
+    // key that is used for insertion into the SkipMap.
+    // The format is: [key_size: varint32, key_data: [u8], flags: u64, value_size: varint32,
+    // value_data: [u8]]
+
+    let mut i = 0;
+    let keysize = key.len() + 8;
+    let valsize = value.len();
+
+    let mut buf = Vec::with_capacity(keysize + valsize + keysize.required_space() +
+                                     valsize.required_space() +
+                                     <u64 as FixedInt>::required_space());
+    buf.resize(keysize.required_space(), 0);
+    i += keysize.encode_var(&mut buf[i..]);
+
+    buf.extend(key.iter());
+    i += key.len();
+
+    let flag = (t as u64) | (seq << 8);
+    buf.resize(i + <u64 as FixedInt>::required_space(), 0);
+    flag.encode_fixed(&mut buf[i..]);
+    i += <u64 as FixedInt>::required_space();
+
+    buf.resize(i + valsize.required_space(), 0);
+    i += valsize.encode_var(&mut buf[i..]);
+
+    buf.extend(value.iter());
+    i += value.len();
+
+    assert_eq!(i, buf.len());
+    buf
+}
+
+/// Parses a memtable key and returns  (keylen, key offset, tag, vallen, val offset).
+/// If the key only contains (keylen, key, tag), the vallen and val offset return values will be
+/// meaningless.
+fn parse_memtable_key(mkey: &[u8]) -> (usize, usize, u64, usize, usize) {
+    let (keylen, mut i): (usize, usize) = VarInt::decode_var(&mkey);
+
+    let keyoff = i;
+    i += keylen - 8;
+
+    if mkey.len() > i {
+        let tag = FixedInt::decode_fixed(&mkey[i..i + 8]);
+        i += 8;
+
+        let (vallen, j): (usize, usize) = VarInt::decode_var(&mkey[i..]);
+
+        i += j;
+
+        let valoff = i;
+
+        return (keylen - 8, keyoff, tag, vallen, valoff);
+    } else {
+        return (keylen - 8, keyoff, 0, 0, 0);
+    }
+}
+
+
 /// Provides Insert/Get/Iterate, based on the SkipMap implementation.
 pub struct MemTable<C: Comparator> {
-    map: SkipMap<C>,
+    map: SkipMap<MemtableKeyComparator<C>>,
+    cmp: C,
 }
 
 impl MemTable<StandardComparator> {
@@ -66,77 +176,17 @@
 
 impl<C: Comparator> MemTable<C> {
     pub fn new_custom_cmp(comparator: C) -> MemTable<C> {
-        MemTable { map: SkipMap::new_with_cmp(comparator) }
+        MemTable {
+            map: SkipMap::new_with_cmp(MemtableKeyComparator { internal: comparator }),
+            cmp: comparator,
+        }
     }
     pub fn approx_mem_usage(&self) -> usize {
         self.map.approx_memory()
     }
 
     pub fn add(&mut self, seq: SequenceNumber, t: ValueType, key: &[u8], value: &[u8]) {
-        self.map.insert(Self::build_memtable_key(key, value, t, seq), Vec::new())
-    }
-
-    /// A memtable key is a bytestring containing (keylen, key, tag, vallen, val). This function
-    /// builds such a key. It's called key because the underlying Map implementation will only be
-    /// concerned with keys; the value field is not used (instead, the value is encoded in the key,
-    /// and for lookups we just search for the next bigger entry).
-    fn build_memtable_key(key: &[u8], value: &[u8], t: ValueType, seq: SequenceNumber) -> Vec<u8> {
-        // We are using the original LevelDB approach here -- encoding key and value into the
-        // key that is used for insertion into the SkipMap.
-        // The format is: [key_size: varint32, key_data: [u8], flags: u64, value_size: varint32,
-        // value_data: [u8]]
-
-        let mut i = 0;
-        let keysize = key.len();
-        let valsize = value.len();
-
-        let mut buf = Vec::with_capacity(keysize + valsize + keysize.required_space() +
-                                         valsize.required_space() +
-                                         <u64 as FixedInt>::required_space());
-        buf.resize(keysize.required_space(), 0);
-        i += keysize.encode_var(&mut buf[i..]);
-
-        buf.extend(key.iter());
-        i += key.len();
-
-        let flag = (t as u64) | (seq << 8);
-        buf.resize(i + <u64 as FixedInt>::required_space(), 0);
-        flag.encode_fixed(&mut buf[i..]);
-        i += <u64 as FixedInt>::required_space();
-
-        buf.resize(i + valsize.required_space(), 0);
-        i += valsize.encode_var(&mut buf[i..]);
-
-        buf.extend(value.iter());
-        i += value.len();
-
-        assert_eq!(i, buf.len());
-        buf
-    }
-
-    /// Parses a memtable key and returns  (keylen, key offset, tag, vallen, val offset).
-    /// If the key only contains (keylen, key, tag), the vallen and val offset return values will be
-    /// meaningless.
-    fn parse_memtable_key(mkey: &[u8]) -> (usize, usize, u64, usize, usize) {
-        let (keylen, mut i): (usize, usize) = VarInt::decode_var(&mkey);
-
-        let keyoff = i;
-        i += keylen;
-
-        if mkey.len() > i {
-            let tag = FixedInt::decode_fixed(&mkey[i..i + 8]);
-            i += 8;
-
-            let (vallen, j): (usize, usize) = VarInt::decode_var(&mkey[i..]);
-
-            i += j;
-
-            let valoff = i;
-
-            return (keylen, keyoff, tag, vallen, valoff);
-        } else {
-            return (keylen, keyoff, 0, 0, 0);
-        }
+        self.map.insert(build_memtable_key(key, value, t, seq), Vec::new())
     }
 
     #[allow(unused_variables)]
@@ -146,11 +196,13 @@
 
         if let Some(e) = iter.current() {
             let foundkey = e.0;
-            let (lkeylen, lkeyoff, _, _, _) = Self::parse_memtable_key(key.memtable_key());
-            let (fkeylen, fkeyoff, tag, vallen, valoff) = Self::parse_memtable_key(foundkey);
+            let (lkeylen, lkeyoff, _, _, _) = parse_memtable_key(key.memtable_key());
+            let (fkeylen, fkeyoff, tag, vallen, valoff) = parse_memtable_key(foundkey);
 
-            if C::cmp(&key.memtable_key()[lkeyoff..lkeyoff + lkeylen],
-                      &foundkey[fkeyoff..fkeyoff + fkeylen]) == Ordering::Equal {
+            // Compare user key -- if equal, proceed
+            if self.cmp.cmp(&key.memtable_key()[lkeyoff..lkeyoff + lkeylen],
+                            &foundkey[fkeyoff..fkeyoff + fkeylen]) ==
+               Ordering::Equal {
                 if tag & 0xff == ValueType::TypeValue as u64 {
                     return Result::Ok(foundkey[valoff..valoff + vallen].to_vec());
                 } else {
@@ -171,7 +223,7 @@
 
 pub struct MemtableIterator<'a, C: 'a + Comparator> {
     _tbl: &'a MemTable<C>,
-    skipmapiter: SkipMapIter<'a, C>,
+    skipmapiter: SkipMapIter<'a, MemtableKeyComparator<C>>,
 }
 
 impl<'a, C: 'a + Comparator> Iterator for MemtableIterator<'a, C> {
@@ -180,8 +232,7 @@
     fn next(&mut self) -> Option<Self::Item> {
         loop {
             if let Some((foundkey, _)) = self.skipmapiter.next() {
-                let (keylen, keyoff, tag, vallen, valoff) =
-                    MemTable::<C>::parse_memtable_key(foundkey);
+                let (keylen, keyoff, tag, vallen, valoff) = parse_memtable_key(foundkey);
 
                 if tag & 0xff == ValueType::TypeValue as u64 {
                     return Some((&foundkey[keyoff..keyoff + keylen],
@@ -203,8 +254,7 @@
     fn prev(&mut self) -> Option<Self::Item> {
         loop {
             if let Some((foundkey, _)) = self.skipmapiter.prev() {
-                let (keylen, keyoff, tag, vallen, valoff) =
-                    MemTable::<C>::parse_memtable_key(foundkey);
+                let (keylen, keyoff, tag, vallen, valoff) = parse_memtable_key(foundkey);
 
                 if tag & 0xff == ValueType::TypeValue as u64 {
                     return Some((&foundkey[keyoff..keyoff + keylen],
@@ -226,7 +276,7 @@
         }
 
         if let Some((foundkey, _)) = self.skipmapiter.current() {
-            let (keylen, keyoff, tag, vallen, valoff) = MemTable::<C>::parse_memtable_key(foundkey);
+            let (keylen, keyoff, tag, vallen, valoff) = parse_memtable_key(foundkey);
 
             if tag & 0xff == ValueType::TypeValue as u64 {
                 return Some((&foundkey[keyoff..keyoff + keylen],
@@ -247,11 +297,13 @@
 #[allow(unused_variables)]
 mod tests {
     use super::*;
+    use super::{parse_memtable_key, parse_tag};
     use types::*;
 
     fn get_memtable() -> MemTable<StandardComparator> {
         let mut mt = MemTable::new();
-        let entries = vec![(120, "abc", "123"),
+        let entries = vec![(115, "abc", "122"),
+                           (120, "abc", "123"),
                            (121, "abd", "124"),
                            (122, "abe", "125"),
                            (123, "abf", "126")];
@@ -263,6 +315,12 @@
     }
 
     #[test]
+    fn test_memtable_parse_tag() {
+        let tag = (12345 << 8) | 67;
+        assert_eq!(parse_tag(tag), (67, 12345));
+    }
+
+    #[test]
     fn test_memtable_lookupkey() {
         use integer_encoding::VarInt;
 
@@ -274,7 +332,7 @@
         assert_eq!(lk1.key.capacity(), 14);
 
         assert_eq!(lk1.user_key(), "abcde".as_bytes());
-        assert_eq!(u32::decode_var(lk1.memtable_key()), (5, 1));
+        assert_eq!(u32::decode_var(lk1.memtable_key()), (13, 1));
         assert_eq!(lk2.internal_key(),
                    vec![120, 121, 97, 98, 120, 121, 1, 97, 0, 0, 0, 0, 0, 0].as_slice());
     }
@@ -288,32 +346,33 @@
                "123".as_bytes());
 
         assert_eq!(mt.map.iter().next().unwrap().0,
-                   vec![3, 97, 98, 99, 1, 123, 0, 0, 0, 0, 0, 0, 3, 49, 50, 51].as_slice());
+                   vec![11, 97, 98, 99, 1, 123, 0, 0, 0, 0, 0, 0, 3, 49, 50, 51].as_slice());
     }
 
     #[test]
     fn test_memtable_add_get() {
         let mt = get_memtable();
 
-        if let Result::Ok(v) = mt.get(&LookupKey::new("abc".as_bytes(), 120)) {
-            assert_eq!(v, "123".as_bytes());
+        // Smaller sequence number doesn't find entry
+        if let Result::Ok(v) = mt.get(&LookupKey::new("abc".as_bytes(), 110)) {
+            println!("{:?}", v);
+            panic!("found");
+        }
+
+        // Bigger sequence number falls back to next smaller
+        if let Result::Ok(v) = mt.get(&LookupKey::new("abc".as_bytes(), 116)) {
+            assert_eq!(v, "122".as_bytes());
         } else {
             panic!("not found");
         }
 
-        // Smaller sequence number than actual one still produces result
-        if let Result::Ok(v) = mt.get(&LookupKey::new("abc".as_bytes(), 119)) {
+        // Exact match works
+        if let Result::Ok(v) = mt.get(&LookupKey::new("abc".as_bytes(), 120)) {
             assert_eq!(v, "123".as_bytes());
         } else {
             panic!("not found");
         }
 
-        // Bigger sequence number doesn't
-        if let Result::Ok(v) = mt.get(&LookupKey::new("abc".as_bytes(), 124)) {
-            println!("{:?}", v);
-            panic!("found");
-        }
-
         if let Result::Ok(v) = mt.get(&LookupKey::new("abe".as_bytes(), 122)) {
             assert_eq!(v, "125".as_bytes());
         } else {
@@ -334,33 +393,22 @@
     }
 
     #[test]
-    fn test_memtable_iterator() {
+    fn test_memtable_iterator_fwd_seek() {
         let mt = get_memtable();
-        let mut iter = mt.iter();
-
-        iter.next();
-        assert!(iter.valid());
-        assert_eq!(iter.current().unwrap().0, vec![97, 98, 99].as_slice());
-        assert_eq!(iter.current().unwrap().1, vec![49, 50, 51].as_slice());
-        iter.seek("abf".as_bytes());
-        assert_eq!(iter.current().unwrap().0, vec![97, 98, 102].as_slice());
-        assert_eq!(iter.current().unwrap().1, vec![49, 50, 54].as_slice());
+        let iter = mt.iter();
 
-        iter.seek("abc".as_bytes());
-        assert_eq!(iter.current().unwrap().0, vec![97, 98, 99].as_slice());
-        assert_eq!(iter.current().unwrap().1, vec![49, 50, 51].as_slice());
-
-        iter.seek("abx".as_bytes());
-        assert!(!iter.valid());
-        assert!(iter.current().is_none());
+        let expected = vec!["123".as_bytes(), /* i.e., the abc entry with
+                                               * higher sequence number comes first */
+                            "122".as_bytes(),
+                            "124".as_bytes(),
+                            "125".as_bytes(),
+                            "126".as_bytes()];
+        let mut i = 0;
 
-        iter.seek("ab0".as_bytes());
-        assert!(iter.valid());
-        assert_eq!(iter.current().unwrap().0, vec![97, 98, 99].as_slice());
-
-        iter.prev();
-        assert!(!iter.valid());
-        assert!(iter.current().is_none());
+        for (k, v) in iter {
+            assert_eq!(v, expected[i]);
+            i += 1;
+        }
     }
 
     #[test]
@@ -374,9 +422,14 @@
 
         iter.next();
         assert!(iter.valid());
+        assert_eq!(iter.current().unwrap().0, vec![97, 98, 99].as_slice());
+
+        iter.next();
+        assert!(iter.valid());
         assert_eq!(iter.current().unwrap().0, vec![97, 98, 100].as_slice());
 
         iter.prev();
+        iter.prev();
         assert!(iter.valid());
         assert_eq!(iter.current().unwrap().0, vec![97, 98, 99].as_slice());
 
@@ -386,9 +439,8 @@
 
     #[test]
     fn test_memtable_parse_key() {
-        let key = vec![3, 1, 2, 3, 1, 123, 0, 0, 0, 0, 0, 0, 3, 4, 5, 6];
-        let (keylen, keyoff, tag, vallen, valoff) =
-            MemTable::<StandardComparator>::parse_memtable_key(&key);
+        let key = vec![11, 1, 2, 3, 1, 123, 0, 0, 0, 0, 0, 0, 3, 4, 5, 6];
+        let (keylen, keyoff, tag, vallen, valoff) = parse_memtable_key(&key);
         assert_eq!(keylen, 3);
         assert_eq!(&key[keyoff..keyoff + keylen], vec![1, 2, 3].as_slice());
         assert_eq!(tag, 123 << 8 | 1);