changeset 234:950bdfc410bc

version_set: Implement log_and_apply, recover, and add tests.
author Lewin Bormann <lbo@spheniscida.de>
date Mon, 11 Sep 2017 18:11:45 +0200
parents 6f1af366dc97
children c5a6d5da8380
files src/log.rs src/types.rs src/version.rs src/version_edit.rs src/version_set.rs
diffstat 5 files changed, 443 insertions(+), 37 deletions(-) [+]
line wrap: on
line diff
--- a/src/log.rs	Sun Sep 10 17:32:28 2017 +0200
+++ b/src/log.rs	Mon Sep 11 18:11:45 2017 +0200
@@ -100,6 +100,11 @@
         self.current_block_offset += s;
         Ok(s)
     }
+
+    /// Flushes the underlying writer. A failure is converted into the error type of the
+    /// returned `Result`.
+    pub fn flush(&mut self) -> Result<()> {
+        match self.dst.flush() {
+            Ok(()) => Ok(()),
+            Err(e) => Err(From::from(e)),
+        }
+    }
 }
 
 
--- a/src/types.rs	Sun Sep 10 17:32:28 2017 +0200
+++ b/src/types.rs	Mon Sep 11 18:11:45 2017 +0200
@@ -1,7 +1,12 @@
 //! A collection of fundamental and/or simple types used by other modules
 
+use error::{err, Result, StatusCode};
+
 use std::cell::RefCell;
 use std::rc::Rc;
+use std::str::FromStr;
+
+pub const NUM_LEVELS: usize = 7;
 
 #[derive(Debug, PartialOrd, PartialEq)]
 pub enum ValueType {
@@ -101,4 +106,71 @@
     pub largest: Vec<u8>,
 }
 
