changeset 236:0c747fecd98f

version_set: Implement pick_compaction.
author Lewin Bormann <lbo@spheniscida.de>
date Tue, 12 Sep 2017 19:58:44 +0200
parents c5a6d5da8380
children f3626bf3e30f
files src/version_set.rs
diffstat 1 files changed, 114 insertions(+), 32 deletions(-) [+]
line wrap: on
line diff
--- a/src/version_set.rs	Mon Sep 11 19:10:35 2017 +0200
+++ b/src/version_set.rs	Tue Sep 12 19:58:44 2017 +0200
@@ -31,7 +31,7 @@
     grandparents: Option<Vec<FileMetaHandle>>,
     overlapped_bytes: usize,
     seen_key: bool,
-    pub edit: VersionEdit,
+    edit: VersionEdit,
 }
 
 impl Compaction {
@@ -53,6 +53,20 @@
         }
     }
 
+    fn add_input(&mut self, parent: usize, f: FileMetaHandle) {
+        assert!(parent <= 1);
+        self.inputs[parent].push(f)
+    }
+
+    fn num_inputs(&self, parent: usize) -> usize {
+        assert!(parent < 2);
+        self.inputs[parent].len()
+    }
+
+    fn edit(&mut self) -> &mut VersionEdit {
+        &mut self.edit
+    }
+
     /// add_input_deletions marks the current input files as deleted in the inner VersionEdit.
     fn add_input_deletions(&mut self) {
         for parent in 0..2 {
@@ -62,12 +76,6 @@
         }
     }
 
-    fn input(&self, parent: usize, i: usize) -> FileMetaHandle {
-        assert!(parent < 2);
-        assert!(i < self.inputs[parent].len());
-        self.inputs[parent][i].clone()
-    }
-
     /// is_base_level_for checks whether the given key may exist in levels higher than this
     /// compaction's level plus 2. I.e., whether the levels for this compaction are the last ones
     /// to contain the key.
@@ -91,11 +99,6 @@
         true
     }
 
-    fn num_inputs(&self, parent: usize) -> usize {
-        assert!(parent < 2);
-        self.inputs[parent].len()
-    }
-
     fn is_trivial_move(&self) -> bool {
         let inputs_size = total_size(self.grandparents.as_ref().unwrap().iter());
         self.num_inputs(0) == 1 && self.num_inputs(1) == 0 && inputs_size < 10 * self.max_file_size
@@ -181,8 +184,11 @@
         files.into_iter().collect()
     }
 
