changeset 325:f39d34cf05e5

db_impl: Make VersionSet shared
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 01 Oct 2017 20:36:48 +0200
parents 4637c87d8b47
children 540851b54189
files src/db_impl.rs
diffstat 1 files changed, 66 insertions(+), 64 deletions(-) [+]
line wrap: on
line diff
--- a/src/db_impl.rs	Sun Oct 01 14:17:49 2017 +0000
+++ b/src/db_impl.rs	Sun Oct 01 20:36:48 2017 +0200
@@ -3,6 +3,8 @@
 
 #![allow(unused_attributes)]
 
+use db_iter::DBIterator;
+
 use cmp::{Cmp, InternalKeyCmp};
 use env::{Env, FileLock};
 use error::{err, Status, StatusCode, Result};
@@ -11,6 +13,7 @@
 use log::{LogReader, LogWriter};
 use key_types::{parse_internal_key, InternalKey, LookupKey, ValueType};
 use memtable::MemTable;
+use merging_iter::MergingIter;
 use options::Options;
 use snapshot::{Snapshot, SnapshotList};
 use table_builder::TableBuilder;
@@ -25,7 +28,7 @@
 use std::cmp::Ordering;
 use std::io::{self, Write};
 use std::mem;
-use std::ops::Drop;
+use std::ops::{DerefMut, Drop};
 use std::path::Path;
 use std::rc::Rc;
 
@@ -35,7 +38,7 @@
     name: String,
     lock: Option<FileLock>,
 
-    cmp: InternalKeyCmp,
+    internal_cmp: Rc<Box<Cmp>>,
     fpol: InternalFilterPolicy<BoxedFilterPolicy>,
     opt: Options,
 
@@ -45,7 +48,7 @@
     log: Option<LogWriter<Box<Write>>>,
     log_num: Option<FileNum>,
     cache: Shared<TableCache>,
-    vset: VersionSet,
+    vset: Shared<VersionSet>,
     snaps: SnapshotList,
 
     cstats: [CompactionStats; NUM_LEVELS],
@@ -67,7 +70,7 @@
         DB {
             name: name.to_string(),
             lock: None,
-            cmp: InternalKeyCmp(opt.cmp.clone()),
+            internal_cmp: Rc::new(Box::new(InternalKeyCmp(opt.cmp.clone()))),
             fpol: InternalFilterPolicy::new(opt.filter_policy.clone()),
 
             mem: MemTable::new(opt.cmp.clone()),
@@ -78,13 +81,17 @@
             log: None,
             log_num: None,
             cache: cache,
-            vset: vset,
+            vset: share(vset),
             snaps: SnapshotList::new(),
 
             cstats: Default::default(),
         }
     }
 
+    fn current(&self) -> Shared<Version> {
+        self.vset.borrow().current()
+    }
+
     /// Opens or creates* a new or existing database.
     ///
     /// *depending on the options set (create_if_missing, error_if_exists).
