changeset 93:edebaa08a0f0

Merge pull request #3 from thomaskrause/feature/impl-send-sync Make Table implement Send+Sync
author Lewin Bormann <lbo@spheniscida.de>
date Mon, 24 Feb 2020 13:16:20 +0100
parents bd30396ecc87 (current diff) b5d670aed9a0 (diff)
children 7b43cb6b9666
files
diffstat 9 files changed, 59 insertions(+), 43 deletions(-) [+]
line wrap: on
line diff
--- a/src/block.rs	Sat Feb 22 23:21:29 2020 +0100
+++ b/src/block.rs	Mon Feb 24 13:16:20 2020 +0100
@@ -1,6 +1,6 @@
 use std::cmp::Ordering;
 
-use std::rc::Rc;
+use std::sync::Arc;
 
 use crate::options::Options;
 use crate::types::SSIterator;
@@ -32,7 +32,7 @@
 /// N_RESTARTS contains the number of restarts.
 #[derive(Clone)]
 pub struct Block {
-    block: Rc<BlockContents>,
+    block: Arc<BlockContents>,
     opt: Options,
 }
 
@@ -59,14 +59,14 @@
         }
     }
 
-    pub fn contents(&self) -> Rc<BlockContents> {
+    pub fn contents(&self) -> Arc<BlockContents> {
         self.block.clone()
     }
 
     pub fn new(opt: Options, contents: BlockContents) -> Block {
         assert!(contents.len() > 4);
         Block {
-            block: Rc::new(contents),
+            block: Arc::new(contents),
             opt: opt,
         }
     }
@@ -76,8 +76,7 @@
 /// lifetime, as it uses a refcounted block underneath.
 pub struct BlockIter {
     /// The underlying block contents.
-    /// TODO: Maybe (probably...) this needs an Arc.
-    block: Rc<BlockContents>,
+    block: Arc<BlockContents>,
     opt: Options,
     /// offset of restarts area within the block.
     restarts_off: usize,
--- a/src/cache.rs	Sat Feb 22 23:21:29 2020 +0100
+++ b/src/cache.rs	Mon Feb 24 13:16:20 2020 +0100
@@ -237,6 +237,15 @@
     }
 }
 
