Mercurial > lbo > hg > leveldb-rs
changeset 234:950bdfc410bc
version_set: Implement log_and_apply, recover, and add tests.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Mon, 11 Sep 2017 18:11:45 +0200 |
parents | 6f1af366dc97 |
children | c5a6d5da8380 |
files | src/log.rs src/types.rs src/version.rs src/version_edit.rs src/version_set.rs |
diffstat | 5 files changed, 443 insertions(+), 37 deletions(-) [+] |
line wrap: on
line diff
--- a/src/log.rs Sun Sep 10 17:32:28 2017 +0200 +++ b/src/log.rs Mon Sep 11 18:11:45 2017 +0200 @@ -100,6 +100,11 @@ self.current_block_offset += s; Ok(s) } + + pub fn flush(&mut self) -> Result<()> { + self.dst.flush()?; + Ok(()) + } }
--- a/src/types.rs Sun Sep 10 17:32:28 2017 +0200 +++ b/src/types.rs Mon Sep 11 18:11:45 2017 +0200 @@ -1,7 +1,12 @@ //! A collection of fundamental and/or simple types used by other modules +use error::{err, Result, StatusCode}; + use std::cell::RefCell; use std::rc::Rc; +use std::str::FromStr; + +pub const NUM_LEVELS: usize = 7; #[derive(Debug, PartialOrd, PartialEq)] pub enum ValueType { @@ -101,4 +106,71 @@ pub largest: Vec<u8>, } -pub const NUM_LEVELS: usize = 7; +#[derive(Debug, Clone, PartialEq)] +pub enum FileType { + Log, + DBLock, + Table, + Descriptor, + Current, + Temp, + InfoLog, +} + +pub fn parse_file_name(f: &str) -> Result<(FileNum, FileType)> { + if f == "CURRENT" { + return Ok((0, FileType::Current)); + } else if f == "LOCK" { + return Ok((0, FileType::DBLock)); + } else if f == "LOG" || f == "LOG.old" { + return Ok((0, FileType::InfoLog)); + } else if f.starts_with("MANIFEST-") { + if let Some(ix) = f.find('-') { + if let Ok(num) = FileNum::from_str_radix(&f[ix + 1..], 10) { + return Ok((num, FileType::Descriptor)); + } + return err(StatusCode::InvalidArgument, + "manifest file number is invalid"); + } + return err(StatusCode::InvalidArgument, "manifest file has no dash"); + } else if let Some(ix) = f.find('.') { + // 00012345.log 00123.sst ... + if let Ok(num) = FileNum::from_str_radix(&f[0..ix], 10) { + let typ = match &f[ix + 1..] 
{ + "sst" | "ldb" => FileType::Table, + "dbtmp" => FileType::Temp, + _ => { + return err(StatusCode::InvalidArgument, + "unknown numbered file extension") + } + }; + return Ok((num, typ)); + } + return err(StatusCode::InvalidArgument, + "invalid file number for table or temp file"); + } + err(StatusCode::InvalidArgument, "unknown file type") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_types_parse_file_name() { + for c in &[("CURRENT", (0, FileType::Current)), + ("LOCK", (0, FileType::DBLock)), + ("LOG", (0, FileType::InfoLog)), + ("LOG.old", (0, FileType::InfoLog)), + ("MANIFEST-01234", (1234, FileType::Descriptor)), + ("001122.sst", (1122, FileType::Table)), + ("001122.ldb", (1122, FileType::Table)), + ("001122.dbtmp", (1122, FileType::Temp))] { + assert_eq!(parse_file_name(c.0).unwrap(), c.1); + } + assert!(parse_file_name("xyz.LOCK").is_err()); + assert!(parse_file_name("01a.sst").is_err()); + assert!(parse_file_name("0011.abc").is_err()); + assert!(parse_file_name("MANIFEST-trolol").is_err()); + } +}
--- a/src/version.rs Sun Sep 10 17:32:28 2017 +0200 +++ b/src/version.rs Mon Sep 11 18:11:45 2017 +0200 @@ -27,6 +27,8 @@ pub file_to_compact: Option<FileMetaHandle>, pub file_to_compact_lvl: usize, + pub compaction_score: Option<f64>, + pub compaction_level: Option<usize>, } impl Version { @@ -37,6 +39,8 @@ files: Default::default(), file_to_compact: None, file_to_compact_lvl: 0, + compaction_score: None, + compaction_level: None, } }
--- a/src/version_edit.rs Sun Sep 10 17:32:28 2017 +0200 +++ b/src/version_edit.rs Mon Sep 11 18:11:45 2017 +0200 @@ -61,10 +61,10 @@ /// Manages changes to the set of managed SSTables and logfiles. pub struct VersionEdit { comparator: Option<String>, - log_number: Option<FileNum>, - prev_log_number: Option<FileNum>, - next_file_number: Option<FileNum>, - last_seq: Option<SequenceNumber>, + pub log_number: Option<FileNum>, + pub prev_log_number: Option<FileNum>, + pub next_file_number: Option<FileNum>, + pub last_seq: Option<SequenceNumber>, pub compaction_ptrs: Vec<CompactionPointer>, pub deleted: HashSet<(usize, FileNum)>,
--- a/src/version_set.rs Sun Sep 10 17:32:28 2017 +0200 +++ b/src/version_set.rs Mon Sep 11 18:11:45 2017 +0200 @@ -1,17 +1,20 @@ use cmp::{Cmp, InternalKeyCmp}; -use error::Result; +use env::Env; +use error::{err, Status, StatusCode, Result}; use key_types::{parse_internal_key, InternalKey, LookupKey, UserKey}; -use log::LogWriter; +use log::{LogWriter, LogReader}; use options::Options; use table_cache::TableCache; -use types::{share, NUM_LEVELS, FileNum, Shared}; +use types::{parse_file_name, share, NUM_LEVELS, FileNum, FileType, Shared}; use version::{FileMetaHandle, Version}; use version_edit::VersionEdit; use std::cmp::Ordering; use std::collections::HashSet; use std::io::Write; +use std::ops::Deref; +use std::path::Path; use std::rc::Rc; struct Compaction { @@ -129,22 +132,24 @@ cmp: InternalKeyCmp, cache: Shared<TableCache>, - next_file_num: u64, - manifest_num: u64, - last_seq: u64, - log_num: u64, - prev_log_num: u64, + pub next_file_num: u64, + pub manifest_num: u64, + pub last_seq: u64, + pub log_num: u64, + pub prev_log_num: u64, - log: Option<LogWriter<Box<Write>>>, versions: Vec<Shared<Version>>, current: Option<Shared<Version>>, compaction_ptrs: [Vec<u8>; NUM_LEVELS], + + descriptor_log: Option<LogWriter<Box<Write>>>, } impl VersionSet { // Note: opt.cmp should not contain an InternalKeyCmp at this point, but instead the default or // user-supplied one. 
pub fn new(db: String, opt: Options, cache: Shared<TableCache>) -> VersionSet { + let v = share(Version::new(cache.clone(), opt.cmp.clone())); VersionSet { dbname: db, cmp: InternalKeyCmp(opt.cmp.clone()), @@ -152,15 +157,15 @@ cache: cache, next_file_num: 2, - manifest_num: 2, + manifest_num: 0, last_seq: 0, log_num: 0, prev_log_num: 0, - log: None, - versions: vec![], - current: None, + versions: vec![v.clone()], + current: Some(v), compaction_ptrs: Default::default(), + descriptor_log: None, } } @@ -180,12 +185,34 @@ self.current.clone() } + fn num_level_bytes(&self, l: usize) -> usize { + assert!(l < NUM_LEVELS); + assert!(self.current.is_some()); + total_size(self.current.as_ref().unwrap().borrow().files[l].iter()) + } + fn num_level_files(&self, l: usize) -> usize { + assert!(l < NUM_LEVELS); + assert!(self.current.is_some()); + self.current.as_ref().unwrap().borrow().files[l].len() + } + fn add_version(&mut self, v: Version) { let sv = share(v); self.current = Some(sv.clone()); self.versions.push(sv); } + fn new_file_number(&mut self) -> FileNum { + self.next_file_num += 1; + self.next_file_num - 1 + } + + fn mark_file_number_used(&mut self, n: FileNum) { + if self.next_file_num <= n { + self.next_file_num = n + 1; + } + } + fn approximate_offset<'a>(&self, v: &Shared<Version>, key: InternalKey<'a>) -> usize { let mut offset = 0; for level in 0..NUM_LEVELS { @@ -304,8 +331,10 @@ self.compaction_ptrs[level] = largest; } - fn write_snapshot<W: Write>(&self, lw: &mut LogWriter<W>) -> Result<usize> { - assert!(self.current.is_some()); + /// write_snapshot writes the current version, with all files, to the manifest. 
+ fn write_snapshot(&mut self) -> Result<usize> { + assert!(self.descriptor_log.is_some()); + let mut edit = VersionEdit::new(); edit.set_comparator_name(self.opt.cmp.id()); @@ -324,21 +353,208 @@ edit.add_file(level, f.borrow().clone()); } } - lw.add_record(&edit.encode()) + self.descriptor_log.as_mut().unwrap().add_record(&edit.encode()) } - fn log_and_apply(&mut self, edit: &mut VersionEdit) { + /// log_and_apply merges the given edit with the current state and generates a new version. It + /// writes the VersionEdit to the manifest. + fn log_and_apply(&mut self, mut edit: VersionEdit) -> Result<()> { assert!(self.current.is_some()); - edit.set_log_num(self.log_num); - edit.set_prev_log_num(self.prev_log_num); + if edit.log_number.is_none() { + edit.set_log_num(self.log_num); + } else { + assert!(edit.log_number.unwrap() >= self.log_num); + assert!(edit.log_number.unwrap() < self.next_file_num); + } + if edit.prev_log_number.is_none() { + edit.set_prev_log_num(self.prev_log_num); + } edit.set_next_file(self.next_file_num); edit.set_last_seq(self.last_seq); let mut v = Version::new(self.cache.clone(), self.opt.cmp.clone()); + { + let mut builder = Builder::new(); + builder.apply(&edit, &mut self.compaction_ptrs); + builder.save_to(&self.cmp, self.current.as_ref().unwrap(), &mut v); + } + self.finalize(&mut v); + + if self.descriptor_log.is_none() { + let descname = format!("{}/MANIFEST-{:06}", self.dbname, self.manifest_num); + edit.set_next_file(self.next_file_num); + self.descriptor_log = + Some(LogWriter::new(self.opt.env.open_writable_file(Path::new(&descname))?)); + self.write_snapshot()?; + } + + let encoded = edit.encode(); + if let Some(ref mut lw) = self.descriptor_log { + lw.add_record(&encoded)?; + lw.flush()?; + } + set_current_file(&self.opt.env, &self.dbname, self.manifest_num)?; + + self.add_version(v); + // log_number was set above. 
+ self.log_num = edit.log_number.unwrap(); + self.prev_log_num = edit.prev_log_number.unwrap(); + + // TODO: Roll back written files if something went wrong. + Ok(()) + } + + fn finalize(&self, v: &mut Version) { + let mut best_lvl = None; + let mut best_score = None; + + for l in 0..NUM_LEVELS - 1 { + let score: f64; + if l == 0 { + score = v.files[l].len() as f64 / 4.0; + } else { + let mut max_bytes = 10.0 * f64::from(1 << 20); + for _ in 0..l - 1 { + max_bytes *= 10.0; + } + score = total_size(v.files[l].iter()) as f64 / max_bytes; + } + if let Some(ref mut b) = best_score { + if *b < score { + *b = score; + best_lvl = Some(l); + } + } else { + best_score = Some(score); + best_lvl = Some(l); + } + } + v.compaction_score = best_score; + v.compaction_level = best_lvl; + } + + /// recover recovers the state of a LevelDB instance from the files on disk. If recover() + /// returns true, proceed with calling log_and_apply(). + fn recover(&mut self) -> Result<bool> { + assert!(self.current.is_some()); + + let mut current = String::new(); + { + let mut f = + self.opt.env.open_sequential_file(Path::new(&format!("{}/CURRENT", self.dbname)))?; + f.read_to_string(&mut current)?; + } + if current.is_empty() || !current.ends_with('\n') { + return err(StatusCode::Corruption, + "current file is empty or has no newline"); + } + { + let len = current.len(); + current.truncate(len - 1); + } + + let descfilename = format!("{}/{}", self.dbname, current); let mut builder = Builder::new(); - builder.apply(&edit, &mut self.compaction_ptrs); + { + let mut descfile = self.opt.env.open_sequential_file(Path::new(&descfilename))?; + let mut logreader = LogReader::new(&mut descfile, + // checksum= + true, + // offset= + 0); + + let mut log_number = None; + let mut prev_log_number = None; + let mut next_file_number = None; + let mut last_seq = None; + + let mut buf = Vec::new(); + while let Ok(size) = logreader.read(&mut buf) { + if size == 0 { + break; + } + let edit = 
VersionEdit::decode_from(&buf)?; + builder.apply(&edit, &mut self.compaction_ptrs); + if let Some(ln) = edit.log_number { + log_number = Some(ln); + } + if let Some(pln) = edit.prev_log_number { + prev_log_number = Some(pln); + } + if let Some(nfn) = edit.next_file_number { + next_file_number = Some(nfn); + } + if let Some(ls) = edit.last_seq { + last_seq = Some(ls); + } + } + + if let Some(ln) = log_number { + self.log_num = ln; + self.mark_file_number_used(ln); + } else { + return err(StatusCode::Corruption, + "no meta-lognumber entry in descriptor"); + } + if let Some(nfn) = next_file_number { + self.next_file_num = nfn + 1; + } else { + return err(StatusCode::Corruption, + "no meta-next-file entry in descriptor"); + } + if let Some(ls) = last_seq { + self.last_seq = ls; + } else { + return err(StatusCode::Corruption, + "no last-sequence entry in descriptor"); + } + + if let Some(pln) = prev_log_number { + self.prev_log_num = pln + 1; + self.mark_file_number_used(pln + 1); + } else { + self.prev_log_num = 0; + self.mark_file_number_used(1); + } + } + + let mut v = Version::new(self.cache.clone(), self.opt.cmp.clone()); builder.save_to(&self.cmp, self.current.as_ref().unwrap(), &mut v); + self.finalize(&mut v); + self.add_version(v); + self.manifest_num = self.next_file_num - 1; + + Ok(self.reuse_manifest(&descfilename, &current)) + } + + /// reuse_manifest checks whether the current manifest can be reused. + fn reuse_manifest(&mut self, current_manifest_path: &str, current_manifest_base: &str) -> bool { + // The original doesn't reuse manifests; we do. 
+ if let Ok((num, typ)) = parse_file_name(current_manifest_base) { + if typ != FileType::Descriptor { + return false; + } + if let Ok(size) = self.opt.env.size_of(Path::new(current_manifest_path)) { + if size > self.opt.max_file_size { + return false; + } + } else { + return false; + } + + assert!(self.descriptor_log.is_none()); + if let Ok(f) = self.opt.env.open_appendable_file(Path::new(current_manifest_path)) { + // TODO: Log this. + self.descriptor_log = Some(LogWriter::new(f)); + self.manifest_num = num; + return true; + } else { + // TODO: Log this. + return false; + } + } + false } } @@ -431,6 +647,24 @@ } } +fn set_current_file(env: &Box<Env>, dbname: &str, manifest_file_num: FileNum) -> Result<()> { + let manifest_base = format!("MANIFEST-{:06}", manifest_file_num); + let tempfile = format!("{}/{}.dbtmp", dbname, manifest_file_num); + + { + let mut f = env.open_writable_file(Path::new(&tempfile))?; + f.write(manifest_base.as_bytes())?; + f.write("\n".as_bytes())?; + } + let currentfile = format!("{}/CURRENT", dbname); + if let Err(e) = env.rename(Path::new(&tempfile), Path::new(&currentfile)) { + // ignore error. + env.delete(Path::new(&tempfile)).is_ok(); + return Err(Status::from(e)); + } + Ok(()) +} + /// sort_files_by_smallest sorts the list of files by the smallest keys of the files. fn sort_files_by_smallest<C: Cmp>(cmp: &C, files: &mut Vec<FileMetaHandle>) { files.sort_by(|a, b| cmp.cmp(&a.borrow().smallest, &b.borrow().smallest)) @@ -458,6 +692,16 @@ b = iter_b.next(); } } + + // Push cached elements. + if let Some(a_) = a { + out.push(a_); + } + if let Some(b_) = b { + out.push(b_); + } + + // Push remaining elements from either iterator. 
for a in iter_a { out.push(a); } @@ -499,9 +743,10 @@ #[cfg(test)] mod tests { use super::*; + use cmp::DefaultCmp; + use mem_env::MemEnv; + use types::FileMetaData; use version::testutil::make_version; - use cmp::DefaultCmp; - use types::FileMetaData; fn example_files() -> Vec<FileMetaHandle> { let mut f1 = FileMetaData::default(); @@ -540,6 +785,96 @@ } #[test] + fn test_version_set_log_and_apply() { + let (_, opt) = make_version(); + let mut vs = VersionSet::new("db".to_string(), + opt.clone(), + share(TableCache::new("db", opt.clone(), 100))); + + assert_eq!(2, vs.new_file_number()); + // Simulate NewDB + { + let mut ve = VersionEdit::new(); + ve.set_comparator_name("leveldb.BytewiseComparator"); + ve.set_log_num(10); + ve.set_next_file(20); + ve.set_last_seq(30); + + // Write first manifest to be recovered from. + let manifest = format!("db/MANIFEST-{:06}", 1); + let mffile = opt.env.open_writable_file(Path::new(&manifest)).unwrap(); + let mut lw = LogWriter::new(mffile); + lw.add_record(&ve.encode()).unwrap(); + lw.flush().unwrap(); + set_current_file(&opt.env.as_ref(), "db", 1).unwrap(); + } + + // Recover from new state. + { + vs.recover().unwrap(); + assert_eq!(10, vs.log_num); + assert_eq!(21, vs.next_file_num); + assert_eq!(30, vs.last_seq); + assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[0].len()); + assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[1].len()); + assert_eq!(35, vs.write_snapshot().unwrap()); + } + + // Simulate compaction by adding a file. 
+ { + let mut ve = VersionEdit::new(); + ve.set_prev_log_num(1); + ve.set_log_num(11); + let mut fmd = FileMetaData::default(); + fmd.num = 21; + fmd.size = 123; + fmd.smallest = "abc".as_bytes().to_vec(); + fmd.largest = "def".as_bytes().to_vec(); + ve.add_file(1, fmd); + vs.log_and_apply(ve).unwrap(); + + assert!(opt.env.exists(Path::new("db/CURRENT")).unwrap()); + assert!(opt.env.exists(Path::new("db/MANIFEST-000001")).unwrap()); + // next_file_num and last_seq are untouched by log_and_apply + assert_eq!(21, vs.new_file_number()); + assert_eq!(22, vs.next_file_num); + assert_eq!(30, vs.last_seq); + // the following fields are touched by log_and_apply. + assert_eq!(1, vs.prev_log_num); + assert_eq!(11, vs.log_num); + + assert_eq!(3, vs.versions.len()); + // The previous "compaction" should have added one file to the first level in the + // current version. + assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[0].len()); + assert_eq!(1, vs.current.as_ref().unwrap().borrow().files[1].len()); + assert_eq!(47, vs.write_snapshot().unwrap()); + } + } + + #[test] + fn test_version_set_utils() { + let (v, opt) = make_version(); + let mut vs = VersionSet::new("db".to_string(), + opt.clone(), + share(TableCache::new("db", opt, 100))); + vs.add_version(v); + // live_files() + assert_eq!(9, vs.live_files().len()); + // num_level_bytes() + assert_eq!(434, vs.num_level_bytes(0)); + assert_eq!(651, vs.num_level_bytes(1)); + assert_eq!(434, vs.num_level_bytes(2)); + // num_level_files() + assert_eq!(2, vs.num_level_files(0)); + assert_eq!(3, vs.num_level_files(1)); + assert_eq!(2, vs.num_level_files(2)); + // new_file_number() + assert_eq!(2, vs.new_file_number()); + assert_eq!(3, vs.new_file_number()); + } + + #[test] fn test_version_set_compaction() { let (v, opt) = make_version(); let mut vs = VersionSet::new("db".to_string(), @@ -564,17 +899,10 @@ LookupKey::new("fab".as_bytes(), 9000) .internal_key())); } - { - // live_files() - assert_eq!(9, vs.live_files().len()); - 
} - // The following tests reuse the same version set and verify that various compactions work // like they should. { time_test!("compaction tests"); - let v = vs.current().unwrap(); - // compact level 0 with a partial range. let from = LookupKey::new("000".as_bytes(), 1000); let to = LookupKey::new("ab".as_bytes(), 1010); @@ -610,9 +938,6 @@ let to = LookupKey::new("zzz".as_bytes(), 1010); let mid = LookupKey::new("abc".as_bytes(), 1010); let mut c = vs.compact_range(0, from.internal_key(), to.internal_key()).unwrap(); - println!("{:?}", v.borrow().files); - println!("{:?}", c.inputs); - println!("{:?}", c.grandparents); assert!(!c.should_stop_before(from.internal_key())); assert!(!c.should_stop_before(mid.internal_key())); assert!(!c.should_stop_before(to.internal_key()));