Mercurial > lbo > hg > leveldb-rs
changeset 288:9d4f1bd08c4f
db_impl: Implement recovery and initialization logic. Add basic sanity test.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Mon, 25 Sep 2017 20:33:44 +0200 |
parents | b71f43585267 |
children | 5b720657f8cc |
files | src/db_impl.rs |
diffstat | 1 files changed, 221 insertions(+), 4 deletions(-) [+] |
line wrap: on
line diff
--- a/src/db_impl.rs	Mon Sep 25 20:33:11 2017 +0200
+++ b/src/db_impl.rs	Mon Sep 25 20:33:44 2017 +0200
@@ -3,12 +3,12 @@
 
 #![allow(unused_attributes)]
 
-use cmp::InternalKeyCmp;
+use cmp::{Cmp, InternalKeyCmp};
 use env::{Env, FileLock};
 use error::{err, StatusCode, Result};
 use filter::{BoxedFilterPolicy, InternalFilterPolicy};
 use infolog::Logger;
-use log::LogWriter;
+use log::{LogReader, LogWriter};
 use key_types::{parse_internal_key, InternalKey, ValueType};
 use memtable::MemTable;
 use options::Options;
@@ -18,13 +18,16 @@
 use types::{parse_file_name, share, FileMetaData, FileNum, FileType, LdbIterator,
             MAX_SEQUENCE_NUMBER, NUM_LEVELS, SequenceNumber, Shared};
 use version_edit::VersionEdit;
-use version_set::{Compaction, VersionSet};
+use version_set::{manifest_file_name, read_current_file, set_current_file, Compaction, VersionSet};
 use version::Version;
+use write_batch::WriteBatch;
 
 use std::cmp::Ordering;
 use std::io::{self, Write};
 use std::mem;
+use std::ops::Drop;
 use std::path::Path;
+use std::rc::Rc;
 
 /// DB contains the actual database implemenation. As opposed to the original, this implementation
 /// is not concurrent (yet).
@@ -49,9 +52,10 @@
 }
 
 impl DB {
+    /// new initializes a new DB object, but doesn't touch disk.
     fn new(name: &str, mut opt: Options) -> DB {
         if opt.log.is_none() {
-            let log = open_info_log(opt.env.as_ref().as_ref(), &name);
+            let log = open_info_log(opt.env.as_ref().as_ref(), name);
             opt.log = Some(share(log));
         }
 
@@ -79,6 +83,181 @@
         }
     }
 