-    fn current(&self) -> Option<Shared<Version>> {
-        self.current.clone()
+    // current returns a reference to the current version. It panics if there is no current
+    // version.
+    fn current(&self) -> Shared<Version> {
+        assert!(self.current.is_some());
+        self.current.as_ref().unwrap().clone()
     }
 
     fn num_level_bytes(&self, l: usize) -> usize {
@@ -234,6 +240,54 @@
         offset
     }
 
+    fn pick_compaction(&mut self) -> Option<Compaction> {
+        assert!(self.current.is_some());
+        let current = self.current();
+        let current = current.borrow();
+
+        let mut c = Compaction::new(&self.opt, 0, self.current.clone());
+        let level;
+
+        // Size compaction?
+        if current.compaction_score.unwrap_or(0.0) >= 1.0 {
+            level = current.compaction_level.unwrap();
+            assert!(level < NUM_LEVELS - 1);
+
+            for f in &current.files[level] {
+                if self.compaction_ptrs[level].is_empty() ||
+                   self.cmp.cmp(&f.borrow().largest, &self.compaction_ptrs[level]) ==
+                   Ordering::Greater {
+                    c.add_input(0, f.clone());
+                    break;
+                }
+            }
+
+            if c.num_inputs(0) == 0 {
+                // Add first file in level.
+                c.add_input(0, current.files[level][0].clone());
+            }
+        } else if let Some(ref ftc) = current.file_to_compact {
+            // Seek compaction?
+            level = current.file_to_compact_lvl;
+            c.add_input(0, ftc.clone());
+        } else {
+            return None;
+        }
+
+        c.level = level;
+        c.input_version = self.current.clone();
+
+        if level == 0 {
+            let (smallest, largest) = get_range(&self.cmp, c.inputs[0].iter());
+            // This call intentionally overwrites the file previously put into c.inputs[0].
+            c.inputs[0] = current.overlapping_inputs(0, &smallest, &largest);
+            assert!(!c.inputs[0].is_empty());
+        }
+
+        self.setup_other_inputs(&mut c);
+        Some(c)
+    }
+
     fn compact_range<'a, 'b>(&mut self,
                              level: usize,
                              from: InternalKey<'a>,
@@ -264,16 +318,15 @@
     }
 
     fn setup_other_inputs(&mut self, compaction: &mut Compaction) {
+        assert!(self.current.is_some());
+        let current = self.current.as_ref().unwrap();
+        let current = current.borrow();
+
         let level = compaction.level;
         let (smallest, mut largest) = get_range(&self.cmp, compaction.inputs[0].iter());
 
-        assert!(self.current.is_some());
         // Set up level+1 inputs.
-        compaction.inputs[1] = self.current
-            .as_ref()
-            .unwrap()
-            .borrow()
-            .overlapping_inputs(level + 1, &smallest, &largest);
+        compaction.inputs[1] = current.overlapping_inputs(level + 1, &smallest, &largest);
 
         let (mut allstart, mut alllimit) =
             get_range(&self.cmp,
@@ -282,22 +335,14 @@
         // Check if we can add more inputs in the current level without having to compact more
         // inputs from level+1.
         if !compaction.inputs[1].is_empty() {
-            let expanded0 = self.current
-                .as_ref()
-                .unwrap()
-                .borrow()
-                .overlapping_inputs(level, &allstart, &alllimit);
+            let expanded0 = current.overlapping_inputs(level, &allstart, &alllimit);
             let inputs1_size = total_size(compaction.inputs[1].iter());
             let expanded0_size = total_size(expanded0.iter());
             // ...if we picked up more files in the current level, and the total size is acceptable
             if expanded0.len() > compaction.num_inputs(0) &&
                (inputs1_size + expanded0_size) < 25 * self.opt.max_file_size {
                 let (new_start, new_limit) = get_range(&self.cmp, expanded0.iter());
-                let expanded1 = self.current
-                    .as_ref()
-                    .unwrap()
-                    .borrow()
-                    .overlapping_inputs(level + 1, &new_start, &new_limit);
+                let expanded1 = current.overlapping_inputs(level + 1, &new_start, &new_limit);
                 if expanded1.len() == compaction.num_inputs(1) {
                     // TODO: Log this.
 
@@ -943,6 +988,43 @@
     }
 
     #[test]
+    fn test_version_set_pick_compaction() {
+        let (mut v, opt) = make_version();
+        let mut vs = VersionSet::new("db".to_string(),
+                                     opt.clone(),
+                                     share(TableCache::new("db", opt, 100)));
+
+        v.compaction_score = Some(2.0);
+        v.compaction_level = Some(0);
+        vs.add_version(v);
+
+        // Size compaction
+        {
+            let c = vs.pick_compaction().unwrap();
+            assert_eq!(2, c.inputs[0].len());
+            assert_eq!(1, c.inputs[1].len());
+            assert_eq!(0, c.level);
+            assert!(c.input_version.is_some());
+        }
+        // Seek compaction
+        {
+            let current = vs.current();
+            current.borrow_mut().compaction_score = None;
+            current.borrow_mut().compaction_level = None;
+            current.borrow_mut().file_to_compact_lvl = 1;
+
+            let fmd = current.borrow().files[1][0].clone();
+            current.borrow_mut().file_to_compact = Some(fmd);
+
+            let c = vs.pick_compaction().unwrap();
+            assert_eq!(3, c.inputs[0].len()); // inputs on l+0 are expanded.
+            assert_eq!(1, c.inputs[1].len());
+            assert_eq!(1, c.level);
+            assert!(c.input_version.is_some());
+        }
+    }
+
+    #[test]
     fn test_version_set_compaction() {
         let (v, opt) = make_version();
         let mut vs = VersionSet::new("db".to_string(),
@@ -953,7 +1035,7 @@
 
         {
             // approximate_offset()
-            let v = vs.current().unwrap();
+            let v = vs.current();
             assert_eq!(0,
                        vs.approximate_offset(&v,
                                              LookupKey::new("aaa".as_bytes(), 9000)
@@ -1022,7 +1104,7 @@
             let to = LookupKey::new("zzz".as_bytes(), 1010);
             let mut c = vs.compact_range(0, from.internal_key(), to.internal_key()).unwrap();
             for inp in &[(0, 0, 1), (0, 1, 2), (1, 0, 3)] {
-                let f = c.input(inp.0, inp.1);
+                let f = &c.inputs[inp.0][inp.1];
                 assert_eq!(inp.2, f.borrow().num);
             }
             c.add_input_deletions();