changeset 208:acbd2d3dd789

version: Set up mock versions and add test for overlapping_inputs().
author Lewin Bormann <lbo@spheniscida.de>
date Tue, 05 Sep 2017 20:43:36 +0200
parents 58e068ea2b58
children d1d0a777f0d1
files src/filter_block.rs src/table_builder.rs src/table_cache.rs src/version.rs
diffstat 4 files changed, 143 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/src/filter_block.rs	Tue Sep 05 19:38:24 2017 +0200
+++ b/src/filter_block.rs	Tue Sep 05 20:43:36 2017 +0200
@@ -22,6 +22,8 @@
 ///
 /// where offsets are 4 bytes, offset of offsets is 4 bytes, and log2 of FILTER_BASE is 1 byte.
 /// Two consecutive filter offsets may be the same.
+///
+/// TODO: See if we can remove the lifetime parameter.
 pub struct FilterBlockBuilder<'a> {
     policy: BoxedFilterPolicy,
     // filters, concatenated
--- a/src/table_builder.rs	Tue Sep 05 19:38:24 2017 +0200
+++ b/src/table_builder.rs	Tue Sep 05 20:43:36 2017 +0200
@@ -200,7 +200,7 @@
         handle
     }
 
-    pub fn finish(mut self) {
+    pub fn finish(mut self) -> usize {
         assert!(self.data_block.is_some());
         let ctype = self.opt.compression_type;
 
@@ -242,6 +242,7 @@
         footer.encode(&mut buf);
 
         self.offset += self.dst.write(&buf[..]).unwrap();
+        self.offset
     }
 }
 
--- a/src/table_cache.rs	Tue Sep 05 19:38:24 2017 +0200
+++ b/src/table_cache.rs	Tue Sep 05 20:43:36 2017 +0200
@@ -15,7 +15,7 @@
 
 const DEFAULT_SUFFIX: &str = "ldb";
 
-fn table_name(name: &str, num: u64, suff: &str) -> String {
+pub fn table_name(name: &str, num: u64, suff: &str) -> String {
     assert!(num > 0);
     format!("{}/{:06}.{}", name, num, suff)
 }
--- a/src/version.rs	Tue Sep 05 19:38:24 2017 +0200
+++ b/src/version.rs	Tue Sep 05 20:43:36 2017 +0200
@@ -234,7 +234,12 @@
         let (mut ubegin, mut uend) = (parse_internal_key(begin).2.to_vec(),
                                       parse_internal_key(end).2.to_vec());
 
-        loop {
+        let mut done = false;
+        while !done {
+            // By default, only do one search. In the special case outlined below, done is set to
+            // false in order to restart the search from scratch.
+            done = true;
+
             'inner: for f in self.files[level].iter() {
                 let ((_, _, fsmallest), (_, _, flargest)) = (parse_internal_key(&(*f).smallest),
                                                              parse_internal_key(&(*f).largest));
@@ -254,17 +259,22 @@
                            self.user_cmp.cmp(fsmallest, &ubegin) == Ordering::Less {
                             ubegin = fsmallest.to_vec();
                             inputs.truncate(0);
+                            done = false;
                             break 'inner;
                         } else if !uend.is_empty() &&
                                   self.user_cmp.cmp(flargest, &uend) == Ordering::Greater {
                             uend = flargest.to_vec();
                             inputs.truncate(0);
+                            done = false;
                             break 'inner;
+                        } else {
                         }
                     }
                 }
             }
         }