-pub const NUM_LEVELS: usize = 7;
+/// FileType identifies the kind of a database file. `parse_file_name` derives it from a file's
+/// name.
+#[derive(Debug, Clone, PartialEq)]
+pub enum FileType {
+    /// A log file (presumably the numbered `NNNNNN.log` files -- TODO confirm against callers).
+    Log,
+    /// The `LOCK` file.
+    DBLock,
+    /// A numbered table file (`.sst` or `.ldb` extension).
+    Table,
+    /// A `MANIFEST-NNNNNN` descriptor file.
+    Descriptor,
+    /// The `CURRENT` file.
+    Current,
+    /// A numbered temporary file (`.dbtmp` extension).
+    Temp,
+    /// An info log (`LOG` or `LOG.old`).
+    InfoLog,
+}
+
+/// parse_file_name determines the number and type of a database file from its base name (the
+/// name without the database directory).
+///
+/// Recognized names are:
+///
+/// * `CURRENT`, `LOCK`, `LOG` and `LOG.old`, which are all reported with the file number 0;
+/// * `MANIFEST-<number>`;
+/// * `<number>.log`, `<number>.sst`, `<number>.ldb` and `<number>.dbtmp`.
+///
+/// Numbers are parsed as decimal. Every other name yields a `StatusCode::InvalidArgument` error.
+pub fn parse_file_name(f: &str) -> Result<(FileNum, FileType)> {
+    const MANIFEST_PREFIX: &'static str = "MANIFEST-";
+
+    if f == "CURRENT" {
+        return Ok((0, FileType::Current));
+    } else if f == "LOCK" {
+        return Ok((0, FileType::DBLock));
+    } else if f == "LOG" || f == "LOG.old" {
+        return Ok((0, FileType::InfoLog));
+    } else if f.starts_with(MANIFEST_PREFIX) {
+        // Everything after the prefix has to be the file number.
+        return match FileNum::from_str_radix(&f[MANIFEST_PREFIX.len()..], 10) {
+            Ok(num) => Ok((num, FileType::Descriptor)),
+            Err(_) => err(StatusCode::InvalidArgument, "manifest file number is invalid"),
+        };
+    } else if let Some(ix) = f.find('.') {
+        // 00012345.log 00123.sst ...
+        let num = match FileNum::from_str_radix(&f[..ix], 10) {
+            Ok(num) => num,
+            Err(_) => {
+                return err(StatusCode::InvalidArgument,
+                           "invalid file number for table or temp file")
+            }
+        };
+        let typ = match &f[ix + 1..] {
+            // Numbered log files were mentioned in the comment above but used to be rejected.
+            "log" => FileType::Log,
+            "sst" | "ldb" => FileType::Table,
+            "dbtmp" => FileType::Temp,
+            _ => return err(StatusCode::InvalidArgument, "unknown numbered file extension"),
+        };
+        return Ok((num, typ));
+    }
+    err(StatusCode::InvalidArgument, "unknown file type")
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Well-formed names must map to the expected number and type; malformed names must be
+    /// rejected.
+    #[test]
+    fn test_types_parse_file_name() {
+        let valid = [("CURRENT", 0, FileType::Current),
+                     ("LOCK", 0, FileType::DBLock),
+                     ("LOG", 0, FileType::InfoLog),
+                     ("LOG.old", 0, FileType::InfoLog),
+                     ("MANIFEST-01234", 1234, FileType::Descriptor),
+                     ("001122.sst", 1122, FileType::Table),
+                     ("001122.ldb", 1122, FileType::Table),
+                     ("001122.dbtmp", 1122, FileType::Temp)];
+        for &(name, num, ref typ) in valid.iter() {
+            let (got_num, got_typ) = parse_file_name(name).unwrap();
+            assert_eq!(got_num, num);
+            assert_eq!(got_typ, *typ);
+        }
+
+        let invalid = ["xyz.LOCK", "01a.sst", "0011.abc", "MANIFEST-trolol"];
+        for &name in invalid.iter() {
+            assert!(parse_file_name(name).is_err());
+        }
+    }
+}
--- a/src/version.rs	Sun Sep 10 17:32:28 2017 +0200
+++ b/src/version.rs	Mon Sep 11 18:11:45 2017 +0200
@@ -27,6 +27,8 @@
 
     pub file_to_compact: Option<FileMetaHandle>,
     pub file_to_compact_lvl: usize,
+    pub compaction_score: Option<f64>,
+    pub compaction_level: Option<usize>,
 }
 
 impl Version {
@@ -37,6 +39,8 @@
             files: Default::default(),
             file_to_compact: None,
             file_to_compact_lvl: 0,
+            compaction_score: None,
+            compaction_level: None,
         }
     }
 
