Mercurial > lbo > hg > leveldb-rs
changeset 211:eade0c7f1ac7
version: use new Shared<> type.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Wed, 06 Sep 2017 20:11:07 +0200 |
parents | 6e1e7c828b69 |
children | 2d3f7bcaef6e |
files | src/version.rs |
diffstat | 1 files changed, 78 insertions(+), 61 deletions(-) [+] |
line wrap: on
line diff
--- a/src/version.rs Wed Sep 06 20:09:25 2017 +0200 +++ b/src/version.rs Wed Sep 06 20:11:07 2017 +0200 @@ -1,17 +1,19 @@ use cmp::{Cmp, InternalKeyCmp}; use error::Result; use key_types::{parse_internal_key, InternalKey, LookupKey, UserKey}; -use table_cache::TableCache; +use table_cache::SharedTableCache; use table_reader::TableIterator; -use types::{MAX_SEQUENCE_NUMBER, FileMetaData, LdbIterator}; -use version_set::{self, NUM_LEVELS}; +use types::{MAX_SEQUENCE_NUMBER, FileMetaData, LdbIterator, Shared}; +use version_set::NUM_LEVELS; use std::cmp::Ordering; use std::default::Default; use std::rc::Rc; -/// FileMetaHandle is a reference-counted FileMetaData object. -type FileMetaHandle = Rc<FileMetaData>; +/// FileMetaHandle is a reference-counted FileMetaData object with interior mutability. This is +/// necessary to provide a shared metadata container that can be modified while referenced by e.g. +/// multiple versions. +type FileMetaHandle = Shared<FileMetaData>; /// Contains statistics about seeks occurred in a file. struct GetStats { @@ -20,7 +22,7 @@ } struct Version { - table_cache: Rc<TableCache>, + table_cache: SharedTableCache, user_cmp: Rc<Box<Cmp>>, files: [Vec<FileMetaHandle>; NUM_LEVELS], @@ -29,7 +31,7 @@ } impl Version { - fn new(cache: Rc<TableCache>, ucmp: Rc<Box<Cmp>>) -> Version { + fn new(cache: SharedTableCache, ucmp: Rc<Box<Cmp>>) -> Version { Version { table_cache: cache, user_cmp: ucmp, @@ -58,19 +60,21 @@ if level == 0 { to_search.reserve(files.len()); - for f in files { - let (fsmallest, flargest) = (parse_internal_key(&(*f).smallest).2, - parse_internal_key(&(*f).largest).2); + for f_ in files { + let f = f_.borrow(); + let (fsmallest, flargest) = (parse_internal_key(&f.smallest).2, + parse_internal_key(&f.largest).2); if self.user_cmp.cmp(ukey, fsmallest) >= Ordering::Equal && self.user_cmp.cmp(ukey, flargest) <= Ordering::Equal { - to_search.push(f.clone()); + to_search.push(f_.clone()); } } - to_search.sort_by(|a, b| a.num.cmp(&b.num)); + to_search.sort_by(|a, b| a.borrow().num.cmp(&b.borrow().num)); } else { let ix = find_file(&icmp, files, ikey); if ix < files.len() { - let fsmallest = parse_internal_key(&(*files[ix]).smallest).2; + let f = files[ix].borrow(); + let fsmallest = parse_internal_key(&f.smallest).2; if self.user_cmp.cmp(ukey, fsmallest) >= Ordering::Equal { to_search.push(files[ix].clone()); } @@ -91,7 +95,7 @@ last_read_level = level; last_read = Some(f.clone()); - let val = Rc::get_mut(&mut self.table_cache).unwrap().get((*f).num, ikey)?; + let val = self.table_cache.borrow_mut().get(f.borrow().num, ikey)?; return Ok(val.map(|v| (v, stats))); } } @@ -121,7 +125,7 @@ last_read_level = level; last_read = Some(f.clone()); - let val = Rc::get_mut(&mut self.table_cache).unwrap().get((*f).num, ikey)?; + let val = self.table_cache.borrow_mut().get(f.borrow().num, ikey)?; return Ok(val.map(|v| (v, stats))); } } @@ -136,22 +140,24 @@ let files = &self.files[0]; levels[0].reserve(files.len()); - for f in files { - let (fsmallest, flargest) = (parse_internal_key(&(*f).smallest).2, - parse_internal_key(&(*f).largest).2); + for f_ in files { + let f = f_.borrow(); + let (fsmallest, flargest) = (parse_internal_key(&f.smallest).2, + parse_internal_key(&f.largest).2); if self.user_cmp.cmp(ukey, fsmallest) >= Ordering::Equal && self.user_cmp.cmp(ukey, flargest) <= Ordering::Equal { - levels[0].push(f.clone()); + levels[0].push(f_.clone()); } } - levels[0].sort_by(|a, b| a.num.cmp(&b.num)); + levels[0].sort_by(|a, b| a.borrow().num.cmp(&b.borrow().num)); let icmp = InternalKeyCmp(self.user_cmp.clone()); for level in 1..NUM_LEVELS { let files = &self.files[level]; let ix = find_file(&icmp, files, ikey); if ix < files.len() { - let fsmallest = parse_internal_key(&(*files[ix]).smallest).2; + let f = files[ix].borrow(); + let fsmallest = parse_internal_key(&f.smallest).2; if self.user_cmp.cmp(ukey, fsmallest) >= Ordering::Equal { levels[level].push(files[ix].clone()); } @@ -192,12 +198,11 @@ } fn update_stats(&mut self, stats: GetStats) -> bool { - if let Some(mut file) = stats.file { + if let Some(file) = stats.file { { - let mut f = Rc::get_mut(&mut file).unwrap(); - f.allowed_seeks -= 1; + file.borrow_mut().allowed_seeks -= 1; } - if (*file).allowed_seeks < 1 && self.file_to_compact.is_some() { + if file.borrow().allowed_seeks < 1 && self.file_to_compact.is_none() { self.file_to_compact = Some(file.clone()); self.file_to_compact_lvl = stats.level; return true; @@ -208,7 +213,11 @@ /// overlap_in_level returns true if the specified level's files overlap the range [smallest; /// largest]. - fn overlap_in_level(&self, level: usize, smallest: &[u8], largest: &[u8]) -> bool { + fn overlap_in_level<'a, 'b>(&self, + level: usize, + smallest: UserKey<'a>, + largest: UserKey<'a>) + -> bool { assert!(level < NUM_LEVELS); if level == 0 { some_file_overlaps_range_disjoint(&InternalKeyCmp(self.user_cmp.clone()), @@ -238,19 +247,24 @@ (Some((newubegin, newuend)), _) => { ubegin = newubegin; uend = newuend; - }, - (None, result) => return result + } + (None, result) => return result, } } // the actual search happens in this inner function. This is done to enhance the control // flow. It takes the smallest and largest user keys and returns a new pair of user keys if // the search range should be expanded, or a list of overlapping files. - fn do_search(myself: &Version, level: usize, ubegin: Vec<u8>, uend: Vec<u8>) -> (Option<(Vec<u8>, Vec<u8>)>, Vec<FileMetaHandle>) { + fn do_search(myself: &Version, + level: usize, + ubegin: Vec<u8>, + uend: Vec<u8>) + -> (Option<(Vec<u8>, Vec<u8>)>, Vec<FileMetaHandle>) { let mut inputs = vec![]; - for f in myself.files[level].iter() { - let ((_, _, fsmallest), (_, _, flargest)) = (parse_internal_key(&(*f).smallest), - parse_internal_key(&(*f).largest)); + for f_ in myself.files[level].iter() { + let f = f_.borrow(); + let ((_, _, fsmallest), (_, _, flargest)) = (parse_internal_key(&f.smallest), + parse_internal_key(&f.largest)); // Skip files that are not overlapping. if !ubegin.is_empty() && myself.user_cmp.cmp(flargest, &ubegin) == Ordering::Less { continue; @@ -258,17 +272,17 @@ myself.user_cmp.cmp(fsmallest, &uend) == Ordering::Greater { continue; } else { - inputs.push(f.clone()); + inputs.push(f_.clone()); // In level 0, files may overlap each other. Check if the new file begins // before ubegin or ends after uend, and expand the range, if so. Then, restart // the search. if level == 0 { if !ubegin.is_empty() && myself.user_cmp.cmp(fsmallest, &ubegin) == Ordering::Less { - return (Some((fsmallest.to_vec(), uend)), inputs) + return (Some((fsmallest.to_vec(), uend)), inputs); } else if !uend.is_empty() && myself.user_cmp.cmp(flargest, &uend) == Ordering::Greater { - return (Some((ubegin, flargest.to_vec())), inputs) + return (Some((ubegin, flargest.to_vec())), inputs); } } } @@ -294,7 +308,7 @@ // NOTE: Maybe we need to change this to Rc to support modification of the file set after // creation of the iterator. Versions should be immutable, though. files: Vec<FileMetaHandle>, - cache: Rc<TableCache>, + cache: SharedTableCache, cmp: InternalKeyCmp, current: Option<TableIterator>, @@ -310,15 +324,15 @@ return false; } } - if let Ok(tbl) = Rc::get_mut(&mut self.cache) - .unwrap() - .get_table((*self.files[self.current_ix]).num) { - self.current_ix += 1; + if let Ok(tbl) = self.cache + .borrow_mut() + .get_table(self.files[self.current_ix].borrow().num) { self.current = Some(tbl.iter()); - self.advance() } else { - false + return false; } + self.current_ix += 1; + self.advance() } fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool { if let Some(ref t) = self.current { @@ -330,7 +344,7 @@ fn seek(&mut self, key: &[u8]) { let ix = find_file(&self.cmp, &self.files, key); assert!(ix < self.files.len()); - if let Ok(tbl) = Rc::get_mut(&mut self.cache).unwrap().get_table((*self.files[ix]).num) { + if let Ok(tbl) = self.cache.borrow_mut().get_table(self.files[ix].borrow().num) { let mut iter = tbl.iter(); iter.seek(key); if iter.valid() { @@ -355,9 +369,9 @@ } else if self.current_ix > 0 { let f = &self.files[self.current_ix - 1]; // Find previous table, seek to last entry. - if let Ok(tbl) = Rc::get_mut(&mut self.cache).unwrap().get_table((*f).num) { + if let Ok(tbl) = self.cache.borrow_mut().get_table(f.borrow().num) { let mut iter = tbl.iter(); - iter.seek(&(*f).largest); + iter.seek(&f.borrow().largest); // The saved largest key must be in the table. assert!(iter.valid()); self.current_ix -= 1; @@ -372,13 +386,15 @@ /// key_is_after_file returns true if the given user key is larger than the largest key in f. fn key_is_after_file<'a>(cmp: &InternalKeyCmp, key: UserKey<'a>, f: &FileMetaHandle) -> bool { - let ulargest = parse_internal_key(&(*f).largest).2; + let f = f.borrow(); + let ulargest = parse_internal_key(&f.largest).2; !key.is_empty() && cmp.cmp_inner(key, ulargest) == Ordering::Greater } /// key_is_before_file returns true if the given user key is larger than the largest key in f. fn key_is_before_file<'a>(cmp: &InternalKeyCmp, key: UserKey<'a>, f: &FileMetaHandle) -> bool { - let usmallest = parse_internal_key(&(*f).smallest).2; + let f = f.borrow(); + let usmallest = parse_internal_key(&f.smallest).2; !key.is_empty() && cmp.cmp_inner(key, usmallest) == Ordering::Less } @@ -388,7 +404,7 @@ let (mut left, mut right) = (0, files.len()); while left < right { let mid = (left + right) / 2; - if cmp.cmp(&(*files[mid]).largest, key) == Ordering::Less { + if cmp.cmp(&files[mid].borrow().largest, key) == Ordering::Less { left = mid + 1; } else { right = mid; @@ -440,10 +456,11 @@ use mem_env::MemEnv; use options::Options; use table_builder::TableBuilder; - use table_cache::table_name; + use table_cache::{table_name, TableCache}; + use types::share; fn new_file(num: u64, smallest: &[u8], largest: &[u8]) -> FileMetaHandle { - Rc::new(FileMetaData { + share(FileMetaData { allowed_seeks: 10, size: 163840, num: num, @@ -474,10 +491,10 @@ seq += 1; } - let mut f = new_file(num, - LookupKey::new(contents[0].0, MAX_SEQUENCE_NUMBER).internal_key(), - LookupKey::new(contents[contents.len() - 1].0, 0).internal_key()); - Rc::get_mut(&mut f).unwrap().size = tbl.finish() as u64; + let f = new_file(num, + LookupKey::new(contents[0].0, MAX_SEQUENCE_NUMBER).internal_key(), + LookupKey::new(contents[contents.len() - 1].0, 0).internal_key()); + f.borrow_mut().size = tbl.finish() as u64; f } @@ -520,7 +537,7 @@ opts.set_env(Box::new(env)); let cache = TableCache::new("db", opts, 100); - let mut v = Version::new(Rc::new(cache), Rc::new(Box::new(DefaultCmp))); + let mut v = Version::new(share(cache), Rc::new(Box::new(DefaultCmp))); v.files[0] = vec![t1, t2]; v.files[1] = vec![t3, t4, t5]; v.files[2] = vec![t6, t7]; @@ -539,8 +556,8 @@ let to = LookupKey::new("aae".as_bytes(), 0); let r = v.overlapping_inputs(0, from.internal_key(), to.internal_key()); assert_eq!(r.len(), 2); - assert_eq!((*r[0]).num, 1); - assert_eq!((*r[1]).num, 2); + assert_eq!(r[0].borrow().num, 1); + assert_eq!(r[1].borrow().num, 2); } { let from = LookupKey::new("cab".as_bytes(), MAX_SEQUENCE_NUMBER); @@ -548,7 +565,7 @@ // expect one file. let r = v.overlapping_inputs(1, from.internal_key(), to.internal_key()); assert_eq!(r.len(), 1); - assert_eq!((*r[0]).num, 3); + assert_eq!(r[0].borrow().num, 3); } { let from = LookupKey::new("cab".as_bytes(), MAX_SEQUENCE_NUMBER); @@ -556,9 +573,9 @@ let r = v.overlapping_inputs(1, from.internal_key(), to.internal_key()); // Assert that correct number of files and correct files were returned. assert_eq!(r.len(), 3); - assert_eq!((*r[0]).num, 3); - assert_eq!((*r[1]).num, 4); - assert_eq!((*r[2]).num, 5); + assert_eq!(r[0].borrow().num, 3); + assert_eq!(r[1].borrow().num, 4); + assert_eq!(r[2].borrow().num, 5); } { let from = LookupKey::new("hhh".as_bytes(), MAX_SEQUENCE_NUMBER);