+    /// Opens or creates* a new or existing database.
+    ///
+    /// *depending on the options set (create_if_missing, error_if_exists).
+    fn open(name: &str, opt: Options) -> Result<DB> {
+        let mut db = DB::new(name, opt);
+        let mut ve = VersionEdit::new();
+        let save_manifest = db.recover(&mut ve)?;
+
+        // Create log file if not existing.
+        if db.log.is_none() {
+            let lognum = db.vset.new_file_number();
+            let logfile =
+                db.opt.env.open_writable_file(Path::new(&log_file_name(&db.name, lognum)))?;
+            ve.set_log_num(lognum);
+            db.log = Some(LogWriter::new(logfile));
+            db.log_num = Some(lognum);
+        }
+
+        if save_manifest {
+            ve.set_log_num(db.log_num.unwrap_or(0));
+            db.vset.log_and_apply(ve)?;
+        }
+
+        db.delete_obsolete_files()?;
+        db.maybe_do_compaction();
+
+        Ok(db)
+    }
+
+    /// recover recovers from the existing state on disk. If the wrapped result is `true`, then
+    /// log_and_apply() should be called after recovery has finished.
+    fn recover(&mut self, ve: &mut VersionEdit) -> Result<bool> {
+        self.opt.env.mkdir(Path::new(&self.name)).is_ok();
+        let lockfile = self.opt.env.lock(Path::new(&lock_file_name(&self.name)))?;
+        self.lock = Some(lockfile);
+
+        if let Err(e) = read_current_file(&self.opt.env, &self.name) {
+            if e.code == StatusCode::NotFound && self.opt.create_if_missing {
+                self.initialize_db()?;
+            } else {
+                return err(StatusCode::InvalidArgument,
+                           "database does not exist and create_if_missing is false");
+            }
+        } else if self.opt.error_if_exists {
+            return err(StatusCode::InvalidArgument,
+                       "database already exists and error_if_exists is true");
+        }
+
+        // If save_manifest is true, the existing manifest is reused and we should log_and_apply()
+        // later.
+        let mut save_manifest = self.vset.recover()?;
+
+        // Recover from all log files not in the descriptor.
+        let mut max_seq = 0;
+        let filenames = self.opt.env.children(Path::new(&self.name))?;
+        let mut expected = self.vset.live_files();
+        let mut log_files = vec![];
+
+        for file in &filenames {
+            if let Ok((num, typ)) = parse_file_name(&file) {
+                expected.remove(&num);
+                if typ == FileType::Log && num >= self.vset.log_num {
+                    log_files.push(num);
+                }
+            }
+        }
+        if !expected.is_empty() {
+            log!(self.opt.log, "Missing at least these files: {:?}", expected);
+            return err(StatusCode::Corruption, "missing live files (see log)");
+        }
+
+        log_files.sort();
+        for i in 0..log_files.len() {
+            let (save_manifest_, max_seq_) =
+                self.recover_log_file(log_files[i], i == log_files.len() - 1, ve)?;
+            if save_manifest_ {
+                save_manifest = true;
+            }
+            if max_seq_ > max_seq {
+                max_seq = max_seq_;
+            }
+            self.vset.mark_file_number_used(log_files[i]);
+        }
+
+        if self.vset.last_seq < max_seq {
+            self.vset.last_seq = max_seq;
+        }
+
+        Ok(save_manifest)
+    }
+
+    /// initialize_db initializes a new database.
+    fn initialize_db(&mut self) -> Result<()> {
+        let mut ve = VersionEdit::new();
+        ve.set_comparator_name(self.opt.cmp.id());
+        ve.set_log_num(0);
+        ve.set_next_file(2);
+        ve.set_last_seq(0);
+
+        {
+            let manifest = manifest_file_name(&self.name, 1);
+            let manifest_file = self.opt.env.open_writable_file(Path::new(&manifest))?;
+            let mut lw = LogWriter::new(manifest_file);
+            lw.add_record(&ve.encode())?;
+            lw.flush()?;
+        }
+        set_current_file(&self.opt.env, &self.name, 1)
+    }
+
+
+    fn recover_log_file(&mut self,
+                        log_num: FileNum,
+                        is_last: bool,
+                        ve: &mut VersionEdit)
+                        -> Result<(bool, SequenceNumber)> {
+        let filename = log_file_name(&self.name, log_num);
+        let logfile = self.opt.env.open_sequential_file(Path::new(&filename))?;
+        let cmp: Rc<Box<Cmp>> = Rc::new(Box::new(self.cmp.clone()));
+
+        let mut logreader = LogReader::new(logfile,
+                                           // checksum=
+                                           true);
+        log!(self.opt.log, "Recovering log file {}", filename);
+        let mut scratch = vec![];
+        let mut mem = MemTable::new(cmp.clone());
+        let mut batch = WriteBatch::new();
+
+        let mut compactions = 0;
+        let mut max_seq = 0;
+        let mut save_manifest = false;
+
+        while let Ok(len) = logreader.read(&mut scratch) {
+            if len == 0 {
+                break;
+            }
+            if len < 12 {
+                log!(self.opt.log,
+                     "corruption in log file {:06}: record shorter than 12B",
+                     log_num);
+                continue;
+            }
+
+            batch.set_contents(&scratch);
+            batch.insert_into_memtable(batch.sequence(), &mut mem);
+
+            let last_seq = batch.sequence() + batch.count() as u64 - 1;
+            if last_seq > max_seq {
+                max_seq = last_seq
+            }
+            if mem.approx_mem_usage() > self.opt.write_buffer_size {
+                compactions += 1;
+                save_manifest = true;
+                self.write_l0_table(&mem, ve, None)?;
+                mem = MemTable::new(cmp.clone());
+            }
+            batch.clear();
+        }
+
+        // Check if we can reuse the last log file.
+        if self.opt.reuse_logs && is_last && compactions == 0 {
+            assert!(self.log.is_none());
+            let oldsize = self.opt.env.size_of(Path::new(&filename))?;
+            let oldfile = self.opt.env.open_appendable_file(Path::new(&filename))?;
+            let lw = LogWriter::new_with_off(oldfile, oldsize);
+            self.log = Some(lw);
+            self.log_num = Some(log_num);
+            self.mem = mem;
+        } else {
+            // Log is not reused, so write out the accumulated memtable.
+            self.write_l0_table(&mem, ve, None)?;
+        }
+
+        Ok((save_manifest, max_seq))
+    }
+
     fn add_stats(&mut self, level: usize, cs: CompactionStats) {
         assert!(level < NUM_LEVELS);
         self.cstats[level].add(cs);
@@ -439,6 +618,14 @@
     }
 }
 
+impl Drop for DB {
+    fn drop(&mut self) {
+        if let Some(l) = self.lock.take() {
+            self.opt.env.unlock(l);
+        }
+    }
+}
+
 struct CompactionState {
     compaction: Compaction,
     smallest_seq: SequenceNumber,
@@ -525,6 +712,10 @@
     format!("{}/{:06}.log", db, num)
 }
 
+fn lock_file_name(db: &str) -> String {
+    format!("{}/LOCK", db)
+}
+
 /// open_info_log opens an info log file in the given database. It transparently returns a
 /// /dev/null logger in case the open fails.
 fn open_info_log<E: Env + ?Sized>(env: &E, db: &str) -> Logger {
@@ -592,6 +783,32 @@
     }
 
     #[test]
+    fn test_db_impl_init() {
+        let opt = options::for_test();
+        let env = opt.env.clone();
+
+        {
+            let db = DB::open("db", opt.clone()).unwrap();
+
+            assert!(env.exists(Path::new("db/CURRENT")).unwrap());
+            assert!(env.exists(Path::new("db/MANIFEST-000001")).unwrap());
+            assert!(env.exists(Path::new("db/LOCK")).unwrap());
+            assert!(env.exists(Path::new("db/000003.log")).unwrap());
+        }
+
+        {
+            let mut opt = opt;
+            opt.reuse_manifest = false;
+            let mut db = DB::open("db", opt).unwrap();
+
+            // Obsolete manifest is deleted.
+            assert!(!env.exists(Path::new("db/MANIFEST-000001")).unwrap());
+            assert!(env.exists(Path::new("db/000005.log")).unwrap());
+            println!("{:?}", env.children(Path::new("db/")).unwrap());
+        }
+    }
+
+    #[test]
     fn test_db_impl_build_table() {
         let mut opt = options::for_test();
         opt.block_size = 128;