changeset 238:889436eb7a06

version_set: Implement make_input_iterator
author Lewin Bormann <lbo@spheniscida.de>
date Wed, 13 Sep 2017 20:23:01 +0200
parents f3626bf3e30f
children 1a74c9d7333b
files src/version_set.rs
diffstat 1 files changed, 59 insertions(+), 6 deletions(-) [+]
line wrap: on
line diff
--- a/src/version_set.rs	Wed Sep 13 20:22:45 2017 +0200
+++ b/src/version_set.rs	Wed Sep 13 20:23:01 2017 +0200
@@ -4,10 +4,11 @@
 use error::{err, Status, StatusCode, Result};
 use key_types::{parse_internal_key, InternalKey, LookupKey, UserKey};
 use log::{LogWriter, LogReader};
+use merging_iter::MergingIter;
 use options::Options;
 use table_cache::TableCache;
-use types::{parse_file_name, share, NUM_LEVELS, FileNum, FileType, Shared};
-use version::{FileMetaHandle, Version};
+use types::{parse_file_name, share, NUM_LEVELS, FileNum, FileType, LdbIterator, Shared};
+use version::{new_version_iter, FileMetaHandle, Version};
 use version_edit::VersionEdit;
 
 use std::cmp::Ordering;
@@ -372,7 +373,7 @@
 
         // TODO: add log statement about compaction.
 
-        compaction.edit.set_compact_pointer(level, &largest);
+        compaction.edit().set_compact_pointer(level, &largest);
         self.compaction_ptrs[level] = largest;
     }
 
@@ -601,6 +602,37 @@
         }
         false
     }
+
+    /// make_input_iterator returns an iterator over the inputs of a compaction.
+    fn make_input_iterator(&self, c: &Compaction) -> Box<LdbIterator> {
+        let cap = if c.level == 0 { c.num_inputs(0) + 1 } else { 2 };
+        let mut iters: Vec<Box<LdbIterator>> = Vec::with_capacity(cap);
+        for i in 0..2 {
+            if c.num_inputs(i) == 0 {
+                continue;
+            }
+            if c.level + i == 0 {
+                // Add individual iterators for L0 tables.
+                for fi in 0..c.num_inputs(i) {
+                    let f = &c.inputs[i][fi];
+                    if let Ok(tbl) = self.cache.borrow_mut().get_table(f.borrow().num) {
+                        iters.push(Box::new(tbl.iter()));
+                    } else {
+                        // TODO: Log this.
+                        panic!("error opening table");
+                    }
+                }
+            } else {
+                // Create concatenating iterator higher levels.
+                iters.push(Box::new(new_version_iter(c.inputs[i].clone(),
+                                                     self.cache.clone(),
+                                                     self.opt.cmp.clone())));
+            }
+        }
+        assert!(iters.len() <= cap);
+        let cmp: Rc<Box<Cmp>> = Rc::new(Box::new(self.cmp.clone()));
+        Box::new(MergingIter::new(cmp, iters))
+    }
 }
 
 struct Builder {
@@ -811,6 +843,7 @@
     use super::*;
     use cmp::DefaultCmp;
     use mem_env::MemEnv;
+    use test_util::LdbIteratorIter;
     use types::FileMetaData;
     use version::testutil::make_version;
 
@@ -1024,6 +1057,20 @@
         }
     }
 
+    /// iterator_properties tests that it contains len elements and that they are ordered in
+    /// ascending order by cmp.
+    fn iterator_properties<It: LdbIterator>(mut it: It, len: usize, cmp: Rc<Box<Cmp>>) {
+        let mut wr = LdbIteratorIter::wrap(&mut it);
+        let first = wr.next().unwrap();
+        let mut count = 1;
+        wr.fold(first, |(a, _), (b, c)| {
+            assert_eq!(Ordering::Less, cmp.cmp(&a, &b));
+            count += 1;
+            (b, c)
+        });
+        assert_eq!(len, count);
+    }
+
     #[test]
     fn test_version_set_compaction() {
         let (v, opt) = make_version();
@@ -1067,7 +1114,10 @@
             let c = vs.compact_range(0, from.internal_key(), to.internal_key()).unwrap();
             assert_eq!(2, c.inputs[0].len());
             assert_eq!(1, c.inputs[1].len());
-            assert_eq!(1, c.grandparents.unwrap().len());
+            assert_eq!(1, c.grandparents.as_ref().unwrap().len());
+            iterator_properties(vs.make_input_iterator(&c),
+                                9,
+                                Rc::new(Box::new(vs.cmp.clone())));
 
             // Expand input range on higher level.
             let from = LookupKey::new("dab".as_bytes(), 1000);
@@ -1075,7 +1125,10 @@
             let c = vs.compact_range(1, from.internal_key(), to.internal_key()).unwrap();
             assert_eq!(3, c.inputs[0].len());
             assert_eq!(1, c.inputs[1].len());
-            assert_eq!(0, c.grandparents.unwrap().len());
+            assert_eq!(0, c.grandparents.as_ref().unwrap().len());
+            iterator_properties(vs.make_input_iterator(&c),
+                                12,
+                                Rc::new(Box::new(vs.cmp.clone())));
 
             // is_trivial_move
             let from = LookupKey::new("fab".as_bytes(), 1000);
@@ -1108,7 +1161,7 @@
                 assert_eq!(inp.2, f.borrow().num);
             }
             c.add_input_deletions();
-            assert_eq!(23, c.edit.encode().len())
+            assert_eq!(23, c.edit().encode().len())
         }
     }
 }