--- a/src/version_edit.rs	Sun Sep 10 17:32:28 2017 +0200
+++ b/src/version_edit.rs	Mon Sep 11 18:11:45 2017 +0200
@@ -61,10 +61,10 @@
 /// Manages changes to the set of managed SSTables and logfiles.
 pub struct VersionEdit {
     comparator: Option<String>,
-    log_number: Option<FileNum>,
-    prev_log_number: Option<FileNum>,
-    next_file_number: Option<FileNum>,
-    last_seq: Option<SequenceNumber>,
+    pub log_number: Option<FileNum>,
+    pub prev_log_number: Option<FileNum>,
+    pub next_file_number: Option<FileNum>,
+    pub last_seq: Option<SequenceNumber>,
 
     pub compaction_ptrs: Vec<CompactionPointer>,
     pub deleted: HashSet<(usize, FileNum)>,
--- a/src/version_set.rs	Sun Sep 10 17:32:28 2017 +0200
+++ b/src/version_set.rs	Mon Sep 11 18:11:45 2017 +0200
@@ -1,17 +1,20 @@
 
 use cmp::{Cmp, InternalKeyCmp};
-use error::Result;
+use env::Env;
+use error::{err, Status, StatusCode, Result};
 use key_types::{parse_internal_key, InternalKey, LookupKey, UserKey};
-use log::LogWriter;
+use log::{LogWriter, LogReader};
 use options::Options;
 use table_cache::TableCache;
-use types::{share, NUM_LEVELS, FileNum, Shared};
+use types::{parse_file_name, share, NUM_LEVELS, FileNum, FileType, Shared};
 use version::{FileMetaHandle, Version};
 use version_edit::VersionEdit;
 
 use std::cmp::Ordering;
 use std::collections::HashSet;
 use std::io::Write;
+use std::ops::Deref;
+use std::path::Path;
 use std::rc::Rc;
 
 struct Compaction {
@@ -129,22 +132,24 @@
     cmp: InternalKeyCmp,
     cache: Shared<TableCache>,
 
-    next_file_num: u64,
-    manifest_num: u64,
-    last_seq: u64,
-    log_num: u64,
-    prev_log_num: u64,
+    pub next_file_num: u64,
+    pub manifest_num: u64,
+    pub last_seq: u64,
+    pub log_num: u64,
+    pub prev_log_num: u64,
 
-    log: Option<LogWriter<Box<Write>>>,
     versions: Vec<Shared<Version>>,
     current: Option<Shared<Version>>,
     compaction_ptrs: [Vec<u8>; NUM_LEVELS],
+
+    descriptor_log: Option<LogWriter<Box<Write>>>,
 }
 
 impl VersionSet {
     // Note: opt.cmp should not contain an InternalKeyCmp at this point, but instead the default or
     // user-supplied one.
     pub fn new(db: String, opt: Options, cache: Shared<TableCache>) -> VersionSet {
+        let v = share(Version::new(cache.clone(), opt.cmp.clone()));
         VersionSet {
             dbname: db,
             cmp: InternalKeyCmp(opt.cmp.clone()),
@@ -152,15 +157,15 @@
             cache: cache,
 
             next_file_num: 2,
-            manifest_num: 2,
+            manifest_num: 0,
             last_seq: 0,
             log_num: 0,
             prev_log_num: 0,
 
-            log: None,
-            versions: vec![],
-            current: None,
+            versions: vec![v.clone()],
+            current: Some(v),
             compaction_ptrs: Default::default(),
+            descriptor_log: None,
         }
     }
 
@@ -180,12 +185,34 @@
         self.current.clone()
     }
 
+    /// Returns the combined size (as computed by `total_size`) of the files on level `l` of the
+    /// current version.
+    ///
+    /// Panics if `l` is not a valid level or if there is no current version.
+    fn num_level_bytes(&self, l: usize) -> usize {
+        assert!(l < NUM_LEVELS);
+        assert!(self.current.is_some());
+        let current = self.current.as_ref().unwrap();
+        let version = current.borrow();
+        total_size(version.files[l].iter())
+    }
+    /// Returns the number of files on level `l` of the current version.
+    ///
+    /// Panics if `l` is not a valid level or if there is no current version.
+    fn num_level_files(&self, l: usize) -> usize {
+        assert!(l < NUM_LEVELS);
+        assert!(self.current.is_some());
+        let current = self.current.as_ref().unwrap();
+        let version = current.borrow();
+        version.files[l].len()
+    }
+
+    /// Makes `v` the current version and appends it to the list of versions.
     fn add_version(&mut self, v: Version) {
         let sv = share(v);
         self.current = Some(sv.clone());
         self.versions.push(sv);
     }
 
+    /// Hands out the next unused file number and advances the counter by one.
+    fn new_file_number(&mut self) -> FileNum {
+        let num = self.next_file_num;
+        self.next_file_num += 1;
+        num
+    }
+
+    /// Raises `next_file_num` to `n + 1` unless it is already greater than `n`, so that `n` is
+    /// not handed out by `new_file_number()` afterwards.
+    fn mark_file_number_used(&mut self, n: FileNum) {
+        if n >= self.next_file_num {
+            self.next_file_num = n + 1;
+        }
+    }
+
     fn approximate_offset<'a>(&self, v: &Shared<Version>, key: InternalKey<'a>) -> usize {
         let mut offset = 0;
         for level in 0..NUM_LEVELS {
@@ -304,8 +331,10 @@
         self.compaction_ptrs[level] = largest;
     }
 
-    fn write_snapshot<W: Write>(&self, lw: &mut LogWriter<W>) -> Result<usize> {
-        assert!(self.current.is_some());
+    /// write_snapshot writes the current version, with all files, to the manifest.
+    fn write_snapshot(&mut self) -> Result<usize> {
+        assert!(self.descriptor_log.is_some());
+
         let mut edit = VersionEdit::new();
         edit.set_comparator_name(self.opt.cmp.id());
 
@@ -324,21 +353,208 @@
                 edit.add_file(level, f.borrow().clone());
             }
         }
-        lw.add_record(&edit.encode())
+        self.descriptor_log.as_mut().unwrap().add_record(&edit.encode())
     }
 
