Mercurial > lbo > hg > leveldb-rs
changeset 238:889436eb7a06
version_set: Implement make_input_iterator
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Wed, 13 Sep 2017 20:23:01 +0200 |
parents | f3626bf3e30f |
children | 1a74c9d7333b |
files | src/version_set.rs |
diffstat | 1 files changed, 59 insertions(+), 6 deletions(-) [+] |
line wrap: on
line diff
--- a/src/version_set.rs Wed Sep 13 20:22:45 2017 +0200 +++ b/src/version_set.rs Wed Sep 13 20:23:01 2017 +0200 @@ -4,10 +4,11 @@ use error::{err, Status, StatusCode, Result}; use key_types::{parse_internal_key, InternalKey, LookupKey, UserKey}; use log::{LogWriter, LogReader}; +use merging_iter::MergingIter; use options::Options; use table_cache::TableCache; -use types::{parse_file_name, share, NUM_LEVELS, FileNum, FileType, Shared}; -use version::{FileMetaHandle, Version}; +use types::{parse_file_name, share, NUM_LEVELS, FileNum, FileType, LdbIterator, Shared}; +use version::{new_version_iter, FileMetaHandle, Version}; use version_edit::VersionEdit; use std::cmp::Ordering; @@ -372,7 +373,7 @@ // TODO: add log statement about compaction. - compaction.edit.set_compact_pointer(level, &largest); + compaction.edit().set_compact_pointer(level, &largest); self.compaction_ptrs[level] = largest; } @@ -601,6 +602,37 @@ } false } + + /// make_input_iterator returns an iterator over the inputs of a compaction. + fn make_input_iterator(&self, c: &Compaction) -> Box<LdbIterator> { + let cap = if c.level == 0 { c.num_inputs(0) + 1 } else { 2 }; + let mut iters: Vec<Box<LdbIterator>> = Vec::with_capacity(cap); + for i in 0..2 { + if c.num_inputs(i) == 0 { + continue; + } + if c.level + i == 0 { + // Add individual iterators for L0 tables. + for fi in 0..c.num_inputs(i) { + let f = &c.inputs[i][fi]; + if let Ok(tbl) = self.cache.borrow_mut().get_table(f.borrow().num) { + iters.push(Box::new(tbl.iter())); + } else { + // TODO: Log this. + panic!("error opening table"); + } + } + } else { + // Create concatenating iterator higher levels. + iters.push(Box::new(new_version_iter(c.inputs[i].clone(), + self.cache.clone(), + self.opt.cmp.clone()))); + } + } + assert!(iters.len() <= cap); + let cmp: Rc<Box<Cmp>> = Rc::new(Box::new(self.cmp.clone())); + Box::new(MergingIter::new(cmp, iters)) + } } struct Builder { @@ -811,6 +843,7 @@ use super::*; use cmp::DefaultCmp; use mem_env::MemEnv; + use test_util::LdbIteratorIter; use types::FileMetaData; use version::testutil::make_version; @@ -1024,6 +1057,20 @@ } } + /// iterator_properties tests that it contains len elements and that they are ordered in + /// ascending order by cmp. + fn iterator_properties<It: LdbIterator>(mut it: It, len: usize, cmp: Rc<Box<Cmp>>) { + let mut wr = LdbIteratorIter::wrap(&mut it); + let first = wr.next().unwrap(); + let mut count = 1; + wr.fold(first, |(a, _), (b, c)| { + assert_eq!(Ordering::Less, cmp.cmp(&a, &b)); + count += 1; + (b, c) + }); + assert_eq!(len, count); + } + #[test] fn test_version_set_compaction() { let (v, opt) = make_version(); @@ -1067,7 +1114,10 @@ let c = vs.compact_range(0, from.internal_key(), to.internal_key()).unwrap(); assert_eq!(2, c.inputs[0].len()); assert_eq!(1, c.inputs[1].len()); - assert_eq!(1, c.grandparents.unwrap().len()); + assert_eq!(1, c.grandparents.as_ref().unwrap().len()); + iterator_properties(vs.make_input_iterator(&c), + 9, + Rc::new(Box::new(vs.cmp.clone()))); // Expand input range on higher level. let from = LookupKey::new("dab".as_bytes(), 1000); @@ -1075,7 +1125,10 @@ let c = vs.compact_range(1, from.internal_key(), to.internal_key()).unwrap(); assert_eq!(3, c.inputs[0].len()); assert_eq!(1, c.inputs[1].len()); - assert_eq!(0, c.grandparents.unwrap().len()); + assert_eq!(0, c.grandparents.as_ref().unwrap().len()); + iterator_properties(vs.make_input_iterator(&c), + 12, + Rc::new(Box::new(vs.cmp.clone()))); // is_trivial_move let from = LookupKey::new("fab".as_bytes(), 1000); @@ -1108,7 +1161,7 @@ assert_eq!(inp.2, f.borrow().num); } c.add_input_deletions(); - assert_eq!(23, c.edit.encode().len()) + assert_eq!(23, c.edit().encode().len()) } } }