changeset 235:c5a6d5da8380

version_set: Various cleanups and new test for Builder.
author Lewin Bormann <lbo@spheniscida.de>
date Mon, 11 Sep 2017 19:10:35 +0200
parents 950bdfc410bc
children 0c747fecd98f
files src/version.rs src/version_set.rs
diffstat 2 files changed, 100 insertions(+), 30 deletions(-) [+]
line wrap: on
line diff
--- a/src/version.rs	Mon Sep 11 18:11:45 2017 +0200
+++ b/src/version.rs	Mon Sep 11 19:10:35 2017 +0200
@@ -146,6 +146,8 @@
         }
     }
 
+    /// update_stats updates the number of seeks, and remembers files with too many seeks as
+    /// compaction candidates.
     fn update_stats(&mut self, stats: GetStats) -> bool {
         if let Some(file) = stats.file {
             if file.borrow().allowed_seeks <= 1 && self.file_to_compact.is_none() {
--- a/src/version_set.rs	Mon Sep 11 18:11:45 2017 +0200
+++ b/src/version_set.rs	Mon Sep 11 19:10:35 2017 +0200
@@ -382,7 +382,7 @@
         self.finalize(&mut v);
 
         if self.descriptor_log.is_none() {
-            let descname = format!("{}/MANIFEST-{:06}", self.dbname, self.manifest_num);
+            let descname = manifest_file_name(&self.dbname, self.manifest_num);
             edit.set_next_file(self.next_file_num);
             self.descriptor_log =
                 Some(LogWriter::new(self.opt.env.open_writable_file(Path::new(&descname))?));
@@ -442,7 +442,7 @@
         let mut current = String::new();
         {
             let mut f =
-                self.opt.env.open_sequential_file(Path::new(&format!("{}/CURRENT", self.dbname)))?;
+                self.opt.env.open_sequential_file(Path::new(&current_file_name(&self.dbname)))?;
             f.read_to_string(&mut current)?;
         }
         if current.is_empty() || !current.ends_with('\n') {
@@ -572,6 +572,8 @@
         }
     }
 
+    /// apply applies the edits recorded in edit to the builder state. compaction pointers are
+    /// copied to the supplied compaction_ptrs array.
     fn apply(&mut self, edit: &VersionEdit, compaction_ptrs: &mut [Vec<u8>; NUM_LEVELS]) {
         for c in edit.compaction_ptrs.iter() {
             compaction_ptrs[c.level] = c.key.clone();
@@ -585,15 +587,17 @@
             if f.allowed_seeks < 100 {
                 f.allowed_seeks = 100;
             }
-            for i in 0..self.deleted[level].len() {
-                if self.deleted[level][i] == f.num {
-                    self.deleted[level].swap_remove(i);
-                }
-            }
+            // Remove this file from the list of deleted files.
+            self.deleted[level] = self.deleted[level]
+                .iter()
+                .filter_map(|d| if *d != f.num { Some(*d) } else { None })
+                .collect();
             self.added[level].push(share(f));
         }
     }
 
+    /// maybe_add_file adds a file f at level to version v, if it's not already marked as deleted
+    /// in this edit. It also asserts that the ordering of files is preserved.
     fn maybe_add_file(&mut self,
                       cmp: &InternalKeyCmp,
                       v: &mut Version,
@@ -615,6 +619,8 @@
         v.files[level].push(f);
     }
 
+    /// save_to saves the edits applied to the builder to v, adding all non-deleted files from
+    /// Version base to v.
     fn save_to(&mut self, cmp: &InternalKeyCmp, base: &Shared<Version>, v: &mut Version) {
         for level in 0..NUM_LEVELS {
             sort_files_by_smallest(cmp, &mut self.added[level]);
@@ -625,10 +631,10 @@
             let basefiles = base.borrow().files[level].clone();
             v.files[level].reserve(basefiles.len() + self.added[level].len());
 
-            let mut iadded = added.into_iter();
-            let mut ibasefiles = basefiles.into_iter();
-            let merged = merge_iters(&mut iadded,
-                                     &mut ibasefiles,
+            let iadded = added.into_iter();
+            let ibasefiles = basefiles.into_iter();
+            let merged = merge_iters(iadded,
+                                     ibasefiles,
                                      |a, b| cmp.cmp(&a.borrow().smallest, &b.borrow().smallest));
             for m in merged {
                 self.maybe_add_file(cmp, v, level, m);
@@ -641,22 +647,37 @@
             for i in 1..v.files[level].len() {
                 let (prev_end, this_begin) = (&v.files[level][i - 1].borrow().largest,
                                               &v.files[level][i].borrow().smallest);
-                assert!(cmp.cmp(prev_end, this_begin) >= Ordering::Equal);
+                assert!(cmp.cmp(prev_end, this_begin) < Ordering::Equal);
             }
         }
     }
 }
 
+fn manifest_name(file_num: FileNum) -> String {
+    format!("MANIFEST-{:06}", file_num)
+}
+
+fn manifest_file_name(dbname: &str, file_num: FileNum) -> String {
+    format!("{}/{}", dbname, manifest_name(file_num))
+}
+
+fn temp_file_name(dbname: &str, file_num: FileNum) -> String {
+    format!("{}/{:06}.dbtmp", dbname, file_num)
+}
+
+fn current_file_name(dbname: &str) -> String {
+    format!("{}/CURRENT", dbname)
+}
+
 fn set_current_file(env: &Box<Env>, dbname: &str, manifest_file_num: FileNum) -> Result<()> {
-    let manifest_base = format!("MANIFEST-{:06}", manifest_file_num);
-    let tempfile = format!("{}/{}.dbtmp", dbname, manifest_file_num);
-
+    let manifest_base = manifest_name(manifest_file_num);
+    let tempfile = temp_file_name(dbname, manifest_file_num);
     {
         let mut f = env.open_writable_file(Path::new(&tempfile))?;
         f.write(manifest_base.as_bytes())?;
         f.write("\n".as_bytes())?;
     }
-    let currentfile = format!("{}/CURRENT", dbname);
+    let currentfile = current_file_name(dbname);
     if let Err(e) = env.rename(Path::new(&tempfile), Path::new(&currentfile)) {
         // ignore error.
         env.delete(Path::new(&tempfile)).is_ok();
@@ -675,8 +696,8 @@
                C: Fn(&Item, &Item) -> Ordering,
                I: Iterator<Item = Item>,
                J: Iterator<Item = Item>>
-    (iter_a: &mut I,
-     iter_b: &mut J,
+    (mut iter_a: I,
+     mut iter_b: J,
      cmp: C)
      -> Vec<Item> {
     let mut a = iter_a.next();
@@ -750,30 +771,40 @@
 
     fn example_files() -> Vec<FileMetaHandle> {
         let mut f1 = FileMetaData::default();
+        f1.num = 1;
+        f1.size = 10;
         f1.smallest = "f".as_bytes().to_vec();
         f1.largest = "g".as_bytes().to_vec();
         let mut f2 = FileMetaData::default();
+        f2.num = 2;
+        f2.size = 20;
         f2.smallest = "e".as_bytes().to_vec();
         f2.largest = "f".as_bytes().to_vec();
         let mut f3 = FileMetaData::default();
+        f3.num = 3;
+        f3.size = 30;
         f3.smallest = "a".as_bytes().to_vec();
         f3.largest = "b".as_bytes().to_vec();
         let mut f4 = FileMetaData::default();
+        f4.num = 4;
+        f4.size = 40;
         f4.smallest = "q".as_bytes().to_vec();
         f4.largest = "z".as_bytes().to_vec();
         vec![f1, f2, f3, f4].into_iter().map(share).collect()
     }
 
     #[test]
+    fn test_version_set_merge_iters() {
+        let v1 = vec![2, 4, 6, 8, 10];
+        let v2 = vec![1, 3, 5, 7];
+        assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8, 10],
+                   merge_iters(v1.into_iter(), v2.into_iter(), |a, b| a.cmp(&b)));
+
+    }
+
+    #[test]
     fn test_version_set_total_size() {
-        let mut f1 = FileMetaData::default();
-        f1.size = 10;
-        let mut f2 = FileMetaData::default();
-        f2.size = 20;
-        let mut f3 = FileMetaData::default();
-        f3.size = 30;
-        let files = vec![share(f1), share(f2), share(f3)];
-        assert_eq!(60, total_size(files.iter()));
+        assert_eq!(100, total_size(example_files().iter()));
     }
 
     #[test]
@@ -785,6 +816,43 @@
     }
 
     #[test]
+    fn test_version_set_builder() {
+        let (v, opt) = make_version();
+        let v = share(v);
+
+        let mut fmd = FileMetaData::default();
+        fmd.num = 21;
+        fmd.size = 123;
+        fmd.smallest = LookupKey::new("klm".as_bytes(), 777).internal_key().to_vec();
+        fmd.largest = LookupKey::new("kop".as_bytes(), 700).internal_key().to_vec();
+
+        let mut ve = VersionEdit::new();
+        ve.add_file(1, fmd);
+        // This deletion should be undone by apply().
+        ve.delete_file(1, 21);
+        ve.delete_file(0, 2);
+        ve.set_compact_pointer(2, LookupKey::new("xxx".as_bytes(), 123).internal_key());
+
+        let mut b = Builder::new();
+        let mut ptrs: [Vec<u8>; NUM_LEVELS] = Default::default();
+        b.apply(&ve, &mut ptrs);
+
+        assert_eq!(&[120 as u8, 120, 120, 1, 123, 0, 0, 0, 0, 0, 0],
+                   ptrs[2].as_slice());
+        assert_eq!(2, b.deleted[0][0]);
+        assert_eq!(1, b.added[1].len());
+
+        let mut v2 = Version::new(share(TableCache::new("db", opt.clone(), 100)),
+                                  opt.cmp.clone());
+        b.save_to(&InternalKeyCmp(opt.cmp.clone()), &v, &mut v2);
+        // Second file in L0 was removed.
+        assert_eq!(1, v2.files[0].len());
+        // File was added to L1.
+        assert_eq!(4, v2.files[1].len());
+        assert_eq!(21, v2.files[1][3].borrow().num);
+    }
+
+    #[test]
     fn test_version_set_log_and_apply() {
         let (_, opt) = make_version();
         let mut vs = VersionSet::new("db".to_string(),
@@ -801,7 +869,7 @@
             ve.set_last_seq(30);
 
             // Write first manifest to be recovered from.
-            let manifest = format!("db/MANIFEST-{:06}", 1);
+            let manifest = manifest_file_name("db", 1);
             let mffile = opt.env.open_writable_file(Path::new(&manifest)).unwrap();
             let mut lw = LogWriter::new(mffile);
             lw.add_record(&ve.encode()).unwrap();
@@ -828,8 +896,8 @@
             let mut fmd = FileMetaData::default();
             fmd.num = 21;
             fmd.size = 123;
-            fmd.smallest = "abc".as_bytes().to_vec();
-            fmd.largest = "def".as_bytes().to_vec();
+            fmd.smallest = LookupKey::new("abc".as_bytes(), 777).internal_key().to_vec();
+            fmd.largest = LookupKey::new("def".as_bytes(), 700).internal_key().to_vec();
             ve.add_file(1, fmd);
             vs.log_and_apply(ve).unwrap();
 
@@ -848,7 +916,7 @@
             // current version.
             assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[0].len());
             assert_eq!(1, vs.current.as_ref().unwrap().borrow().files[1].len());
-            assert_eq!(47, vs.write_snapshot().unwrap());
+            assert_eq!(63, vs.write_snapshot().unwrap());
         }
     }