-    fn log_and_apply(&mut self, edit: &mut VersionEdit) {
+    /// log_and_apply merges the given edit with the current state and generates a new version. It
+    /// writes the VersionEdit to the manifest.
+    ///
+    /// If no manifest is open yet, a new one is created and a snapshot of the current version is
+    /// written to it first. After the edit has been written and flushed, CURRENT is pointed at
+    /// the manifest; only then does the new version become the current one.
+    ///
+    /// Errors from creating or writing the manifest and from updating CURRENT are returned.
+    ///
+    /// Panics if there is no current version, or if the edit carries a log number that is
+    /// smaller than `self.log_num` or not smaller than `self.next_file_num`.
+    fn log_and_apply(&mut self, mut edit: VersionEdit) -> Result<()> {
         assert!(self.current.is_some());
 
-        edit.set_log_num(self.log_num);
-        edit.set_prev_log_num(self.prev_log_num);
+        // Fill in the log numbers the caller did not set; a caller-supplied log number is only
+        // sanity-checked.
+        if edit.log_number.is_none() {
+            edit.set_log_num(self.log_num);
+        } else {
+            assert!(edit.log_number.unwrap() >= self.log_num);
+            assert!(edit.log_number.unwrap() < self.next_file_num);
+        }
+        if edit.prev_log_number.is_none() {
+            edit.set_prev_log_num(self.prev_log_num);
+        }
         edit.set_next_file(self.next_file_num);
         edit.set_last_seq(self.last_seq);
 
         let mut v = Version::new(self.cache.clone(), self.opt.cmp.clone());
+        // Build the new version `v` from the current version plus the edit.
+        {
+            let mut builder = Builder::new();
+            builder.apply(&edit, &mut self.compaction_ptrs);
+            builder.save_to(&self.cmp, self.current.as_ref().unwrap(), &mut v);
+        }
+        self.finalize(&mut v);
+
+        // No manifest is open: create MANIFEST-<manifest_num> and start it with a snapshot.
+        if self.descriptor_log.is_none() {
+            let descname = format!("{}/MANIFEST-{:06}", self.dbname, self.manifest_num);
+            edit.set_next_file(self.next_file_num);
+            self.descriptor_log =
+                Some(LogWriter::new(self.opt.env.open_writable_file(Path::new(&descname))?));
+            self.write_snapshot()?;
+        }
+
+        let encoded = edit.encode();
+        if let Some(ref mut lw) = self.descriptor_log {
+            lw.add_record(&encoded)?;
+            lw.flush()?;
+        }
+        // Point CURRENT at the manifest that was just written to.
+        set_current_file(&self.opt.env, &self.dbname, self.manifest_num)?;
+
+        self.add_version(v);
+        // log_number was set above.
+        self.log_num = edit.log_number.unwrap();
+        self.prev_log_num = edit.prev_log_number.unwrap();
+
+        // TODO: Roll back written files if something went wrong.
+        Ok(())
+    }
+
+    /// finalize picks the level with the highest compaction score and stores that level and its
+    /// score in `v`.
+    ///
+    /// Level 0 is scored by its number of files divided by 4. Every other level is scored by its
+    /// total file size relative to a limit of 10 * 2^20 for level 1 that grows tenfold with each
+    /// further level. The last level is not scored.
+    fn finalize(&self, v: &mut Version) {
+        let mut best: Option<(usize, f64)> = None;
+
+        for level in 0..NUM_LEVELS - 1 {
+            let score = if level == 0 {
+                v.files[level].len() as f64 / 4.0
+            } else {
+                let mut limit = 10.0 * f64::from(1 << 20);
+                for _ in 1..level {
+                    limit *= 10.0;
+                }
+                total_size(v.files[level].iter()) as f64 / limit
+            };
+
+            // Only a strictly greater score replaces the best one, so the lowest level wins ties.
+            let improves = match best {
+                Some((_, top)) => top < score,
+                None => true,
+            };
+            if improves {
+                best = Some((level, score));
+            }
+        }
+
+        match best {
+            Some((level, score)) => {
+                v.compaction_score = Some(score);
+                v.compaction_level = Some(level);
+            }
+            None => {
+                v.compaction_score = None;
+                v.compaction_level = None;
+            }
+        }
+    }
+
+    /// recover restores the state of the version set from the files on disk: it reads the name
+    /// of the manifest from `<dbname>/CURRENT`, replays every VersionEdit stored in that manifest
+    /// and installs the resulting version as the current one.
+    ///
+    /// Returns `true` if the existing manifest was reopened for appending (see
+    /// `reuse_manifest()`) and `false` otherwise.
+    /// NOTE(review): this comment used to read "If recover() returns true, proceed with calling
+    /// log_and_apply()"; confirm which meaning callers rely on.
+    ///
+    /// Fails with `StatusCode::Corruption` if CURRENT is empty or not terminated by a newline, or
+    /// if the manifest contains no log number, next file number or last sequence number. Errors
+    /// from opening or reading CURRENT, opening the manifest and decoding an edit are returned.
+    ///
+    /// Panics if there is no current version.
+    fn recover(&mut self) -> Result<bool> {
+        assert!(self.current.is_some());
+
+        let mut current = String::new();
+        {
+            let mut f =
+                self.opt.env.open_sequential_file(Path::new(&format!("{}/CURRENT", self.dbname)))?;
+            f.read_to_string(&mut current)?;
+        }
+        if current.is_empty() || !current.ends_with('\n') {
+            return err(StatusCode::Corruption,
+                       "current file is empty or has no newline");
+        }
+        // Strip the trailing newline; what remains is the base name of the manifest.
+        {
+            let len = current.len();
+            current.truncate(len - 1);
+        }
+
+        let descfilename = format!("{}/{}", self.dbname, current);
         let mut builder = Builder::new();
-        builder.apply(&edit, &mut self.compaction_ptrs);
+        {
+            let mut descfile = self.opt.env.open_sequential_file(Path::new(&descfilename))?;
+            let mut logreader = LogReader::new(&mut descfile,
+                                               // checksum=
+                                               true,
+                                               // offset=
+                                               0);
+
+            // For each field, the value from the last edit that carries it wins.
+            let mut log_number = None;
+            let mut prev_log_number = None;
+            let mut next_file_number = None;
+            let mut last_seq = None;
+
+            let mut buf = Vec::new();
+            // NOTE(review): a read error ends the loop exactly like the end of the manifest does
+            // and is not reported -- confirm that this is intended.
+            while let Ok(size) = logreader.read(&mut buf) {
+                if size == 0 {
+                    break;
+                }
+                let edit = VersionEdit::decode_from(&buf)?;
+                builder.apply(&edit, &mut self.compaction_ptrs);
+                if let Some(ln) = edit.log_number {
+                    log_number = Some(ln);
+                }
+                if let Some(pln) = edit.prev_log_number {
+                    prev_log_number = Some(pln);
+                }
+                if let Some(nfn) = edit.next_file_number {
+                    next_file_number = Some(nfn);
+                }
+                if let Some(ls) = edit.last_seq {
+                    last_seq = Some(ls);
+                }
+            }
+
+            if let Some(ln) = log_number {
+                self.log_num = ln;
+                self.mark_file_number_used(ln);
+            } else {
+                return err(StatusCode::Corruption,
+                           "no meta-lognumber entry in descriptor");
+            }
+            if let Some(nfn) = next_file_number {
+                self.next_file_num = nfn + 1;
+            } else {
+                return err(StatusCode::Corruption,
+                           "no meta-next-file entry in descriptor");
+            }
+            if let Some(ls) = last_seq {
+                self.last_seq = ls;
+            } else {
+                return err(StatusCode::Corruption,
+                           "no last-sequence entry in descriptor");
+            }
+
+            if let Some(pln) = prev_log_number {
+                // Restore the recorded number itself, as is done for log_number above. Storing
+                // pln + 1 would make the value grow on every write/recover round trip, because
+                // log_and_apply() writes self.prev_log_num back to the manifest unchanged.
+                self.prev_log_num = pln;
+                self.mark_file_number_used(pln);
+            } else {
+                // The previous log number is optional. This keeps next_file_num at 2 or above,
+                // the value it is initialized with in new().
+                self.prev_log_num = 0;
+                self.mark_file_number_used(1);
+            }
+        }
+
+        let mut v = Version::new(self.cache.clone(), self.opt.cmp.clone());
         builder.save_to(&self.cmp, self.current.as_ref().unwrap(), &mut v);
+        self.finalize(&mut v);
+        self.add_version(v);
+        // reuse_manifest() overrides this if the existing manifest is reused.
+        self.manifest_num = self.next_file_num - 1;
+
+        Ok(self.reuse_manifest(&descfilename, &current))
+    }
+
+    /// reuse_manifest tries to keep using the manifest that CURRENT points to. That only
+    /// happens if `current_manifest_base` parses as a descriptor file name, the file at
+    /// `current_manifest_path` is no larger than `opt.max_file_size`, and it can be opened for
+    /// appending. On success the manifest becomes the descriptor log, `manifest_num` is set to
+    /// its number and `true` is returned; otherwise nothing is changed and `false` is returned.
+    ///
+    /// Panics if a descriptor log is already open when the manifest is about to be reopened.
+    fn reuse_manifest(&mut self, current_manifest_path: &str, current_manifest_base: &str) -> bool {
+        // The original doesn't reuse manifests; we do.
+        let num = match parse_file_name(current_manifest_base) {
+            Ok((num, FileType::Descriptor)) => num,
+            _ => return false,
+        };
+
+        match self.opt.env.size_of(Path::new(current_manifest_path)) {
+            Ok(size) if size <= self.opt.max_file_size => {}
+            _ => return false,
+        }
+
+        assert!(self.descriptor_log.is_none());
+        match self.opt.env.open_appendable_file(Path::new(current_manifest_path)) {
+            Ok(f) => {
+                // TODO: Log this.
+                self.descriptor_log = Some(LogWriter::new(f));
+                self.manifest_num = num;
+                true
+            }
+            Err(_) => {
+                // TODO: Log this.
+                false
+            }
+        }
     }
 }
 
