changeset 211:eade0c7f1ac7

version: use new Shared<> type.
author Lewin Bormann <lbo@spheniscida.de>
date Wed, 06 Sep 2017 20:11:07 +0200
parents 6e1e7c828b69
children 2d3f7bcaef6e
files src/version.rs
diffstat 1 files changed, 78 insertions(+), 61 deletions(-) [+]
line wrap: on
line diff
--- a/src/version.rs	Wed Sep 06 20:09:25 2017 +0200
+++ b/src/version.rs	Wed Sep 06 20:11:07 2017 +0200
@@ -1,17 +1,19 @@
 use cmp::{Cmp, InternalKeyCmp};
 use error::Result;
 use key_types::{parse_internal_key, InternalKey, LookupKey, UserKey};
-use table_cache::TableCache;
+use table_cache::SharedTableCache;
 use table_reader::TableIterator;
-use types::{MAX_SEQUENCE_NUMBER, FileMetaData, LdbIterator};
-use version_set::{self, NUM_LEVELS};
+use types::{MAX_SEQUENCE_NUMBER, FileMetaData, LdbIterator, Shared};
+use version_set::NUM_LEVELS;
 
 use std::cmp::Ordering;
 use std::default::Default;
 use std::rc::Rc;
 
-/// FileMetaHandle is a reference-counted FileMetaData object.
-type FileMetaHandle = Rc<FileMetaData>;
+/// FileMetaHandle is a reference-counted FileMetaData object with interior mutability. This is
+/// necessary to provide a shared metadata container that can be modified while referenced by e.g.
+/// multiple versions.
+type FileMetaHandle = Shared<FileMetaData>;
 
 /// Contains statistics about seeks occurred in a file.
 struct GetStats {
@@ -20,7 +22,7 @@
 }
 
 struct Version {
-    table_cache: Rc<TableCache>,
+    table_cache: SharedTableCache,
     user_cmp: Rc<Box<Cmp>>,
     files: [Vec<FileMetaHandle>; NUM_LEVELS],
 
@@ -29,7 +31,7 @@
 }
 
 impl Version {
-    fn new(cache: Rc<TableCache>, ucmp: Rc<Box<Cmp>>) -> Version {
+    fn new(cache: SharedTableCache, ucmp: Rc<Box<Cmp>>) -> Version {
         Version {
             table_cache: cache,
             user_cmp: ucmp,
@@ -58,19 +60,21 @@
 
             if level == 0 {
                 to_search.reserve(files.len());
-                for f in files {
-                    let (fsmallest, flargest) = (parse_internal_key(&(*f).smallest).2,
-                                                 parse_internal_key(&(*f).largest).2);
+                for f_ in files {
+                    let f = f_.borrow();
+                    let (fsmallest, flargest) = (parse_internal_key(&f.smallest).2,
+                                                 parse_internal_key(&f.largest).2);
                     if self.user_cmp.cmp(ukey, fsmallest) >= Ordering::Equal &&
                        self.user_cmp.cmp(ukey, flargest) <= Ordering::Equal {
-                        to_search.push(f.clone());
+                        to_search.push(f_.clone());
                     }
                 }
-                to_search.sort_by(|a, b| a.num.cmp(&b.num));
+                to_search.sort_by(|a, b| a.borrow().num.cmp(&b.borrow().num));
             } else {
                 let ix = find_file(&icmp, files, ikey);
                 if ix < files.len() {
-                    let fsmallest = parse_internal_key(&(*files[ix]).smallest).2;
+                    let f = files[ix].borrow();
+                    let fsmallest = parse_internal_key(&f.smallest).2;
                     if self.user_cmp.cmp(ukey, fsmallest) >= Ordering::Equal {
                         to_search.push(files[ix].clone());
                     }
@@ -91,7 +95,7 @@
                 last_read_level = level;
                 last_read = Some(f.clone());
 
-                let val = Rc::get_mut(&mut self.table_cache).unwrap().get((*f).num, ikey)?;
+                let val = self.table_cache.borrow_mut().get(f.borrow().num, ikey)?;
                 return Ok(val.map(|v| (v, stats)));
             }
         }
@@ -121,7 +125,7 @@
                 last_read_level = level;
                 last_read = Some(f.clone());
 
-                let val = Rc::get_mut(&mut self.table_cache).unwrap().get((*f).num, ikey)?;
+                let val = self.table_cache.borrow_mut().get(f.borrow().num, ikey)?;
                 return Ok(val.map(|v| (v, stats)));
             }
         }
@@ -136,22 +140,24 @@
 
         let files = &self.files[0];
         levels[0].reserve(files.len());
-        for f in files {
-            let (fsmallest, flargest) = (parse_internal_key(&(*f).smallest).2,
-                                         parse_internal_key(&(*f).largest).2);
+        for f_ in files {
+            let f = f_.borrow();
+            let (fsmallest, flargest) = (parse_internal_key(&f.smallest).2,
+                                         parse_internal_key(&f.largest).2);
             if self.user_cmp.cmp(ukey, fsmallest) >= Ordering::Equal &&
                self.user_cmp.cmp(ukey, flargest) <= Ordering::Equal {
-                levels[0].push(f.clone());
+                levels[0].push(f_.clone());
             }
         }
-        levels[0].sort_by(|a, b| a.num.cmp(&b.num));
+        levels[0].sort_by(|a, b| a.borrow().num.cmp(&b.borrow().num));
 
         let icmp = InternalKeyCmp(self.user_cmp.clone());
         for level in 1..NUM_LEVELS {
             let files = &self.files[level];
             let ix = find_file(&icmp, files, ikey);
             if ix < files.len() {
-                let fsmallest = parse_internal_key(&(*files[ix]).smallest).2;
+                let f = files[ix].borrow();
+                let fsmallest = parse_internal_key(&f.smallest).2;
                 if self.user_cmp.cmp(ukey, fsmallest) >= Ordering::Equal {
                     levels[level].push(files[ix].clone());
                 }
@@ -192,12 +198,11 @@
     }
 
     fn update_stats(&mut self, stats: GetStats) -> bool {
-        if let Some(mut file) = stats.file {
+        if let Some(file) = stats.file {
             {
-                let mut f = Rc::get_mut(&mut file).unwrap();
-                f.allowed_seeks -= 1;
+                file.borrow_mut().allowed_seeks -= 1;
             }
-            if (*file).allowed_seeks < 1 && self.file_to_compact.is_some() {
+            if file.borrow().allowed_seeks < 1 && self.file_to_compact.is_none() {
                 self.file_to_compact = Some(file.clone());
                 self.file_to_compact_lvl = stats.level;
                 return true;
@@ -208,7 +213,11 @@
 
     /// overlap_in_level returns true if the specified level's files overlap the range [smallest;
     /// largest].
-    fn overlap_in_level(&self, level: usize, smallest: &[u8], largest: &[u8]) -> bool {
+    fn overlap_in_level<'a, 'b>(&self,
+                                level: usize,
+                                smallest: UserKey<'a>,
+                                largest: UserKey<'a>)
+                                -> bool {
         assert!(level < NUM_LEVELS);
         if level == 0 {
             some_file_overlaps_range_disjoint(&InternalKeyCmp(self.user_cmp.clone()),
@@ -238,19 +247,24 @@
                 (Some((newubegin, newuend)), _) => {
                     ubegin = newubegin;
                     uend = newuend;
-                },
-                (None, result) => return result
+                }
+                (None, result) => return result,
             }
         }
 
         // the actual search happens in this inner function. This is done to enhance the control
         // flow. It takes the smallest and largest user keys and returns a new pair of user keys if
         // the search range should be expanded, or a list of overlapping files.
-        fn do_search(myself: &Version, level: usize, ubegin: Vec<u8>, uend: Vec<u8>) -> (Option<(Vec<u8>, Vec<u8>)>, Vec<FileMetaHandle>) {
+        fn do_search(myself: &Version,
+                     level: usize,
+                     ubegin: Vec<u8>,
+                     uend: Vec<u8>)
+                     -> (Option<(Vec<u8>, Vec<u8>)>, Vec<FileMetaHandle>) {
             let mut inputs = vec![];
-            for f in myself.files[level].iter() {
-                let ((_, _, fsmallest), (_, _, flargest)) = (parse_internal_key(&(*f).smallest),
-                                                             parse_internal_key(&(*f).largest));
+            for f_ in myself.files[level].iter() {
+                let f = f_.borrow();
+                let ((_, _, fsmallest), (_, _, flargest)) = (parse_internal_key(&f.smallest),
+                                                             parse_internal_key(&f.largest));
                 // Skip files that are not overlapping.
                 if !ubegin.is_empty() && myself.user_cmp.cmp(flargest, &ubegin) == Ordering::Less {
                     continue;
@@ -258,17 +272,17 @@
                           myself.user_cmp.cmp(fsmallest, &uend) == Ordering::Greater {
                     continue;
                 } else {
-                    inputs.push(f.clone());
+                    inputs.push(f_.clone());
                     // In level 0, files may overlap each other. Check if the new file begins
                     // before ubegin or ends after uend, and expand the range, if so. Then, restart
                     // the search.
                     if level == 0 {
                         if !ubegin.is_empty() &&
                            myself.user_cmp.cmp(fsmallest, &ubegin) == Ordering::Less {
-                            return (Some((fsmallest.to_vec(), uend)), inputs)
+                            return (Some((fsmallest.to_vec(), uend)), inputs);
                         } else if !uend.is_empty() &&
                                   myself.user_cmp.cmp(flargest, &uend) == Ordering::Greater {
-                            return (Some((ubegin, flargest.to_vec())), inputs)
+                            return (Some((ubegin, flargest.to_vec())), inputs);
                         }
                     }
                 }
@@ -294,7 +308,7 @@
     // NOTE: Maybe we need to change this to Rc to support modification of the file set after
     // creation of the iterator. Versions should be immutable, though.
     files: Vec<FileMetaHandle>,
-    cache: Rc<TableCache>,
+    cache: SharedTableCache,
     cmp: InternalKeyCmp,
 
     current: Option<TableIterator>,
@@ -310,15 +324,15 @@
                 return false;
             }
         }
-        if let Ok(tbl) = Rc::get_mut(&mut self.cache)
-            .unwrap()
-            .get_table((*self.files[self.current_ix]).num) {
-            self.current_ix += 1;
+        if let Ok(tbl) = self.cache
+            .borrow_mut()
+            .get_table(self.files[self.current_ix].borrow().num) {
             self.current = Some(tbl.iter());
-            self.advance()
         } else {
-            false
+            return false;
         }
+        self.current_ix += 1;
+        self.advance()
     }
     fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool {
         if let Some(ref t) = self.current {
@@ -330,7 +344,7 @@
     fn seek(&mut self, key: &[u8]) {
         let ix = find_file(&self.cmp, &self.files, key);
         assert!(ix < self.files.len());
-        if let Ok(tbl) = Rc::get_mut(&mut self.cache).unwrap().get_table((*self.files[ix]).num) {
+        if let Ok(tbl) = self.cache.borrow_mut().get_table(self.files[ix].borrow().num) {
             let mut iter = tbl.iter();
             iter.seek(key);
             if iter.valid() {
@@ -355,9 +369,9 @@
             } else if self.current_ix > 0 {
                 let f = &self.files[self.current_ix - 1];
                 // Find previous table, seek to last entry.
-                if let Ok(tbl) = Rc::get_mut(&mut self.cache).unwrap().get_table((*f).num) {
+                if let Ok(tbl) = self.cache.borrow_mut().get_table(f.borrow().num) {
                     let mut iter = tbl.iter();
-                    iter.seek(&(*f).largest);
+                    iter.seek(&f.borrow().largest);
                     // The saved largest key must be in the table.
                     assert!(iter.valid());
                     self.current_ix -= 1;
@@ -372,13 +386,15 @@
 
 /// key_is_after_file returns true if the given user key is larger than the largest key in f.
 fn key_is_after_file<'a>(cmp: &InternalKeyCmp, key: UserKey<'a>, f: &FileMetaHandle) -> bool {
-    let ulargest = parse_internal_key(&(*f).largest).2;
+    let f = f.borrow();
+    let ulargest = parse_internal_key(&f.largest).2;
     !key.is_empty() && cmp.cmp_inner(key, ulargest) == Ordering::Greater
 }
 
 /// key_is_before_file returns true if the given user key is larger than the largest key in f.
 fn key_is_before_file<'a>(cmp: &InternalKeyCmp, key: UserKey<'a>, f: &FileMetaHandle) -> bool {
-    let usmallest = parse_internal_key(&(*f).smallest).2;
+    let f = f.borrow();
+    let usmallest = parse_internal_key(&f.smallest).2;
     !key.is_empty() && cmp.cmp_inner(key, usmallest) == Ordering::Less
 }
 
@@ -388,7 +404,7 @@
     let (mut left, mut right) = (0, files.len());
     while left < right {
         let mid = (left + right) / 2;
-        if cmp.cmp(&(*files[mid]).largest, key) == Ordering::Less {
+        if cmp.cmp(&files[mid].borrow().largest, key) == Ordering::Less {
             left = mid + 1;
         } else {
             right = mid;
@@ -440,10 +456,11 @@
     use mem_env::MemEnv;
     use options::Options;
     use table_builder::TableBuilder;
-    use table_cache::table_name;
+    use table_cache::{table_name, TableCache};
+    use types::share;
 
     fn new_file(num: u64, smallest: &[u8], largest: &[u8]) -> FileMetaHandle {
-        Rc::new(FileMetaData {
+        share(FileMetaData {
             allowed_seeks: 10,
             size: 163840,
             num: num,
@@ -474,10 +491,10 @@
             seq += 1;
         }
 
-        let mut f = new_file(num,
-                             LookupKey::new(contents[0].0, MAX_SEQUENCE_NUMBER).internal_key(),
-                             LookupKey::new(contents[contents.len() - 1].0, 0).internal_key());
-        Rc::get_mut(&mut f).unwrap().size = tbl.finish() as u64;
+        let f = new_file(num,
+                         LookupKey::new(contents[0].0, MAX_SEQUENCE_NUMBER).internal_key(),
+                         LookupKey::new(contents[contents.len() - 1].0, 0).internal_key());
+        f.borrow_mut().size = tbl.finish() as u64;
         f
     }
 
@@ -520,7 +537,7 @@
 
         opts.set_env(Box::new(env));
         let cache = TableCache::new("db", opts, 100);
-        let mut v = Version::new(Rc::new(cache), Rc::new(Box::new(DefaultCmp)));
+        let mut v = Version::new(share(cache), Rc::new(Box::new(DefaultCmp)));
         v.files[0] = vec![t1, t2];
         v.files[1] = vec![t3, t4, t5];
         v.files[2] = vec![t6, t7];
@@ -539,8 +556,8 @@
             let to = LookupKey::new("aae".as_bytes(), 0);
             let r = v.overlapping_inputs(0, from.internal_key(), to.internal_key());
             assert_eq!(r.len(), 2);
-            assert_eq!((*r[0]).num, 1);
-            assert_eq!((*r[1]).num, 2);
+            assert_eq!(r[0].borrow().num, 1);
+            assert_eq!(r[1].borrow().num, 2);
         }
         {
             let from = LookupKey::new("cab".as_bytes(), MAX_SEQUENCE_NUMBER);
@@ -548,7 +565,7 @@
             // expect one file.
             let r = v.overlapping_inputs(1, from.internal_key(), to.internal_key());
             assert_eq!(r.len(), 1);
-            assert_eq!((*r[0]).num, 3);
+            assert_eq!(r[0].borrow().num, 3);
         }
         {
             let from = LookupKey::new("cab".as_bytes(), MAX_SEQUENCE_NUMBER);
@@ -556,9 +573,9 @@
             let r = v.overlapping_inputs(1, from.internal_key(), to.internal_key());
             // Assert that correct number of files and correct files were returned.
             assert_eq!(r.len(), 3);
-            assert_eq!((*r[0]).num, 3);
-            assert_eq!((*r[1]).num, 4);
-            assert_eq!((*r[2]).num, 5);
+            assert_eq!(r[0].borrow().num, 3);
+            assert_eq!(r[1].borrow().num, 4);
+            assert_eq!(r[2].borrow().num, 5);
         }
         {
             let from = LookupKey::new("hhh".as_bytes(), MAX_SEQUENCE_NUMBER);