@@ -95,7 +102,7 @@
 
         // Create log file if an old one is not being reused.
         if db.log.is_none() {
-            let lognum = db.vset.new_file_number();
+            let lognum = db.vset.borrow_mut().new_file_number();
             let logfile =
                 db.opt.env.open_writable_file(Path::new(&log_file_name(&db.name, lognum)))?;
             ve.set_log_num(lognum);
@@ -105,7 +112,7 @@
 
         if save_manifest {
             ve.set_log_num(db.log_num.unwrap_or(0));
-            db.vset.log_and_apply(ve)?;
+            db.vset.borrow_mut().log_and_apply(ve)?;
         }
 
         db.delete_obsolete_files()?;
@@ -151,18 +158,18 @@
 
         // If save_manifest is true, the existing manifest is reused and we should log_and_apply()
         // later.
-        let mut save_manifest = self.vset.recover()?;
+        let mut save_manifest = self.vset.borrow_mut().recover()?;
 
         // Recover from all log files not in the descriptor.
         let mut max_seq = 0;
         let filenames = self.opt.env.children(Path::new(&self.name))?;
-        let mut expected = self.vset.live_files();
+        let mut expected = self.vset.borrow().live_files();
         let mut log_files = vec![];
 
         for file in &filenames {
             if let Ok((num, typ)) = parse_file_name(&file) {
                 expected.remove(&num);
-                if typ == FileType::Log && num >= self.vset.log_num {
+                if typ == FileType::Log && num >= self.vset.borrow().log_num {
                     log_files.push(num);
                 }
             }
@@ -182,11 +189,11 @@
             if max_seq_ > max_seq {
                 max_seq = max_seq_;
             }
-            self.vset.mark_file_number_used(log_files[i]);
+            self.vset.borrow_mut().mark_file_number_used(log_files[i]);
         }
 
-        if self.vset.last_seq < max_seq {
-            self.vset.last_seq = max_seq;
+        if self.vset.borrow().last_seq < max_seq {
+            self.vset.borrow_mut().last_seq = max_seq;
         }
 
         Ok(save_manifest)
@@ -264,19 +271,19 @@
 
     /// delete_obsolete_files removes files that are no longer needed from the file system.
     fn delete_obsolete_files(&mut self) -> Result<()> {
-        let files = self.vset.live_files();
+        let files = self.vset.borrow().live_files();
         let filenames = self.opt.env.children(Path::new(&self.name))?;
         for name in filenames {
             if let Ok((num, typ)) = parse_file_name(&name) {
                 log!(self.opt.log, "{} {:?}", num, typ);
                 match typ {
                     FileType::Log => {
-                        if num >= self.vset.log_num {
+                        if num >= self.vset.borrow().log_num {
                             continue;
                         }
                     }
                     FileType::Descriptor => {
-                        if num >= self.vset.manifest_num {
+                        if num >= self.vset.borrow().manifest_num {
                             continue;
                         }
                     }
@@ -352,14 +359,14 @@
         assert!(self.log.is_some());
         let entries = batch.count() as u64;
         let log = self.log.as_mut().unwrap();
-        let next = self.vset.last_seq + 1;
+        let next = self.vset.borrow().last_seq + 1;
 
         batch.insert_into_memtable(next, &mut self.mem);
         log.add_record(&batch.encode(next))?;
         if sync {
             log.flush()?;
         }
-        self.vset.last_seq += entries;
+        self.vset.borrow_mut().last_seq += entries;
         Ok(())
     }
 
@@ -374,7 +381,7 @@
     // READ //
 
     fn get_internal(&mut self, seq: SequenceNumber, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        let current = self.vset.current();
+        let current = self.current();
         let mut current = current.borrow_mut();
 
         let lkey = LookupKey::new(key, seq);
@@ -401,17 +408,12 @@
     /// get_at reads the value for a given key at or before snapshot. It returns Ok(None) if the
     /// entry wasn't found, and Err(_) if an error occurred.
     pub fn get_at(&mut self, snapshot: &Snapshot, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        if let Some(seq) = self.snaps.sequence_at(snapshot) {
-            self.get_internal(seq, key)
-        } else {
-            err(StatusCode::InvalidArgument,
-                "get_at: snapshot does not exist")
-        }
+        self.get_internal(snapshot.sequence(), key)
     }
 
     /// get is a simplified version of get_at(), translating errors to None.
     pub fn get(&mut self, key: &[u8]) -> Option<Vec<u8>> {
-        let seq = self.vset.last_seq;
+        let seq = self.vset.borrow().last_seq;
         if let Ok(v) = self.get_internal(seq, key) {
             v
         } else {
@@ -429,7 +431,7 @@
 
     /// Trigger a compaction based on where this key is located in the different levels.
     fn record_read_sample<'a>(&mut self, k: InternalKey<'a>) {
-        let current = self.vset.current();
+        let current = self.current();
         if current.borrow_mut().record_read_sample(k) {
             if let Err(e) = self.maybe_do_compaction() {
                 log!(self.opt.log, "record_read_sample: compaction failed: {}", e);
@@ -439,15 +441,6 @@
 }
 
 impl DB {
-    // SNAPSHOTS //
-
-    /// Returns a snapshot at the current state. The snapshot is released automatically on Drop.
-    pub fn get_snapshot(&mut self) -> Snapshot {
-        self.snaps.new_snapshot(self.vset.last_seq)
-    }
-}
-
-impl DB {
     // COMPACTIONS //
     /// make_room_for_write checks if the memtable has become too large, and triggers a compaction
     /// if it's the case.
@@ -456,10 +449,10 @@
             Ok(())
         } else {
             // Create new memtable.
-            let logn = self.vset.new_file_number();
+            let logn = self.vset.borrow_mut().new_file_number();
             let logf = self.opt.env.open_writable_file(Path::new(&log_file_name(&self.name, logn)));
             if logf.is_err() {
-                self.vset.reuse_file_number(logn);
+                self.vset.borrow_mut().reuse_file_number(logn);
                 Err(logf.err().unwrap())
             } else {
                 self.log = Some(LogWriter::new(logf.unwrap()));
@@ -475,7 +468,7 @@
 
     /// maybe_do_compaction starts a blocking compaction if it makes sense.
     fn maybe_do_compaction(&mut self) -> Result<()> {
-        if self.imm.is_none() && !self.vset.needs_compaction() {
+        if self.imm.is_none() && !self.vset.borrow().needs_compaction() {
             return Ok(());
         }
         self.start_compaction()
@@ -489,7 +482,8 @@
             return self.compact_memtable();
         }
 
-        let compaction = self.vset.pick_compaction();
+
+        let compaction = self.vset.borrow_mut().pick_compaction();
         if compaction.is_none() {
             return Ok(());
         }
@@ -505,7 +499,8 @@
             compaction.edit().delete_file(level, num);
             compaction.edit().add_file(level + 1, f);
 
-            if let Err(e) = self.vset.log_and_apply(compaction.into_edit()) {
+            let r = self.vset.borrow_mut().log_and_apply(compaction.into_edit());
+            if let Err(e) = r {
                 log!(self.opt.log, "trivial move failed: {}", e);
                 Err(e)
             } else {
@@ -515,7 +510,9 @@
                      size,
                      level,
                      level + 1);
-                log!(self.opt.log, "Summary: {}", self.vset.current_summary());
+                log!(self.opt.log,
+                     "Summary: {}",
+                     self.vset.borrow().current_summary());
                 Ok(())
             }
         } else {
@@ -527,7 +524,7 @@
             self.install_compaction_results(state)?;
             log!(self.opt.log,
                  "Compaction finished: {}",
-                 self.vset.current_summary());
+                 self.vset.borrow().current_summary());
 
             self.delete_obsolete_files()
         }
@@ -535,8 +532,9 @@
 
     fn compact_memtable(&mut self) -> Result<()> {
         assert!(self.imm.is_some());
+
         let mut ve = VersionEdit::new();
-        let base = self.vset.current();
+        let base = self.current();
 
         let imm = self.imm.take().unwrap();
         if let Err(e) = self.write_l0_table(&imm, &mut ve, Some(&base.borrow())) {
@@ -544,7 +542,7 @@
             return Err(e);
         }
         ve.set_log_num(self.log_num.unwrap_or(0));
-        self.vset.log_and_apply(ve)?;
+        self.vset.borrow_mut().log_and_apply(ve)?;
         if let Err(e) = self.delete_obsolete_files() {
             log!(self.opt.log, "Error deleting obsolete files: {}", e);
         }
@@ -558,14 +556,14 @@
                       base: Option<&Version>)
                       -> Result<()> {
         let start_ts = self.opt.env.micros();
-        let num = self.vset.new_file_number();
+        let num = self.vset.borrow_mut().new_file_number();
         log!(self.opt.log, "Start write of L0 table {:06}", num);
         let fmd = build_table(&self.name, &self.opt, memt.iter(), num)?;
         log!(self.opt.log, "L0 table {:06} has {} bytes", num, fmd.size);
 
         // Wrote empty table.
         if fmd.size == 0 {
-            self.vset.reuse_file_number(num);
+            self.vset.borrow_mut().reuse_file_number(num);
             return Ok(());
         }
 
@@ -603,16 +601,16 @@
              cs.compaction.level(),
              cs.compaction.num_inputs(1),
              cs.compaction.level() + 1);
-        assert!(self.vset.num_level_files(cs.compaction.level()) > 0);
+        assert!(self.vset.borrow().num_level_files(cs.compaction.level()) > 0);
         assert!(cs.builder.is_none());
 
         cs.smallest_seq = if self.snaps.empty() {
-            self.vset.last_seq
+            self.vset.borrow().last_seq
         } else {
             self.snaps.oldest()
         };
 
-        let mut input = self.vset.make_input_iterator(&cs.compaction);
+        let mut input = self.vset.borrow().make_input_iterator(&cs.compaction);
         input.seek_to_first();
 
         let (mut key, mut val) = (vec![], vec![]);
@@ -654,7 +652,7 @@
             }
 
             if cs.builder.is_none() {
-                let fnum = self.vset.new_file_number();
+                let fnum = self.vset.borrow_mut().new_file_number();
                 let mut fmd = FileMetaData::default();
                 fmd.num = fnum;
 
@@ -714,8 +712,10 @@
         cs.current_output().size = bytes;
 
         if entries > 0 {
-            // Verify that table can be used.
-            if let Err(e) = self.cache.borrow_mut().get_table(output_num) {
+            // Verify that table can be used. (Separating get_table() because borrowing in an if
+            // let expression is dangerous).
+            let r = self.cache.borrow_mut().get_table(output_num);
+            if let Err(e) = r {
                 log!(self.opt.log, "New table can't be read: {}", e);
                 return Err(e);
             }
@@ -741,7 +741,7 @@
         for output in &cs.outputs {
             cs.compaction.edit().add_file(level + 1, output.clone());
         }
-        self.vset.log_and_apply(cs.compaction.into_edit())
+        self.vset.borrow_mut().log_and_apply(cs.compaction.into_edit())
     }
 }
 
@@ -979,7 +979,7 @@
             assert!(env.exists(Path::new("db/000003.ldb")).unwrap());
             assert!(env.exists(Path::new("db/000004.log")).unwrap());
             // Check that entry exists and is correct. Phew, long call chain!
-            let current = db.vset.current();
+            let current = db.current();
             log!(opt.log, "files: {:?}", current.borrow().files);
             assert_eq!("def".as_bytes(),
                        current.borrow_mut()
@@ -1130,9 +1130,11 @@
     fn test_db_impl_get_from_table() {
         let mut db = build_db();
 
-        assert_eq!(26, db.vset.last_seq);
+        assert_eq!(26, db.vset.borrow().last_seq);
 
+        let old_ss = db.get_snapshot();
         db.put("xyz".as_bytes(), "123".as_bytes()).unwrap();
+        assert!(db.get_at(&old_ss, "xyz".as_bytes()).unwrap().is_none());
 
         // memtable get
         assert_eq!("123".as_bytes(),
@@ -1187,7 +1189,7 @@
         assert!(db.opt.env.exists(Path::new("db/000004.ldb")).unwrap());
 
         {
-            let v = db.vset.current();
+            let v = db.current();
             let mut v = v.borrow_mut();
             v.file_to_compact = Some(v.files[2][0].clone());
             v.file_to_compact_lvl = 2;
@@ -1196,7 +1198,7 @@
         db.maybe_do_compaction().unwrap();
 
         {
-            let v = db.vset.current();
+            let v = db.current();
             let v = v.borrow_mut();
             assert_eq!(1, v.files[3].len());
         }
@@ -1231,8 +1233,8 @@
         v.compaction_level = Some(1);
 
         let mut db = DB::new("db", opt.clone());
-        db.vset.add_version(v);
-        db.vset.next_file_num = 10;
+        db.vset.borrow_mut().add_version(v);
+        db.vset.borrow_mut().next_file_num = 10;
 
         db.start_compaction().unwrap();
 
@@ -1240,7 +1242,7 @@
         assert!(opt.env.exists(Path::new("db/000010.ldb")).unwrap());
         assert_eq!(375, opt.env.size_of(Path::new("db/000010.ldb")).unwrap());
 
-        let v = db.vset.current();
+        let v = db.current();
         assert_eq!(0, v.borrow().files[1].len());
         assert_eq!(2, v.borrow().files[2].len());
     }
@@ -1254,8 +1256,8 @@
         v.file_to_compact_lvl = 2;
 
         let mut db = DB::new("db", opt.clone());
-        db.vset.add_version(v);
-        db.vset.next_file_num = 10;
+        db.vset.borrow_mut().add_version(v);
+        db.vset.borrow_mut().next_file_num = 10;
 
         db.start_compaction().unwrap();
 
@@ -1263,7 +1265,7 @@
         assert!(!opt.env.exists(Path::new("db/000010.ldb")).unwrap());
         assert_eq!(218, opt.env.size_of(Path::new("db/000006.ldb")).unwrap());
 
-        let v = db.vset.current();
+        let v = db.current();
         assert_eq!(1, v.borrow().files[2].len());
         assert_eq!(3, v.borrow().files[3].len());
     }