@@ -431,6 +647,24 @@
     }
 }
 
+/// set_current_file makes the manifest with the number `manifest_file_num` the current one: it
+/// writes the manifest's base name, followed by a newline, to a temporary file and then renames
+/// that file to `<dbname>/CURRENT`.
+///
+/// Errors from creating or writing the temporary file are returned. If the rename fails, the
+/// temporary file is deleted on a best-effort basis and the rename error is returned.
+fn set_current_file(env: &Box<Env>, dbname: &str, manifest_file_num: FileNum) -> Result<()> {
+    let manifest_base = format!("MANIFEST-{:06}", manifest_file_num);
+    let tempfile = format!("{}/{}.dbtmp", dbname, manifest_file_num);
+
+    {
+        let mut f = env.open_writable_file(Path::new(&tempfile))?;
+        // write_all() rather than write(): write() is allowed to stop after a part of the
+        // buffer, which would leave a truncated manifest name in CURRENT.
+        f.write_all(manifest_base.as_bytes())?;
+        f.write_all(b"\n")?;
+    }
+    let currentfile = format!("{}/CURRENT", dbname);
+    if let Err(e) = env.rename(Path::new(&tempfile), Path::new(&currentfile)) {
+        // Best-effort cleanup; the rename error is the one worth reporting.
+        let _ = env.delete(Path::new(&tempfile));
+        return Err(Status::from(e));
+    }
+    Ok(())
+}
+
 /// sort_files_by_smallest sorts the list of files by the smallest keys of the files.
 fn sort_files_by_smallest<C: Cmp>(cmp: &C, files: &mut Vec<FileMetaHandle>) {
     files.sort_by(|a, b| cmp.cmp(&a.borrow().smallest, &b.borrow().smallest))
@@ -458,6 +692,16 @@
             b = iter_b.next();
         }
     }
