changeset 288:9d4f1bd08c4f

db_impl: Implement recovery and initialization logic. Add basic sanity test.
author Lewin Bormann <lbo@spheniscida.de>
date Mon, 25 Sep 2017 20:33:44 +0200
parents b71f43585267
children 5b720657f8cc
files src/db_impl.rs
diffstat 1 files changed, 221 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/src/db_impl.rs	Mon Sep 25 20:33:11 2017 +0200
+++ b/src/db_impl.rs	Mon Sep 25 20:33:44 2017 +0200
@@ -3,12 +3,12 @@
 
 #![allow(unused_attributes)]
 
-use cmp::InternalKeyCmp;
+use cmp::{Cmp, InternalKeyCmp};
 use env::{Env, FileLock};
 use error::{err, StatusCode, Result};
 use filter::{BoxedFilterPolicy, InternalFilterPolicy};
 use infolog::Logger;
-use log::LogWriter;
+use log::{LogReader, LogWriter};
 use key_types::{parse_internal_key, InternalKey, ValueType};
 use memtable::MemTable;
 use options::Options;
@@ -18,13 +18,16 @@
 use types::{parse_file_name, share, FileMetaData, FileNum, FileType, LdbIterator,
             MAX_SEQUENCE_NUMBER, NUM_LEVELS, SequenceNumber, Shared};
 use version_edit::VersionEdit;
-use version_set::{Compaction, VersionSet};
+use version_set::{manifest_file_name, read_current_file, set_current_file, Compaction, VersionSet};
 use version::Version;
+use write_batch::WriteBatch;
 
 use std::cmp::Ordering;
 use std::io::{self, Write};
 use std::mem;
+use std::ops::Drop;
 use std::path::Path;
+use std::rc::Rc;
 
 /// DB contains the actual database implemenation. As opposed to the original, this implementation
 /// is not concurrent (yet).
@@ -49,9 +52,10 @@
 }
 
 impl DB {
+    /// new initializes a new DB object, but doesn't touch disk.
     fn new(name: &str, mut opt: Options) -> DB {
         if opt.log.is_none() {
-            let log = open_info_log(opt.env.as_ref().as_ref(), &name);
+            let log = open_info_log(opt.env.as_ref().as_ref(), name);
             opt.log = Some(share(log));
         }
 
@@ -79,6 +83,181 @@
         }
     }
 