+
+        inputs
     }
 
     fn new_concat_iter(&self, level: usize) -> VersionIter {
@@ -420,7 +430,16 @@
 #[cfg(test)]
 mod tests {
     use super::*;
+
+    use std::default::Default;
+    use std::path::Path;
+
     use cmp::DefaultCmp;
+    use env::Env;
+    use mem_env::MemEnv;
+    use options::Options;
+    use table_builder::TableBuilder;
+    use table_cache::table_name;
 
     fn new_file(num: u64, smallest: &[u8], largest: &[u8]) -> FileMetaHandle {
         Rc::new(FileMetaData {
@@ -432,6 +451,124 @@
         })
     }
 
+    /// write_table creates a table with the given number and contents (must be sorted!) in the
+    /// memenv. The sequence numbers given to keys start with startseq.
+    fn write_table(me: &MemEnv,
+                   contents: &[(&[u8], &[u8])],
+                   startseq: u64,
+                   num: u64)
+                   -> FileMetaHandle {
+        let dst = me.open_writable_file(Path::new(&table_name("db", num, "ldb"))).unwrap();
+        let mut seq = startseq;
+        let keys: Vec<Vec<u8>> = contents.iter()
+            .map(|&(k, _)| {
+                seq += 1;
+                LookupKey::new(k, seq).internal_key().to_vec()
+            })
+            .collect();
+
+        let mut tbl = TableBuilder::new(Options::default(), dst);
+        for i in 0..contents.len() {
+            tbl.add(&keys[i], contents[i].1);
+            seq += 1;
+        }
+
+        let mut f = new_file(num,
+                             LookupKey::new(contents[0].0, MAX_SEQUENCE_NUMBER).internal_key(),
+                             LookupKey::new(contents[contents.len() - 1].0, 0).internal_key());
+        Rc::get_mut(&mut f).unwrap().size = tbl.finish() as u64;
+        f
+    }
+
+    fn make_version() -> Version {
+        time_test!("make_version");
+        let mut opts = Options::default();
+        let env = MemEnv::new();
+
+        // Level 0 (overlapping)
+        let f1: &[(&[u8], &[u8])] = &[("aaa".as_bytes(), "val1".as_bytes()),
+                                      ("aab".as_bytes(), "val2".as_bytes()),
+                                      ("aba".as_bytes(), "val3".as_bytes())];
+        let t1 = write_table(&env, f1, 1, 1);
+        let f2: &[(&[u8], &[u8])] = &[("aax".as_bytes(), "val1".as_bytes()),
+                                      ("bab".as_bytes(), "val2".as_bytes()),
+                                      ("bba".as_bytes(), "val3".as_bytes())];
+        let t2 = write_table(&env, f2, 4, 2);
+        // Level 1
+        let f3: &[(&[u8], &[u8])] = &[("caa".as_bytes(), "val1".as_bytes()),
+                                      ("cab".as_bytes(), "val2".as_bytes()),
+                                      ("cba".as_bytes(), "val3".as_bytes())];
+        let t3 = write_table(&env, f3, 7, 3);
+        let f4: &[(&[u8], &[u8])] = &[("daa".as_bytes(), "val1".as_bytes()),
+                                      ("dab".as_bytes(), "val2".as_bytes()),
+                                      ("dba".as_bytes(), "val3".as_bytes())];
+        let t4 = write_table(&env, f4, 10, 4);
+        let f5: &[(&[u8], &[u8])] = &[("eaa".as_bytes(), "val1".as_bytes()),
+                                      ("eab".as_bytes(), "val2".as_bytes()),
+                                      ("eba".as_bytes(), "val3".as_bytes())];
+        let t5 = write_table(&env, f5, 13, 5);
+        // Level 2
+        let f6: &[(&[u8], &[u8])] = &[("faa".as_bytes(), "val1".as_bytes()),
+                                      ("fab".as_bytes(), "val2".as_bytes()),
+                                      ("fba".as_bytes(), "val3".as_bytes())];
+        let t6 = write_table(&env, f6, 16, 6);
+        let f7: &[(&[u8], &[u8])] = &[("gaa".as_bytes(), "val1".as_bytes()),
+                                      ("gab".as_bytes(), "val2".as_bytes()),
+                                      ("gba".as_bytes(), "val3".as_bytes())];
+        let t7 = write_table(&env, f7, 19, 7);
+
+        opts.set_env(Box::new(env));
+        let cache = TableCache::new("db", opts, 100);
+        let mut v = Version::new(Rc::new(cache), Rc::new(Box::new(DefaultCmp)));
+        v.files[0] = vec![t1, t2];
+        v.files[1] = vec![t3, t4, t5];
+        v.files[2] = vec![t6, t7];
+        v
+    }
+
+    #[test]
+    fn test_version_overlapping_inputs() {
+        let v = make_version();
+
+        time_test!("overlapping-inputs");
+        {
+            time_test!("overlapping-inputs-1");
+            // Range is expanded in overlapping level-0 files.
+            let from = LookupKey::new("aab".as_bytes(), MAX_SEQUENCE_NUMBER);
+            let to = LookupKey::new("aae".as_bytes(), 0);
+            let r = v.overlapping_inputs(0, from.internal_key(), to.internal_key());
+            assert_eq!(r.len(), 2);
+            assert_eq!((*r[0]).num, 1);
+            assert_eq!((*r[1]).num, 2);
+        }
+        {
+            let from = LookupKey::new("cab".as_bytes(), MAX_SEQUENCE_NUMBER);
+            let to = LookupKey::new("cbx".as_bytes(), 0);
+            // expect one file.
+            let r = v.overlapping_inputs(1, from.internal_key(), to.internal_key());
+            assert_eq!(r.len(), 1);
+            assert_eq!((*r[0]).num, 3);
+        }
+        {
+            let from = LookupKey::new("cab".as_bytes(), MAX_SEQUENCE_NUMBER);
+            let to = LookupKey::new("ebx".as_bytes(), 0);
+            let r = v.overlapping_inputs(1, from.internal_key(), to.internal_key());
+            // Assert that correct number of files and correct files were returned.
+            assert_eq!(r.len(), 3);
+            assert_eq!((*r[0]).num, 3);
+            assert_eq!((*r[1]).num, 4);
+            assert_eq!((*r[2]).num, 5);
+        }
+        {
+            let from = LookupKey::new("hhh".as_bytes(), MAX_SEQUENCE_NUMBER);
+            let to = LookupKey::new("ijk".as_bytes(), 0);
+            let r = v.overlapping_inputs(2, from.internal_key(), to.internal_key());
+            assert_eq!(r.len(), 0);
+            let r = v.overlapping_inputs(1, from.internal_key(), to.internal_key());
+            assert_eq!(r.len(), 0);
+        }
+    }
+
     #[test]
     fn test_version_key_ordering() {
         time_test!();