+
+    // Push cached elements.
+    if let Some(a_) = a {
+        out.push(a_);
+    }
+    if let Some(b_) = b {
+        out.push(b_);
+    }
+
+    // Push remaining elements from either iterator.
     for a in iter_a {
         out.push(a);
     }
@@ -499,9 +743,10 @@
 #[cfg(test)]
 mod tests {
     use super::*;
+    use cmp::DefaultCmp;
+    use mem_env::MemEnv;
+    use types::FileMetaData;
     use version::testutil::make_version;
-    use cmp::DefaultCmp;
-    use types::FileMetaData;
 
     fn example_files() -> Vec<FileMetaHandle> {
         let mut f1 = FileMetaData::default();
@@ -540,6 +785,96 @@
     }
 
     #[test]
+    // Writes a minimal manifest by hand, recovers a VersionSet from it, and then applies an edit
+    // that adds one file, checking the bookkeeping fields after each step.
+    fn test_version_set_log_and_apply() {
+        let (_, opt) = make_version();
+        let mut vs = VersionSet::new("db".to_string(),
+                                     opt.clone(),
+                                     share(TableCache::new("db", opt.clone(), 100)));
+
+        // A fresh VersionSet starts handing out file numbers at 2.
+        assert_eq!(2, vs.new_file_number());
+        // Simulate NewDB
+        {
+            let mut ve = VersionEdit::new();
+            ve.set_comparator_name("leveldb.BytewiseComparator");
+            ve.set_log_num(10);
+            ve.set_next_file(20);
+            ve.set_last_seq(30);
+
+            // Write first manifest to be recovered from.
+            let manifest = format!("db/MANIFEST-{:06}", 1);
+            let mffile = opt.env.open_writable_file(Path::new(&manifest)).unwrap();
+            let mut lw = LogWriter::new(mffile);
+            lw.add_record(&ve.encode()).unwrap();
+            lw.flush().unwrap();
+            set_current_file(&opt.env.as_ref(), "db", 1).unwrap();
+        }
+
+        // Recover from new state.
+        {
+            vs.recover().unwrap();
+            assert_eq!(10, vs.log_num);
+            // recover() sets next_file_num to the recorded next file number plus one.
+            assert_eq!(21, vs.next_file_num);
+            assert_eq!(30, vs.last_seq);
+            assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[0].len());
+            assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[1].len());
+            // The value is whatever write_snapshot() returns for a version without files.
+            assert_eq!(35, vs.write_snapshot().unwrap());
+        }
+
+        // Simulate compaction by adding a file.
+        {
+            let mut ve = VersionEdit::new();
+            ve.set_prev_log_num(1);
+            ve.set_log_num(11);
+            let mut fmd = FileMetaData::default();
+            fmd.num = 21;
+            fmd.size = 123;
+            fmd.smallest = "abc".as_bytes().to_vec();
+            fmd.largest = "def".as_bytes().to_vec();
+            ve.add_file(1, fmd);
+            vs.log_and_apply(ve).unwrap();
+
+            assert!(opt.env.exists(Path::new("db/CURRENT")).unwrap());
+            assert!(opt.env.exists(Path::new("db/MANIFEST-000001")).unwrap());
+            // next_file_num and last_seq are untouched by log_and_apply
+            assert_eq!(21, vs.new_file_number());
+            assert_eq!(22, vs.next_file_num);
+            assert_eq!(30, vs.last_seq);
+            // the following fields are touched by log_and_apply.
+            assert_eq!(1, vs.prev_log_num);
+            assert_eq!(11, vs.log_num);
+
+            // One version from new(), one from recover() and one from log_and_apply().
+            assert_eq!(3, vs.versions.len());
+            // The previous "compaction" should have added one file to level 1 (and none to
+            // level 0) in the current version.
+            assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[0].len());
+            assert_eq!(1, vs.current.as_ref().unwrap().borrow().files[1].len());
+            // The snapshot now also contains the added file.
+            assert_eq!(47, vs.write_snapshot().unwrap());
+        }
+    }
+
+    #[test]
+    fn test_version_set_utils() {
+        // Exercises the small accessors of VersionSet on the example version.
+        let (v, opt) = make_version();
+        let cache = share(TableCache::new("db", opt.clone(), 100));
+        let mut vs = VersionSet::new("db".to_string(), opt, cache);
+        vs.add_version(v);
+
+        // live_files()
+        assert_eq!(9, vs.live_files().len());
+
+        // Expected (bytes, files) for the levels 0, 1 and 2.
+        let expected = [(434, 2), (651, 3), (434, 2)];
+        // num_level_bytes()
+        for (level, &(bytes, _)) in expected.iter().enumerate() {
+            assert_eq!(bytes, vs.num_level_bytes(level));
+        }
+        // num_level_files()
+        for (level, &(_, files)) in expected.iter().enumerate() {
+            assert_eq!(files, vs.num_level_files(level));
+        }
+
+        // new_file_number()
+        for expected_num in 2..4 {
+            assert_eq!(expected_num, vs.new_file_number());
+        }
+    }
+
+    #[test]
     fn test_version_set_compaction() {
         let (v, opt) = make_version();
         let mut vs = VersionSet::new("db".to_string(),
@@ -564,17 +899,10 @@
                                              LookupKey::new("fab".as_bytes(), 9000)
                                                  .internal_key()));
         }
-        {
-            // live_files()
-            assert_eq!(9, vs.live_files().len());
-        }
-
         // The following tests reuse the same version set and verify that various compactions work
         // like they should.
         {
             time_test!("compaction tests");
-            let v = vs.current().unwrap();
-
             // compact level 0 with a partial range.
             let from = LookupKey::new("000".as_bytes(), 1000);
             let to = LookupKey::new("ab".as_bytes(), 1010);
@@ -610,9 +938,6 @@
             let to = LookupKey::new("zzz".as_bytes(), 1010);
             let mid = LookupKey::new("abc".as_bytes(), 1010);
             let mut c = vs.compact_range(0, from.internal_key(), to.internal_key()).unwrap();
-            println!("{:?}", v.borrow().files);
-            println!("{:?}", c.inputs);
-            println!("{:?}", c.grandparents);
             assert!(!c.should_stop_before(from.internal_key()));
             assert!(!c.should_stop_before(mid.internal_key()));
             assert!(!c.should_stop_before(to.internal_key()));