+// The compiler does not automatically derive Send and Sync for Cache because it contains
+// raw pointers.
+// These raw pointers only point to elements held in the same cache, and insertion
+// clones the values. It is therefore safe to implement Send for Cache.
+// Since all functions that access these raw pointers take &mut self, it is also safe to implement Sync
+// (a type T is Sync if and only if &T is Send).
+unsafe impl<T: Send> Send for Cache<T> {}
+unsafe impl<T: Sync> Sync for Cache<T> {}
+
 #[cfg(test)]
 mod tests {
     use super::LRUList;
--- a/src/cmp.rs	Sat Feb 22 23:21:29 2020 +0100
+++ b/src/cmp.rs	Mon Feb 24 13:16:20 2020 +0100
@@ -2,7 +2,7 @@
 
 /// Comparator trait, supporting types that can be nested (i.e., add additional functionality on
 /// top of an inner comparator)
-pub trait Cmp {
+pub trait Cmp: Sync + Send {
     /// Compare to byte strings, bytewise.
     fn cmp(&self, _: &[u8], _: &[u8]) -> Ordering;
 
--- a/src/filter.rs	Sat Feb 22 23:21:29 2020 +0100
+++ b/src/filter.rs	Mon Feb 24 13:16:20 2020 +0100
@@ -1,11 +1,11 @@
-use std::rc::Rc;
+use std::sync::Arc;
 
 use integer_encoding::FixedInt;
 
 /// Encapsulates a filter algorithm allowing to search for keys more efficiently.
 /// Usually, policies are used as a BoxedFilterPolicy (see below), so they
 /// can be easily cloned and nested.
-pub trait FilterPolicy {
+pub trait FilterPolicy: Send + Sync {
     /// Returns a string identifying this policy.
     fn name(&self) -> &'static str;
     /// Create a filter matching the given keys. Keys are given as a long byte array that is
@@ -17,7 +17,7 @@
 
 /// A boxed and refcounted filter policy (reference-counted because a Box with unsized content
 /// couldn't be cloned otherwise)
-pub type BoxedFilterPolicy = Rc<Box<dyn FilterPolicy>>;
+pub type BoxedFilterPolicy = Arc<Box<dyn FilterPolicy>>;
 
 /// Used for tables that don't have filter blocks but need a type parameter.
 #[derive(Clone)]
--- a/src/filter_block.rs	Sat Feb 22 23:21:29 2020 +0100
+++ b/src/filter_block.rs	Mon Feb 24 13:16:20 2020 +0100
@@ -1,7 +1,7 @@
 use crate::block::BlockContents;
 use crate::filter::BoxedFilterPolicy;
 
-use std::rc::Rc;
+use std::sync::Arc;
 
 use integer_encoding::FixedInt;
 
@@ -108,7 +108,7 @@
 #[derive(Clone)]
 pub struct FilterBlockReader {
     policy: BoxedFilterPolicy,
-    block: Rc<BlockContents>,
+    block: Arc<BlockContents>,
 
     offsets_offset: usize,
     filter_base_lg2: u32,
@@ -116,10 +116,10 @@
 
 impl FilterBlockReader {
     pub fn new_owned(pol: BoxedFilterPolicy, data: Vec<u8>) -> FilterBlockReader {
-        FilterBlockReader::new(pol, Rc::new(data))
+        FilterBlockReader::new(pol, Arc::new(data))
     }
 
-    pub fn new(pol: BoxedFilterPolicy, data: Rc<Vec<u8>>) -> FilterBlockReader {
+    pub fn new(pol: BoxedFilterPolicy, data: Arc<Vec<u8>>) -> FilterBlockReader {
         assert!(data.len() >= 5);
 
         let fbase = data[data.len() - 1] as u32;
@@ -186,7 +186,7 @@
 
     fn produce_filter_block() -> Vec<u8> {
         let keys = get_keys();
-        let mut bld = FilterBlockBuilder::new(Rc::new(Box::new(BloomPolicy::new(32))));
+        let mut bld = FilterBlockBuilder::new(Arc::new(Box::new(BloomPolicy::new(32))));
 
         bld.start_block(0);
 
@@ -223,7 +223,7 @@
     #[test]
     fn test_filter_block_build_read() {
         let result = produce_filter_block();
-        let reader = FilterBlockReader::new_owned(Rc::new(Box::new(BloomPolicy::new(32))), result);
+        let reader = FilterBlockReader::new_owned(Arc::new(Box::new(BloomPolicy::new(32))), result);
 
         assert_eq!(
             reader.offset_of(get_filter_index(5121, FILTER_BASE_LOG2)),
--- a/src/options.rs	Sat Feb 22 23:21:29 2020 +0100
+++ b/src/options.rs	Mon Feb 24 13:16:20 2020 +0100
@@ -5,7 +5,7 @@
 use crate::types::{share, Shared};
 
 use std::default::Default;
-use std::rc::Rc;
+use std::sync::Arc;
 
 const KB: usize = 1 << 10;
 const MB: usize = KB * KB;
@@ -33,7 +33,7 @@
 /// self-explanatory; the defaults are defined in the `Default` implementation.
 #[derive(Clone)]
 pub struct Options {
-    pub cmp: Rc<Box<dyn Cmp>>,
+    pub cmp: Arc<Box<dyn Cmp>>,
     pub write_buffer_size: usize,
     pub block_cache: Shared<Cache<Block>>,
     pub block_size: usize,
@@ -45,14 +45,14 @@
 impl Default for Options {
     fn default() -> Options {
         Options {
-            cmp: Rc::new(Box::new(DefaultCmp)),
+            cmp: Arc::new(Box::new(DefaultCmp)),
             write_buffer_size: WRITE_BUFFER_SIZE,
             // 2000 elements by default
             block_cache: share(Cache::new(BLOCK_CACHE_CAPACITY / BLOCK_MAX_SIZE)),
             block_size: BLOCK_MAX_SIZE,
             block_restart_interval: 16,
             compression_type: CompressionType::CompressionNone,
-            filter_policy: Rc::new(Box::new(filter::BloomPolicy::new(DEFAULT_BITS_PER_KEY))),
+            filter_policy: Arc::new(Box::new(filter::BloomPolicy::new(DEFAULT_BITS_PER_KEY))),
         }
     }
 }
--- a/src/table_builder.rs	Sat Feb 22 23:21:29 2020 +0100
+++ b/src/table_builder.rs	Mon Feb 24 13:16:20 2020 +0100
@@ -9,7 +9,7 @@
 
 use std::cmp::Ordering;
 use std::io::Write;
-use std::rc::Rc;
+use std::sync::Arc;
 
 use crc::crc32;
 use crc::Hasher32;
@@ -95,7 +95,7 @@
 
 impl<Dst: Write> TableBuilder<Dst> {
     pub fn new_no_filter(mut opt: Options, dst: Dst) -> TableBuilder<Dst> {
-        opt.filter_policy = Rc::new(Box::new(NoFilterPolicy::new()));
+        opt.filter_policy = Arc::new(Box::new(NoFilterPolicy::new()));
         TableBuilder::new(opt, dst)
     }
 }
--- a/src/table_reader.rs	Sat Feb 22 23:21:29 2020 +0100
+++ b/src/table_reader.rs	Mon Feb 24 13:16:20 2020 +0100
@@ -11,7 +11,7 @@
 use std::cmp::Ordering;
 use std::fs;
 use std::path;
-use std::rc::Rc;
+use std::sync::Arc;
 
 use integer_encoding::FixedIntWriter;
 
@@ -25,7 +25,7 @@
 /// `Table` is used for accessing SSTables.
 #[derive(Clone)]
 pub struct Table {
-    file: Rc<Box<dyn RandomAccess>>,
+    file: Arc<Box<dyn RandomAccess>>,
     file_size: usize,
     cache_id: cache::CacheID,
 
@@ -52,10 +52,13 @@
             table_block::read_table_block(opt.clone(), file.as_ref(), &footer.meta_index)?;
 
         let filter_block_reader = Table::read_filter_block(&metaindex_block, file.as_ref(), &opt)?;
-        let cache_id = opt.block_cache.borrow_mut().new_cache_id();
+        let cache_id = {
+            let mut block_cache = opt.block_cache.write()?;
+            block_cache.new_cache_id()
+        };
 
         Ok(Table {
-            file: Rc::new(file),
+            file: Arc::new(file),
             file_size: size,
             cache_id: cache_id,
             opt: opt,
@@ -108,7 +111,8 @@
     /// cache.
     fn read_block(&self, location: &BlockHandle) -> Result<Block> {
         let cachekey = self.block_cache_handle(location.offset());
-        if let Some(block) = self.opt.block_cache.borrow_mut().get(&cachekey) {
+        let mut block_cache = self.opt.block_cache.write()?;
+        if let Some(block) = block_cache.get(&cachekey) {
             return Ok(block.clone());
         }
 
@@ -116,11 +120,8 @@
         let b =
             table_block::read_table_block(self.opt.clone(), self.file.as_ref().as_ref(), location)?;
 
-        // insert a cheap copy (Rc).
-        self.opt
-            .block_cache
-            .borrow_mut()
-            .insert(&cachekey, b.clone());
+        // insert a cheap copy (Arc).
+        block_cache.insert(&cachekey, b.clone());
 
         Ok(b)
     }
@@ -345,6 +346,8 @@
 
     use super::*;
 
+    const LOCK_POISONED: &str = "Lock poisoned";
+
     fn build_data() -> Vec<(&'static str, &'static str)> {
         vec![
             // block 1
@@ -417,15 +420,17 @@
         let mut iter = table.iter();
 
         // index/metaindex blocks are not cached. That'd be a waste of memory.
-        assert_eq!(opt.block_cache.borrow().count(), 0);
+        assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 0);
+
         iter.next();
-        assert_eq!(opt.block_cache.borrow().count(), 1);
+        assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 1);
+
         // This may fail if block parameters or data change. In that case, adapt it.
         iter.next();
         iter.next();
         iter.next();
         iter.next();
-        assert_eq!(opt.block_cache.borrow().count(), 2);
+        assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 2);
     }
 
     #[test]
@@ -613,7 +618,10 @@
             assert_eq!(Ok(Some(v)), r);
         }
 
-        assert_eq!(table.opt.block_cache.borrow().count(), 3);
+        assert_eq!(
+            table.opt.block_cache.read().expect(LOCK_POISONED).count(),
+            3
+        );
 
         // test that filters work and don't return anything at all.
         assert!(table.get(b"aaa").unwrap().is_none());
--- a/src/types.rs	Sat Feb 22 23:21:29 2020 +0100
+++ b/src/types.rs	Mon Feb 24 13:16:20 2020 +0100
@@ -2,15 +2,15 @@
 
 use crate::error::Result;
 
-use std::cell::RefCell;
 use std::fs::File;
 #[cfg(unix)]
 use std::os::unix::fs::FileExt;
 #[cfg(windows)]
 use std::os::windows::fs::FileExt;
-use std::rc::Rc;
+use std::sync::Arc;
+use std::sync::RwLock;
 
-pub trait RandomAccess {
+pub trait RandomAccess: Send + Sync {
     fn read_at(&self, off: usize, dst: &mut [u8]) -> Result<usize>;
 }
 
@@ -48,11 +48,11 @@
     }
 }
 
-/// A shared thingy with interior mutability.
-pub type Shared<T> = Rc<RefCell<T>>;
+/// A shared thingy guarded by a lock.
+pub type Shared<T> = Arc<RwLock<T>>;
 
-pub fn share<T>(t: T) -> Rc<RefCell<T>> {
-    Rc::new(RefCell::new(t))
+pub fn share<T>(t: T) -> Arc<RwLock<T>> {
+    Arc::new(RwLock::new(t))
 }
 
 /// An extension of the standard `Iterator` trait that supporting some additional functionality.