Mercurial > lbo > hg > leveldb-rs
changeset 235:c5a6d5da8380
version_set: Various cleanups and new test for Builder.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Mon, 11 Sep 2017 19:10:35 +0200 |
parents | 950bdfc410bc |
children | 0c747fecd98f |
files | src/version.rs src/version_set.rs |
diffstat | 2 files changed, 100 insertions(+), 30 deletions(-) [+] |
line wrap: on
line diff
--- a/src/version.rs Mon Sep 11 18:11:45 2017 +0200 +++ b/src/version.rs Mon Sep 11 19:10:35 2017 +0200 @@ -146,6 +146,8 @@ } } + /// update_stats updates the number of seeks, and remembers files with too many seeks as + /// compaction candidates. fn update_stats(&mut self, stats: GetStats) -> bool { if let Some(file) = stats.file { if file.borrow().allowed_seeks <= 1 && self.file_to_compact.is_none() {
--- a/src/version_set.rs Mon Sep 11 18:11:45 2017 +0200 +++ b/src/version_set.rs Mon Sep 11 19:10:35 2017 +0200 @@ -382,7 +382,7 @@ self.finalize(&mut v); if self.descriptor_log.is_none() { - let descname = format!("{}/MANIFEST-{:06}", self.dbname, self.manifest_num); + let descname = manifest_file_name(&self.dbname, self.manifest_num); edit.set_next_file(self.next_file_num); self.descriptor_log = Some(LogWriter::new(self.opt.env.open_writable_file(Path::new(&descname))?)); @@ -442,7 +442,7 @@ let mut current = String::new(); { let mut f = - self.opt.env.open_sequential_file(Path::new(&format!("{}/CURRENT", self.dbname)))?; + self.opt.env.open_sequential_file(Path::new(¤t_file_name(&self.dbname)))?; f.read_to_string(&mut current)?; } if current.is_empty() || !current.ends_with('\n') { @@ -572,6 +572,8 @@ } } + /// apply applies the edits recorded in edit to the builder state. compaction pointers are + /// copied to the supplied compaction_ptrs array. fn apply(&mut self, edit: &VersionEdit, compaction_ptrs: &mut [Vec<u8>; NUM_LEVELS]) { for c in edit.compaction_ptrs.iter() { compaction_ptrs[c.level] = c.key.clone(); @@ -585,15 +587,17 @@ if f.allowed_seeks < 100 { f.allowed_seeks = 100; } - for i in 0..self.deleted[level].len() { - if self.deleted[level][i] == f.num { - self.deleted[level].swap_remove(i); - } - } + // Remove this file from the list of deleted files. + self.deleted[level] = self.deleted[level] + .iter() + .filter_map(|d| if *d != f.num { Some(*d) } else { None }) + .collect(); self.added[level].push(share(f)); } } + /// maybe_add_file adds a file f at level to version v, if it's not already marked as deleted + /// in this edit. It also asserts that the ordering of files is preserved. fn maybe_add_file(&mut self, cmp: &InternalKeyCmp, v: &mut Version, @@ -615,6 +619,8 @@ v.files[level].push(f); } + /// save_to saves the edits applied to the builder to v, adding all non-deleted files from + /// Version base to v. fn save_to(&mut self, cmp: &InternalKeyCmp, base: &Shared<Version>, v: &mut Version) { for level in 0..NUM_LEVELS { sort_files_by_smallest(cmp, &mut self.added[level]); @@ -625,10 +631,10 @@ let basefiles = base.borrow().files[level].clone(); v.files[level].reserve(basefiles.len() + self.added[level].len()); - let mut iadded = added.into_iter(); - let mut ibasefiles = basefiles.into_iter(); - let merged = merge_iters(&mut iadded, - &mut ibasefiles, + let iadded = added.into_iter(); + let ibasefiles = basefiles.into_iter(); + let merged = merge_iters(iadded, + ibasefiles, |a, b| cmp.cmp(&a.borrow().smallest, &b.borrow().smallest)); for m in merged { self.maybe_add_file(cmp, v, level, m); @@ -641,22 +647,37 @@ for i in 1..v.files[level].len() { let (prev_end, this_begin) = (&v.files[level][i - 1].borrow().largest, &v.files[level][i].borrow().smallest); - assert!(cmp.cmp(prev_end, this_begin) >= Ordering::Equal); + assert!(cmp.cmp(prev_end, this_begin) < Ordering::Equal); } } } } +fn manifest_name(file_num: FileNum) -> String { + format!("MANIFEST-{:06}", file_num) +} + +fn manifest_file_name(dbname: &str, file_num: FileNum) -> String { + format!("{}/{}", dbname, manifest_name(file_num)) +} + +fn temp_file_name(dbname: &str, file_num: FileNum) -> String { + format!("{}/{:06}.dbtmp", dbname, file_num) +} + +fn current_file_name(dbname: &str) -> String { + format!("{}/CURRENT", dbname) +} + fn set_current_file(env: &Box<Env>, dbname: &str, manifest_file_num: FileNum) -> Result<()> { - let manifest_base = format!("MANIFEST-{:06}", manifest_file_num); - let tempfile = format!("{}/{}.dbtmp", dbname, manifest_file_num); - + let manifest_base = manifest_name(manifest_file_num); + let tempfile = temp_file_name(dbname, manifest_file_num); { let mut f = env.open_writable_file(Path::new(&tempfile))?; f.write(manifest_base.as_bytes())?; f.write("\n".as_bytes())?; } - let currentfile = format!("{}/CURRENT", dbname); + let currentfile = current_file_name(dbname); if let Err(e) = env.rename(Path::new(&tempfile), Path::new(¤tfile)) { // ignore error. env.delete(Path::new(&tempfile)).is_ok(); @@ -675,8 +696,8 @@ C: Fn(&Item, &Item) -> Ordering, I: Iterator<Item = Item>, J: Iterator<Item = Item>> - (iter_a: &mut I, - iter_b: &mut J, + (mut iter_a: I, + mut iter_b: J, cmp: C) -> Vec<Item> { let mut a = iter_a.next(); @@ -750,30 +771,40 @@ fn example_files() -> Vec<FileMetaHandle> { let mut f1 = FileMetaData::default(); + f1.num = 1; + f1.size = 10; f1.smallest = "f".as_bytes().to_vec(); f1.largest = "g".as_bytes().to_vec(); let mut f2 = FileMetaData::default(); + f2.num = 2; + f2.size = 20; f2.smallest = "e".as_bytes().to_vec(); f2.largest = "f".as_bytes().to_vec(); let mut f3 = FileMetaData::default(); + f3.num = 3; + f3.size = 30; f3.smallest = "a".as_bytes().to_vec(); f3.largest = "b".as_bytes().to_vec(); let mut f4 = FileMetaData::default(); + f4.num = 4; + f4.size = 40; f4.smallest = "q".as_bytes().to_vec(); f4.largest = "z".as_bytes().to_vec(); vec![f1, f2, f3, f4].into_iter().map(share).collect() } #[test] + fn test_version_set_merge_iters() { + let v1 = vec![2, 4, 6, 8, 10]; + let v2 = vec![1, 3, 5, 7]; + assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8, 10], + merge_iters(v1.into_iter(), v2.into_iter(), |a, b| a.cmp(&b))); + + } + + #[test] fn test_version_set_total_size() { - let mut f1 = FileMetaData::default(); - f1.size = 10; - let mut f2 = FileMetaData::default(); - f2.size = 20; - let mut f3 = FileMetaData::default(); - f3.size = 30; - let files = vec![share(f1), share(f2), share(f3)]; - assert_eq!(60, total_size(files.iter())); + assert_eq!(100, total_size(example_files().iter())); } #[test] @@ -785,6 +816,43 @@ } #[test] + fn test_version_set_builder() { + let (v, opt) = make_version(); + let v = share(v); + + let mut fmd = FileMetaData::default(); + fmd.num = 21; + fmd.size = 123; + fmd.smallest = LookupKey::new("klm".as_bytes(), 777).internal_key().to_vec(); + fmd.largest = LookupKey::new("kop".as_bytes(), 700).internal_key().to_vec(); + + let mut ve = VersionEdit::new(); + ve.add_file(1, fmd); + // This deletion should be undone by apply(). + ve.delete_file(1, 21); + ve.delete_file(0, 2); + ve.set_compact_pointer(2, LookupKey::new("xxx".as_bytes(), 123).internal_key()); + + let mut b = Builder::new(); + let mut ptrs: [Vec<u8>; NUM_LEVELS] = Default::default(); + b.apply(&ve, &mut ptrs); + + assert_eq!(&[120 as u8, 120, 120, 1, 123, 0, 0, 0, 0, 0, 0], + ptrs[2].as_slice()); + assert_eq!(2, b.deleted[0][0]); + assert_eq!(1, b.added[1].len()); + + let mut v2 = Version::new(share(TableCache::new("db", opt.clone(), 100)), + opt.cmp.clone()); + b.save_to(&InternalKeyCmp(opt.cmp.clone()), &v, &mut v2); + // Second file in L0 was removed. + assert_eq!(1, v2.files[0].len()); + // File was added to L1. + assert_eq!(4, v2.files[1].len()); + assert_eq!(21, v2.files[1][3].borrow().num); + } + + #[test] fn test_version_set_log_and_apply() { let (_, opt) = make_version(); let mut vs = VersionSet::new("db".to_string(), @@ -801,7 +869,7 @@ ve.set_last_seq(30); // Write first manifest to be recovered from. - let manifest = format!("db/MANIFEST-{:06}", 1); + let manifest = manifest_file_name("db", 1); let mffile = opt.env.open_writable_file(Path::new(&manifest)).unwrap(); let mut lw = LogWriter::new(mffile); lw.add_record(&ve.encode()).unwrap(); @@ -828,8 +896,8 @@ let mut fmd = FileMetaData::default(); fmd.num = 21; fmd.size = 123; - fmd.smallest = "abc".as_bytes().to_vec(); - fmd.largest = "def".as_bytes().to_vec(); + fmd.smallest = LookupKey::new("abc".as_bytes(), 777).internal_key().to_vec(); + fmd.largest = LookupKey::new("def".as_bytes(), 700).internal_key().to_vec(); ve.add_file(1, fmd); vs.log_and_apply(ve).unwrap(); @@ -848,7 +916,7 @@ // current version. assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[0].len()); assert_eq!(1, vs.current.as_ref().unwrap().borrow().files[1].len()); - assert_eq!(47, vs.write_snapshot().unwrap()); + assert_eq!(63, vs.write_snapshot().unwrap()); } }