Mercurial > lbo > hg > sstable
changeset 93:edebaa08a0f0
Merge pull request #3 from thomaskrause/feature/impl-send-sync
Make Table implement Send+Sync
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Mon, 24 Feb 2020 13:16:20 +0100 |
parents | bd30396ecc87 (current diff) b5d670aed9a0 (diff) |
children | 7b43cb6b9666 |
files | |
diffstat | 9 files changed, 59 insertions(+), 43 deletions(-) [+] |
line wrap: on
line diff
--- a/src/block.rs Sat Feb 22 23:21:29 2020 +0100 +++ b/src/block.rs Mon Feb 24 13:16:20 2020 +0100 @@ -1,6 +1,6 @@ use std::cmp::Ordering; -use std::rc::Rc; +use std::sync::Arc; use crate::options::Options; use crate::types::SSIterator; @@ -32,7 +32,7 @@ /// N_RESTARTS contains the number of restarts. #[derive(Clone)] pub struct Block { - block: Rc<BlockContents>, + block: Arc<BlockContents>, opt: Options, } @@ -59,14 +59,14 @@ } } - pub fn contents(&self) -> Rc<BlockContents> { + pub fn contents(&self) -> Arc<BlockContents> { self.block.clone() } pub fn new(opt: Options, contents: BlockContents) -> Block { assert!(contents.len() > 4); Block { - block: Rc::new(contents), + block: Arc::new(contents), opt: opt, } } @@ -76,8 +76,7 @@ /// lifetime, as it uses a refcounted block underneath. pub struct BlockIter { /// The underlying block contents. - /// TODO: Maybe (probably...) this needs an Arc. - block: Rc<BlockContents>, + block: Arc<BlockContents>, opt: Options, /// offset of restarts area within the block. restarts_off: usize,
--- a/src/cache.rs Sat Feb 22 23:21:29 2020 +0100 +++ b/src/cache.rs Mon Feb 24 13:16:20 2020 +0100 @@ -237,6 +237,15 @@ } } +// The compiler does not automatically derive Send and Sync for Cache because it contains +// raw pointers. +// These raw pointers are only pointing to the elements held in the same cache and insertion +// clones the values. It is therefore safe to implement Send for Cache. +// Since all functions that access these raw pointers are mutable member functions, it is also safe to implement Sync +// (Sync is defined as "if &T is Send-able") +unsafe impl<T: Send> Send for Cache<T> {} +unsafe impl<T: Sync> Sync for Cache<T> {} + #[cfg(test)] mod tests { use super::LRUList;
--- a/src/cmp.rs Sat Feb 22 23:21:29 2020 +0100 +++ b/src/cmp.rs Mon Feb 24 13:16:20 2020 +0100 @@ -2,7 +2,7 @@ /// Comparator trait, supporting types that can be nested (i.e., add additional functionality on /// top of an inner comparator) -pub trait Cmp { +pub trait Cmp: Sync + Send { /// Compare to byte strings, bytewise. fn cmp(&self, _: &[u8], _: &[u8]) -> Ordering;
--- a/src/filter.rs Sat Feb 22 23:21:29 2020 +0100 +++ b/src/filter.rs Mon Feb 24 13:16:20 2020 +0100 @@ -1,11 +1,11 @@ -use std::rc::Rc; +use std::sync::Arc; use integer_encoding::FixedInt; /// Encapsulates a filter algorithm allowing to search for keys more efficiently. /// Usually, policies are used as a BoxedFilterPolicy (see below), so they /// can be easily cloned and nested. -pub trait FilterPolicy { +pub trait FilterPolicy: Send + Sync { /// Returns a string identifying this policy. fn name(&self) -> &'static str; /// Create a filter matching the given keys. Keys are given as a long byte array that is @@ -17,7 +17,7 @@ /// A boxed and refcounted filter policy (reference-counted because a Box with unsized content /// couldn't be cloned otherwise) -pub type BoxedFilterPolicy = Rc<Box<dyn FilterPolicy>>; +pub type BoxedFilterPolicy = Arc<Box<dyn FilterPolicy>>; /// Used for tables that don't have filter blocks but need a type parameter. #[derive(Clone)]
--- a/src/filter_block.rs Sat Feb 22 23:21:29 2020 +0100 +++ b/src/filter_block.rs Mon Feb 24 13:16:20 2020 +0100 @@ -1,7 +1,7 @@ use crate::block::BlockContents; use crate::filter::BoxedFilterPolicy; -use std::rc::Rc; +use std::sync::Arc; use integer_encoding::FixedInt; @@ -108,7 +108,7 @@ #[derive(Clone)] pub struct FilterBlockReader { policy: BoxedFilterPolicy, - block: Rc<BlockContents>, + block: Arc<BlockContents>, offsets_offset: usize, filter_base_lg2: u32, @@ -116,10 +116,10 @@ impl FilterBlockReader { pub fn new_owned(pol: BoxedFilterPolicy, data: Vec<u8>) -> FilterBlockReader { - FilterBlockReader::new(pol, Rc::new(data)) + FilterBlockReader::new(pol, Arc::new(data)) } - pub fn new(pol: BoxedFilterPolicy, data: Rc<Vec<u8>>) -> FilterBlockReader { + pub fn new(pol: BoxedFilterPolicy, data: Arc<Vec<u8>>) -> FilterBlockReader { assert!(data.len() >= 5); let fbase = data[data.len() - 1] as u32; @@ -186,7 +186,7 @@ fn produce_filter_block() -> Vec<u8> { let keys = get_keys(); - let mut bld = FilterBlockBuilder::new(Rc::new(Box::new(BloomPolicy::new(32)))); + let mut bld = FilterBlockBuilder::new(Arc::new(Box::new(BloomPolicy::new(32)))); bld.start_block(0); @@ -223,7 +223,7 @@ #[test] fn test_filter_block_build_read() { let result = produce_filter_block(); - let reader = FilterBlockReader::new_owned(Rc::new(Box::new(BloomPolicy::new(32))), result); + let reader = FilterBlockReader::new_owned(Arc::new(Box::new(BloomPolicy::new(32))), result); assert_eq!( reader.offset_of(get_filter_index(5121, FILTER_BASE_LOG2)),
--- a/src/options.rs Sat Feb 22 23:21:29 2020 +0100 +++ b/src/options.rs Mon Feb 24 13:16:20 2020 +0100 @@ -5,7 +5,7 @@ use crate::types::{share, Shared}; use std::default::Default; -use std::rc::Rc; +use std::sync::Arc; const KB: usize = 1 << 10; const MB: usize = KB * KB; @@ -33,7 +33,7 @@ /// self-explanatory; the defaults are defined in the `Default` implementation. #[derive(Clone)] pub struct Options { - pub cmp: Rc<Box<dyn Cmp>>, + pub cmp: Arc<Box<dyn Cmp>>, pub write_buffer_size: usize, pub block_cache: Shared<Cache<Block>>, pub block_size: usize, @@ -45,14 +45,14 @@ impl Default for Options { fn default() -> Options { Options { - cmp: Rc::new(Box::new(DefaultCmp)), + cmp: Arc::new(Box::new(DefaultCmp)), write_buffer_size: WRITE_BUFFER_SIZE, // 2000 elements by default block_cache: share(Cache::new(BLOCK_CACHE_CAPACITY / BLOCK_MAX_SIZE)), block_size: BLOCK_MAX_SIZE, block_restart_interval: 16, compression_type: CompressionType::CompressionNone, - filter_policy: Rc::new(Box::new(filter::BloomPolicy::new(DEFAULT_BITS_PER_KEY))), + filter_policy: Arc::new(Box::new(filter::BloomPolicy::new(DEFAULT_BITS_PER_KEY))), } } }
--- a/src/table_builder.rs Sat Feb 22 23:21:29 2020 +0100 +++ b/src/table_builder.rs Mon Feb 24 13:16:20 2020 +0100 @@ -9,7 +9,7 @@ use std::cmp::Ordering; use std::io::Write; -use std::rc::Rc; +use std::sync::Arc; use crc::crc32; use crc::Hasher32; @@ -95,7 +95,7 @@ impl<Dst: Write> TableBuilder<Dst> { pub fn new_no_filter(mut opt: Options, dst: Dst) -> TableBuilder<Dst> { - opt.filter_policy = Rc::new(Box::new(NoFilterPolicy::new())); + opt.filter_policy = Arc::new(Box::new(NoFilterPolicy::new())); TableBuilder::new(opt, dst) } }
--- a/src/table_reader.rs Sat Feb 22 23:21:29 2020 +0100 +++ b/src/table_reader.rs Mon Feb 24 13:16:20 2020 +0100 @@ -11,7 +11,7 @@ use std::cmp::Ordering; use std::fs; use std::path; -use std::rc::Rc; +use std::sync::Arc; use integer_encoding::FixedIntWriter; @@ -25,7 +25,7 @@ /// `Table` is used for accessing SSTables. #[derive(Clone)] pub struct Table { - file: Rc<Box<dyn RandomAccess>>, + file: Arc<Box<dyn RandomAccess>>, file_size: usize, cache_id: cache::CacheID, @@ -52,10 +52,13 @@ table_block::read_table_block(opt.clone(), file.as_ref(), &footer.meta_index)?; let filter_block_reader = Table::read_filter_block(&metaindex_block, file.as_ref(), &opt)?; - let cache_id = opt.block_cache.borrow_mut().new_cache_id(); + let cache_id = { + let mut block_cache = opt.block_cache.write()?; + block_cache.new_cache_id() + }; Ok(Table { - file: Rc::new(file), + file: Arc::new(file), file_size: size, cache_id: cache_id, opt: opt, @@ -108,7 +111,8 @@ /// cache. fn read_block(&self, location: &BlockHandle) -> Result<Block> { let cachekey = self.block_cache_handle(location.offset()); - if let Some(block) = self.opt.block_cache.borrow_mut().get(&cachekey) { + let mut block_cache = self.opt.block_cache.write()?; + if let Some(block) = block_cache.get(&cachekey) { return Ok(block.clone()); } @@ -116,11 +120,8 @@ let b = table_block::read_table_block(self.opt.clone(), self.file.as_ref().as_ref(), location)?; - // insert a cheap copy (Rc). - self.opt - .block_cache - .borrow_mut() - .insert(&cachekey, b.clone()); + // insert a cheap copy (Arc). + block_cache.insert(&cachekey, b.clone()); Ok(b) } @@ -345,6 +346,8 @@ use super::*; + const LOCK_POISONED: &str = "Lock poisoned"; + fn build_data() -> Vec<(&'static str, &'static str)> { vec![ // block 1 @@ -417,15 +420,17 @@ let mut iter = table.iter(); // index/metaindex blocks are not cached. That'd be a waste of memory. 
- assert_eq!(opt.block_cache.borrow().count(), 0); + assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 0); + iter.next(); - assert_eq!(opt.block_cache.borrow().count(), 1); + assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 1); + // This may fail if block parameters or data change. In that case, adapt it. iter.next(); iter.next(); iter.next(); iter.next(); - assert_eq!(opt.block_cache.borrow().count(), 2); + assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 2); } #[test] @@ -613,7 +618,10 @@ assert_eq!(Ok(Some(v)), r); } - assert_eq!(table.opt.block_cache.borrow().count(), 3); + assert_eq!( + table.opt.block_cache.read().expect(LOCK_POISONED).count(), + 3 + ); // test that filters work and don't return anything at all. assert!(table.get(b"aaa").unwrap().is_none());
--- a/src/types.rs Sat Feb 22 23:21:29 2020 +0100 +++ b/src/types.rs Mon Feb 24 13:16:20 2020 +0100 @@ -2,15 +2,15 @@ use crate::error::Result; -use std::cell::RefCell; use std::fs::File; #[cfg(unix)] use std::os::unix::fs::FileExt; #[cfg(windows)] use std::os::windows::fs::FileExt; -use std::rc::Rc; +use std::sync::Arc; +use std::sync::RwLock; -pub trait RandomAccess { +pub trait RandomAccess: Send + Sync { fn read_at(&self, off: usize, dst: &mut [u8]) -> Result<usize>; } @@ -48,11 +48,11 @@ } } -/// A shared thingy with interior mutability. -pub type Shared<T> = Rc<RefCell<T>>; +/// A shared thingy guarded by a lock. +pub type Shared<T> = Arc<RwLock<T>>; -pub fn share<T>(t: T) -> Rc<RefCell<T>> { - Rc::new(RefCell::new(t)) +pub fn share<T>(t: T) -> Arc<RwLock<T>> { + Arc::new(RwLock::new(t)) } /// An extension of the standard `Iterator` trait that supporting some additional functionality.