Mercurial > lbo > hg > leveldb-rs
changeset 236:0c747fecd98f
version_set: Implement pick_compaction.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Tue, 12 Sep 2017 19:58:44 +0200 |
parents | c5a6d5da8380 |
children | f3626bf3e30f |
files | src/version_set.rs |
diffstat | 1 files changed, 114 insertions(+), 32 deletions(-) [+] |
line wrap: on
line diff
--- a/src/version_set.rs Mon Sep 11 19:10:35 2017 +0200 +++ b/src/version_set.rs Tue Sep 12 19:58:44 2017 +0200 @@ -31,7 +31,7 @@ grandparents: Option<Vec<FileMetaHandle>>, overlapped_bytes: usize, seen_key: bool, - pub edit: VersionEdit, + edit: VersionEdit, } impl Compaction { @@ -53,6 +53,20 @@ } } + fn add_input(&mut self, parent: usize, f: FileMetaHandle) { + assert!(parent <= 1); + self.inputs[parent].push(f) + } + + fn num_inputs(&self, parent: usize) -> usize { + assert!(parent < 2); + self.inputs[parent].len() + } + + fn edit(&mut self) -> &mut VersionEdit { + &mut self.edit + } + /// add_input_deletions marks the current input files as deleted in the inner VersionEdit. fn add_input_deletions(&mut self) { for parent in 0..2 { @@ -62,12 +76,6 @@ } } - fn input(&self, parent: usize, i: usize) -> FileMetaHandle { - assert!(parent < 2); - assert!(i < self.inputs[parent].len()); - self.inputs[parent][i].clone() - } - /// is_base_level_for checks whether the given key may exist in levels higher than this /// compaction's level plus 2. I.e., whether the levels for this compaction are the last ones /// to contain the key. @@ -91,11 +99,6 @@ true } - fn num_inputs(&self, parent: usize) -> usize { - assert!(parent < 2); - self.inputs[parent].len() - } - fn is_trivial_move(&self) -> bool { let inputs_size = total_size(self.grandparents.as_ref().unwrap().iter()); self.num_inputs(0) == 1 && self.num_inputs(1) == 0 && inputs_size < 10 * self.max_file_size @@ -181,8 +184,11 @@ files.into_iter().collect() } - fn current(&self) -> Option<Shared<Version>> { - self.current.clone() + // current returns a reference to the current version. It panics if there is no current + // version. + fn current(&self) -> Shared<Version> { + assert!(self.current.is_some()); + self.current.as_ref().unwrap().clone() } fn num_level_bytes(&self, l: usize) -> usize { @@ -234,6 +240,54 @@ offset } + fn pick_compaction(&mut self) -> Option<Compaction> { + assert!(self.current.is_some()); + let current = self.current(); + let current = current.borrow(); + + let mut c = Compaction::new(&self.opt, 0, self.current.clone()); + let level; + + // Size compaction? + if current.compaction_score.unwrap_or(0.0) >= 1.0 { + level = current.compaction_level.unwrap(); + assert!(level < NUM_LEVELS - 1); + + for f in ¤t.files[level] { + if self.compaction_ptrs[level].is_empty() || + self.cmp.cmp(&f.borrow().largest, &self.compaction_ptrs[level]) == + Ordering::Greater { + c.add_input(0, f.clone()); + break; + } + } + + if c.num_inputs(0) == 0 { + // Add first file in level. + c.add_input(0, current.files[level][0].clone()); + } + } else if let Some(ref ftc) = current.file_to_compact { + // Seek compaction? + level = current.file_to_compact_lvl; + c.add_input(0, ftc.clone()); + } else { + return None; + } + + c.level = level; + c.input_version = self.current.clone(); + + if level == 0 { + let (smallest, largest) = get_range(&self.cmp, c.inputs[0].iter()); + // This call intentionally overwrites the file previously put into c.inputs[0]. + c.inputs[0] = current.overlapping_inputs(0, &smallest, &largest); + assert!(!c.inputs[0].is_empty()); + } + + self.setup_other_inputs(&mut c); + Some(c) + } + fn compact_range<'a, 'b>(&mut self, level: usize, from: InternalKey<'a>, @@ -264,16 +318,15 @@ } fn setup_other_inputs(&mut self, compaction: &mut Compaction) { + assert!(self.current.is_some()); + let current = self.current.as_ref().unwrap(); + let current = current.borrow(); + let level = compaction.level; let (smallest, mut largest) = get_range(&self.cmp, compaction.inputs[0].iter()); - assert!(self.current.is_some()); // Set up level+1 inputs. - compaction.inputs[1] = self.current - .as_ref() - .unwrap() - .borrow() - .overlapping_inputs(level + 1, &smallest, &largest); + compaction.inputs[1] = current.overlapping_inputs(level + 1, &smallest, &largest); let (mut allstart, mut alllimit) = get_range(&self.cmp, @@ -282,22 +335,14 @@ // Check if we can add more inputs in the current level without having to compact more // inputs from level+1. if !compaction.inputs[1].is_empty() { - let expanded0 = self.current - .as_ref() - .unwrap() - .borrow() - .overlapping_inputs(level, &allstart, &alllimit); + let expanded0 = current.overlapping_inputs(level, &allstart, &alllimit); let inputs1_size = total_size(compaction.inputs[1].iter()); let expanded0_size = total_size(expanded0.iter()); // ...if we picked up more files in the current level, and the total size is acceptable if expanded0.len() > compaction.num_inputs(0) && (inputs1_size + expanded0_size) < 25 * self.opt.max_file_size { let (new_start, new_limit) = get_range(&self.cmp, expanded0.iter()); - let expanded1 = self.current - .as_ref() - .unwrap() - .borrow() - .overlapping_inputs(level + 1, &new_start, &new_limit); + let expanded1 = current.overlapping_inputs(level + 1, &new_start, &new_limit); if expanded1.len() == compaction.num_inputs(1) { // TODO: Log this. @@ -943,6 +988,43 @@ } #[test] + fn test_version_set_pick_compaction() { + let (mut v, opt) = make_version(); + let mut vs = VersionSet::new("db".to_string(), + opt.clone(), + share(TableCache::new("db", opt, 100))); + + v.compaction_score = Some(2.0); + v.compaction_level = Some(0); + vs.add_version(v); + + // Size compaction + { + let c = vs.pick_compaction().unwrap(); + assert_eq!(2, c.inputs[0].len()); + assert_eq!(1, c.inputs[1].len()); + assert_eq!(0, c.level); + assert!(c.input_version.is_some()); + } + // Seek compaction + { + let current = vs.current(); + current.borrow_mut().compaction_score = None; + current.borrow_mut().compaction_level = None; + current.borrow_mut().file_to_compact_lvl = 1; + + let fmd = current.borrow().files[1][0].clone(); + current.borrow_mut().file_to_compact = Some(fmd); + + let c = vs.pick_compaction().unwrap(); + assert_eq!(3, c.inputs[0].len()); // inputs on l+0 are expanded. + assert_eq!(1, c.inputs[1].len()); + assert_eq!(1, c.level); + assert!(c.input_version.is_some()); + } + } + + #[test] fn test_version_set_compaction() { let (v, opt) = make_version(); let mut vs = VersionSet::new("db".to_string(), @@ -953,7 +1035,7 @@ { // approximate_offset() - let v = vs.current().unwrap(); + let v = vs.current(); assert_eq!(0, vs.approximate_offset(&v, LookupKey::new("aaa".as_bytes(), 9000) @@ -1022,7 +1104,7 @@ let to = LookupKey::new("zzz".as_bytes(), 1010); let mut c = vs.compact_range(0, from.internal_key(), to.internal_key()).unwrap(); for inp in &[(0, 0, 1), (0, 1, 2), (1, 0, 3)] { - let f = c.input(inp.0, inp.1); + let f = &c.inputs[inp.0][inp.1]; assert_eq!(inp.2, f.borrow().num); } c.add_input_deletions();