+    /// Opens or creates* a new or existing database.
+    ///
+    /// *depending on the options set (create_if_missing, error_if_exists).
+    fn open(name: &str, opt: Options) -> Result<DB> {
+        let mut db = DB::new(name, opt);
+        let mut ve = VersionEdit::new();
+        let save_manifest = db.recover(&mut ve)?;
+
+        // Create log file if not existing.
+        if db.log.is_none() {
+            let lognum = db.vset.new_file_number();
+            let logfile =
+                db.opt.env.open_writable_file(Path::new(&log_file_name(&db.name, lognum)))?;
+            ve.set_log_num(lognum);
+            db.log = Some(LogWriter::new(logfile));
+            db.log_num = Some(lognum);
+        }
+
+        if save_manifest {
+            ve.set_log_num(db.log_num.unwrap_or(0));
+            db.vset.log_and_apply(ve)?;
+        }
+
+        db.delete_obsolete_files()?;
+        db.maybe_do_compaction();
+
+        Ok(db)
+    }
+
+    /// recover recovers from the existing state on disk. If the wrapped result is `true`, then
+    /// log_and_apply() should be called after recovery has finished.
+    fn recover(&mut self, ve: &mut VersionEdit) -> Result<bool> {
+        self.opt.env.mkdir(Path::new(&self.name)).is_ok();
+        let lockfile = self.opt.env.lock(Path::new(&lock_file_name(&self.name)))?;
+        self.lock = Some(lockfile);
+
+        if let Err(e) = read_current_file(&self.opt.env, &self.name) {
+            if e.code == StatusCode::NotFound && self.opt.create_if_missing {
+                self.initialize_db()?;
+            } else {
+                return err(StatusCode::InvalidArgument,
+                           "database does not exist and create_if_missing is false");
+            }
+        } else if self.opt.error_if_exists {
+            return err(StatusCode::InvalidArgument,
+                       "database already exists and error_if_exists is true");
+        }
+
+        // If save_manifest is true, the existing manifest is reused and we should log_and_apply()
+        // later.
+        let mut save_manifest = self.vset.recover()?;
+
+        // Recover from all log files not in the descriptor.
+        let mut max_seq = 0;
+        let filenames = self.opt.env.children(Path::new(&self.name))?;
+        let mut expected = self.vset.live_files();
+        let mut log_files = vec![];
+
+        for file in &filenames {
+            if let Ok((num, typ)) = parse_file_name(&file) {
+                expected.remove(&num);
+                if typ == FileType::Log && num >= self.vset.log_num {
+                    log_files.push(num);
+                }
+            }
+        }
+        if !expected.is_empty() {
+            log!(self.opt.log, "Missing at least these files: {:?}", expected);
+            return err(StatusCode::Corruption, "missing live files (see log)");
+        }
+
+        log_files.sort();
+        for i in 0..log_files.len() {
+            let (save_manifest_, max_seq_) =
+                self.recover_log_file(log_files[i], i == log_files.len() - 1, ve)?;
+            if save_manifest_ {
+                save_manifest = true;
+            }
+            if max_seq_ > max_seq {
+                max_seq = max_seq_;
+            }
+            self.vset.mark_file_number_used(log_files[i]);
+        }
+
+        if self.vset.last_seq < max_seq {
+            self.vset.last_seq = max_seq;
+        }
+
+        Ok(save_manifest)
+    }
+
+    /// initialize_db initializes a new database.
+    fn initialize_db(&mut self) -> Result<()> {
+        let mut ve = VersionEdit::new();
+        ve.set_comparator_name(self.opt.cmp.id());
+        ve.set_log_num(0);
+        ve.set_next_file(2);
+        ve.set_last_seq(0);
+
+        {
+            let manifest = manifest_file_name(&self.name, 1);
+            let manifest_file = self.opt.env.open_writable_file(Path::new(&manifest))?;
+            let mut lw = LogWriter::new(manifest_file);
+            lw.add_record(&ve.encode())?;
+            lw.flush()?;
+        }
+        set_current_file(&self.opt.env, &self.name, 1)
+    }
+
+
+    fn recover_log_file(&mut self,
+                        log_num: FileNum,
+                        is_last: bool,
+                        ve: &mut VersionEdit)
+                        -> Result<(bool, SequenceNumber)> {
+        let filename = log_file_name(&self.name, log_num);
+        let logfile = self.opt.env.open_sequential_file(Path::new(&filename))?;
+        let cmp: Rc<Box<Cmp>> = Rc::new(Box::new(self.cmp.clone()));
+
+        let mut logreader = LogReader::new(logfile,
+                                           // checksum=
+                                           true);
+        log!(self.opt.log, "Recovering log file {}", filename);
+        let mut scratch = vec![];
+        let mut mem = MemTable::new(cmp.clone());
+        let mut batch = WriteBatch::new();
+
+        let mut compactions = 0;
+        let mut max_seq = 0;
+        let mut save_manifest = false;
+
+        while let Ok(len) = logreader.read(&mut scratch) {
+            if len == 0 {
+                break;
+            }
+            if len < 12 {
+                log!(self.opt.log,
+                     "corruption in log file {:06}: record shorter than 12B",
+                     log_num);
+                continue;
+            }
+
+            batch.set_contents(&scratch);
+            batch.insert_into_memtable(batch.sequence(), &mut mem);
+
+            let last_seq = batch.sequence() + batch.count() as u64 - 1;
+            if last_seq > max_seq {
+                max_seq = last_seq
+            }
+            if mem.approx_mem_usage() > self.opt.write_buffer_size {
+                compactions += 1;
+                save_manifest = true;
+                self.write_l0_table(&mem, ve, None)?;
+                mem = MemTable::new(cmp.clone());
+            }
+            batch.clear();
+        }
+
+        // Check if we can reuse the last log file.
+        if self.opt.reuse_logs && is_last && compactions == 0 {
+            assert!(self.log.is_none());
+            let oldsize = self.opt.env.size_of(Path::new(&filename))?;
+            let oldfile = self.opt.env.open_appendable_file(Path::new(&filename))?;
+            let lw = LogWriter::new_with_off(oldfile, oldsize);
+            self.log = Some(lw);
+            self.log_num = Some(log_num);
+            self.mem = mem;
+        } else {
+            // Log is not reused, so write out the accumulated memtable.
+            self.write_l0_table(&mem, ve, None)?;
+        }
+
+        Ok((save_manifest, max_seq))
+    }
+
     fn add_stats(&mut self, level: usize, cs: CompactionStats) {
         assert!(level < NUM_LEVELS);
         self.cstats[level].add(cs);
@@ -439,6 +618,14 @@
     }
 }
 
+impl Drop for DB {
+    fn drop(&mut self) {
+        if let Some(l) = self.lock.take() {
+            self.opt.env.unlock(l);
+        }
+    }
+}
+
 struct CompactionState {
     compaction: Compaction,
     smallest_seq: SequenceNumber,
@@ -525,6 +712,10 @@
     format!("{}/{:06}.log", db, num)
 }
 
+fn lock_file_name(db: &str) -> String {
+    format!("{}/LOCK", db)
+}
+
 /// open_info_log opens an info log file in the given database. It transparently returns a
 /// /dev/null logger in case the open fails.
 fn open_info_log<E: Env + ?Sized>(env: &E, db: &str) -> Logger {
@@ -592,6 +783,32 @@
     }
 
     #[test]
+    fn test_db_impl_init() {
+        let opt = options::for_test();
+        let env = opt.env.clone();
+
+        {
+            let db = DB::open("db", opt.clone()).unwrap();
+
+            assert!(env.exists(Path::new("db/CURRENT")).unwrap());
+            assert!(env.exists(Path::new("db/MANIFEST-000001")).unwrap());
+            assert!(env.exists(Path::new("db/LOCK")).unwrap());
+            assert!(env.exists(Path::new("db/000003.log")).unwrap());
+        }
+
+        {
+            let mut opt = opt;
+            opt.reuse_manifest = false;
+            let mut db = DB::open("db", opt).unwrap();
+
+            // Obsolete manifest is deleted.
+            assert!(!env.exists(Path::new("db/MANIFEST-000001")).unwrap());
+            assert!(env.exists(Path::new("db/000005.log")).unwrap());
+            println!("{:?}", env.children(Path::new("db/")).unwrap());
+        }
+    }
+
+    #[test]
     fn test_db_impl_build_table() {
         let mut opt = options::for_test();
         opt.block_size = 128;