changeset 159:18e0fb2d6a4e

Make use of new Options members and Env file types in TableReader/Builder
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 09 Jul 2017 20:34:43 +0200
parents c15ac746e903
children 28698cadc1de
files src/table_builder.rs src/table_reader.rs
diffstat 2 files changed, 51 insertions(+), 91 deletions(-) [+]
line wrap: on
line diff
--- a/src/table_builder.rs	Sun Jul 09 20:33:43 2017 +0200
+++ b/src/table_builder.rs	Sun Jul 09 20:34:43 2017 +0200
@@ -1,7 +1,7 @@
 use block::{BlockBuilder, BlockContents};
 use blockhandle::BlockHandle;
 use cmp::InternalKeyCmp;
-use filter::{BoxedFilterPolicy, NoFilterPolicy};
+use filter::{InternalFilterPolicy, NoFilterPolicy};
 use filter_block::FilterBlockBuilder;
 use key_types::InternalKey;
 use options::{CompressionType, Options};
@@ -91,25 +91,26 @@
 }
 
 impl<'a, Dst: Write> TableBuilder<'a, Dst> {
-    pub fn new_no_filter(opt: Options, dst: Dst) -> TableBuilder<'a, Dst> {
-        TableBuilder::new(opt, dst, NoFilterPolicy::new())
+    pub fn new_no_filter(mut opt: Options, dst: Dst) -> TableBuilder<'a, Dst> {
+        opt.filter_policy = NoFilterPolicy::new();
+        TableBuilder::new(opt, dst)
     }
 }
 
 /// TableBuilder is used for building a new SSTable. It groups entries into blocks,
 /// calculating checksums and bloom filters.
-/// It's recommended that you use InternalFilterPolicy as FilterPol, as that policy extracts the
-/// underlying user keys from the InternalKeys used as keys in the table.
 impl<'a, Dst: Write> TableBuilder<'a, Dst> {
     /// Create a new table builder.
-    /// The comparator in opt will be wrapped in a InternalKeyCmp.
-    pub fn new(mut opt: Options, dst: Dst, fpol: BoxedFilterPolicy) -> TableBuilder<'a, Dst> {
+    /// The comparator in opt will be wrapped in a InternalKeyCmp, and the filter policy
+    /// in an InternalFilterPolicy.
+    pub fn new(mut opt: Options, dst: Dst) -> TableBuilder<'a, Dst> {
         opt.cmp = Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
-        TableBuilder::new_raw(opt, dst, fpol)
+        opt.filter_policy = InternalFilterPolicy::new(opt.filter_policy);
+        TableBuilder::new_raw(opt, dst)
     }
 
     /// Like new(), but doesn't wrap the comparator in an InternalKeyCmp (for testing)
-    pub fn new_raw(opt: Options, dst: Dst, fpol: BoxedFilterPolicy) -> TableBuilder<'a, Dst> {
+    pub fn new_raw(opt: Options, dst: Dst) -> TableBuilder<'a, Dst> {
         TableBuilder {
             opt: opt.clone(),
             dst: dst,
@@ -117,8 +118,8 @@
             prev_block_last_key: vec![],
             num_entries: 0,
             data_block: Some(BlockBuilder::new(opt.clone())),
+            filter_block: Some(FilterBlockBuilder::new(opt.filter_policy.clone())),
             index_block: Some(BlockBuilder::new(opt)),
-            filter_block: Some(FilterBlockBuilder::new(fpol)),
         }
     }
 
@@ -248,7 +249,6 @@
 mod tests {
     use super::{Footer, TableBuilder};
     use blockhandle::BlockHandle;
-    use filter::BloomPolicy;
     use options::Options;
 
     #[test]
@@ -270,7 +270,7 @@
         let mut d = Vec::with_capacity(512);
         let mut opt = Options::default();
         opt.block_restart_interval = 3;
-        let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4));
+        let mut b = TableBuilder::new_raw(opt, &mut d);
 
         let data = vec![("abc", "def"), ("abd", "dee"), ("bcd", "asa"), ("bsr", "a00")];
 
@@ -288,7 +288,7 @@
         let mut d = Vec::with_capacity(512);
         let mut opt = Options::default();
         opt.block_restart_interval = 3;
-        let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4));
+        let mut b = TableBuilder::new_raw(opt, &mut d);
 
         // Test two equal consecutive keys
         let data = vec![("abc", "def"), ("abc", "dee"), ("bcd", "asa"), ("bsr", "a00")];
--- a/src/table_reader.rs	Sun Jul 09 20:33:43 2017 +0200
+++ b/src/table_reader.rs	Sun Jul 09 20:34:43 2017 +0200
@@ -2,9 +2,9 @@
 use blockhandle::BlockHandle;
 use cache;
 use cmp::InternalKeyCmp;
-use env::RandomAccessFile;
+use env::{self, RandomAccessFile};
 use error::{Status, StatusCode, Result};
-use filter::{BoxedFilterPolicy, InternalFilterPolicy};
+use filter;
 use filter_block::FilterBlockReader;
 use key_types::InternalKey;
 use options::{self, CompressionType, Options};
@@ -12,20 +12,19 @@
 use types::LdbIterator;
 
 use std::cmp::Ordering;
-use std::io::{Read, Seek};
 use std::sync::Arc;
 
 use integer_encoding::{FixedInt, FixedIntWriter};
 use crc::crc32::{self, Hasher32};
 
 /// Reads the table footer.
-fn read_footer<F: Read + Seek>(f: &RandomAccessFile<F>, size: usize) -> Result<Footer> {
+fn read_footer(f: &RandomAccessFile, size: usize) -> Result<Footer> {
     let buf = try!(f.read_at(size - table_builder::FULL_FOOTER_LENGTH,
                              table_builder::FULL_FOOTER_LENGTH));
     Ok(Footer::decode(&buf))
 }
 
-fn read_bytes<F: Read + Seek>(f: &RandomAccessFile<F>, location: &BlockHandle) -> Result<Vec<u8>> {
+fn read_bytes(f: &RandomAccessFile, location: &BlockHandle) -> Result<Vec<u8>> {
     f.read_at(location.offset(), location.size())
 }
 
@@ -38,10 +37,10 @@
 
 impl TableBlock {
     /// Reads a block at location.
-    fn read_block<R: Read + Seek>(opt: Options,
-                                  f: &RandomAccessFile<R>,
-                                  location: &BlockHandle)
-                                  -> Result<TableBlock> {
+    fn read_block(opt: Options,
+                  f: &RandomAccessFile,
+                  location: &BlockHandle)
+                  -> Result<TableBlock> {
         // The block is denoted by offset and length in BlockHandle. A block in an encoded
         // table is followed by 1B compression type and 4B checksum.
         let buf = try!(read_bytes(f, location));
@@ -71,8 +70,8 @@
 }
 
 #[derive(Clone)]
-pub struct Table<R: Read + Seek> {
-    file: RandomAccessFile<R>,
+pub struct Table {
+    file: RandomAccessFile,
     file_size: usize,
     cache_id: cache::CacheID,
 
@@ -83,13 +82,9 @@
     filters: Option<FilterBlockReader>,
 }
 
-impl<R: Read + Seek> Table<R> {
+impl Table {
     /// Creates a new table reader operating on unformatted keys (i.e., UserKey).
-    fn new_raw(opt: Options,
-               file: RandomAccessFile<R>,
-               size: usize,
-               fp: BoxedFilterPolicy)
-               -> Result<Table<R>> {
+    fn new_raw(opt: Options, file: RandomAccessFile, size: usize) -> Result<Table> {
         let footer = try!(read_footer(&file, size));
 
         let indexblock = try!(TableBlock::read_block(opt.clone(), &file, &footer.index));
@@ -102,7 +97,7 @@
 
         // Open filter block for reading
         let mut filter_block_reader = None;
-        let filter_name = format!("filter.{}", fp.name()).as_bytes().to_vec();
+        let filter_name = format!("filter.{}", opt.filter_policy.name()).as_bytes().to_vec();
 
         let mut metaindexiter = metaindexblock.block.iter();
 
@@ -113,7 +108,8 @@
 
             if filter_block_location.size() > 0 {
                 let buf = try!(read_bytes(&file, &filter_block_location));
-                filter_block_reader = Some(FilterBlockReader::new_owned(fp, buf));
+                filter_block_reader = Some(FilterBlockReader::new_owned(opt.filter_policy.clone(),
+                                                                        buf));
             }
         }
         metaindexiter.reset();
@@ -133,12 +129,10 @@
     /// Creates a new table reader operating on internal keys (i.e., InternalKey). This means that
     /// a different comparator (internal_key_cmp) and a different filter policy
     /// (InternalFilterPolicy) are used.
-    pub fn new(mut opt: Options, file: R, size: usize, fp: BoxedFilterPolicy) -> Result<Table<R>> {
+    pub fn new(mut opt: Options, file: Box<env::RandomAccess>, size: usize) -> Result<Table> {
         opt.cmp = Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
-        let t = try!(Table::new_raw(opt,
-                                    RandomAccessFile::new(file),
-                                    size,
-                                    InternalFilterPolicy::new(fp)));
+        opt.filter_policy = filter::InternalFilterPolicy::new(opt.filter_policy);
+        let t = try!(Table::new_raw(opt, RandomAccessFile::new(file), size));
         Ok(t)
     }
 
@@ -187,7 +181,7 @@
     }
 
     // Iterators read from the file; thus only one iterator can be borrowed (mutably) per scope
-    fn iter<'a>(&'a mut self) -> TableIterator<'a, R> {
+    fn iter<'a>(&'a mut self) -> TableIterator<'a> {
         let iter = TableIterator {
             current_block: self.indexblock.iter(),
             init: false,
@@ -250,8 +244,8 @@
 
 /// This iterator is a "TwoLevelIterator"; it uses an index block in order to get an offset hint
 /// into the data blocks.
-pub struct TableIterator<'a, R: 'a + Read + Seek> {
-    table: &'a mut Table<R>,
+pub struct TableIterator<'a> {
+    table: &'a mut Table,
     opt: Options,
     // We're not using Option<BlockIter>, but instead a separate `init` field. That makes it easier
     // working with the current block in the iterator methods (no borrowing annoyance as with
@@ -262,7 +256,7 @@
     index_block: BlockIter,
 }
 
-impl<'a, R: Read + Seek> TableIterator<'a, R> {
+impl<'a> TableIterator<'a> {
     // Skips to the entry referenced by the next entry in the index block.
     // This is called once a block has run out of entries.
     // Err means corruption or I/O error; Ok(true) means a new block was loaded; Ok(false) means
@@ -288,7 +282,7 @@
     }
 }
 
-impl<'a, R: Read + Seek> Iterator for TableIterator<'a, R> {
+impl<'a> Iterator for TableIterator<'a> {
     type Item = (Vec<u8>, Vec<u8>);
 
     fn next(&mut self) -> Option<Self::Item> {
@@ -320,7 +314,7 @@
     }
 }
 
-impl<'a, R: Read + Seek> LdbIterator for TableIterator<'a, R> {
+impl<'a> LdbIterator for TableIterator<'a> {
     // A call to valid() after seeking is necessary to ensure that the seek worked (e.g., no error
     // while reading from disk)
     fn seek(&mut self, to: &[u8]) {
@@ -391,7 +385,6 @@
 #[cfg(test)]
 mod tests {
     use filter::BloomPolicy;
-    use filter::InternalFilterPolicy;
     use options::Options;
     use table_builder::TableBuilder;
     use types::LdbIterator;
@@ -423,7 +416,7 @@
 
         {
             // Uses the standard comparator in opt.
-            let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4));
+            let mut b = TableBuilder::new_raw(opt, &mut d);
             let data = build_data();
 
             for &(k, v) in data.iter() {
@@ -445,6 +438,7 @@
         let mut opt = Options::default();
         opt.block_restart_interval = 1;
         opt.block_size = 32;
+        opt.filter_policy = BloomPolicy::new(4);
 
         let mut i = 0 as u64;
         let data: Vec<(Vec<u8>, &'static str)> = build_data()
@@ -457,8 +451,7 @@
 
         {
             // Uses InternalKeyCmp
-            let mut b =
-                TableBuilder::new(opt, &mut d, InternalFilterPolicy::new(BloomPolicy::new(4)));
+            let mut b = TableBuilder::new(opt, &mut d);
 
             for &(ref k, ref v) in data.iter() {
                 b.add(k.as_slice(), v.as_bytes());
@@ -473,8 +466,8 @@
         (d, size)
     }
 
-    fn wrap_buffer<'a>(src: &'a [u8]) -> RandomAccessFile<Cursor<&'a [u8]>> {
-        RandomAccessFile::new(Cursor::new(src))
+    fn wrap_buffer(src: Vec<u8>) -> RandomAccessFile {
+        RandomAccessFile::new(Box::new(Cursor::new(src)))
     }
 
     #[test]
@@ -483,8 +476,7 @@
         let mut opt = Options::default();
         opt.block_size = 32;
 
-        let mut table = Table::new_raw(opt.clone(), wrap_buffer(&src), size, BloomPolicy::new(4))
-            .unwrap();
+        let mut table = Table::new_raw(opt.clone(), wrap_buffer(src), size).unwrap();
         let mut iter = table.iter();
 
         // index/metaindex blocks are not cached. That'd be a waste of memory.
@@ -504,11 +496,7 @@
         let (src, size) = build_table();
         let data = build_data();
 
-        let mut table = Table::new_raw(Options::default(),
-                                       wrap_buffer(&src),
-                                       size,
-                                       BloomPolicy::new(4))
-            .unwrap();
+        let mut table = Table::new_raw(Options::default(), wrap_buffer(src), size).unwrap();
         let mut iter = table.iter();
         let mut i = 0;
 
@@ -540,11 +528,7 @@
     fn test_table_iterator_filter() {
         let (src, size) = build_table();
 
-        let mut table = Table::new_raw(Options::default(),
-                                       wrap_buffer(&src),
-                                       size,
-                                       BloomPolicy::new(4))
-            .unwrap();
+        let mut table = Table::new_raw(Options::default(), wrap_buffer(src), size).unwrap();
         assert!(table.filters.is_some());
         let filter_reader = table.filters.clone().unwrap();
         let mut iter = table.iter();
@@ -564,11 +548,7 @@
     fn test_table_iterator_state_behavior() {
         let (src, size) = build_table();
 
-        let mut table = Table::new_raw(Options::default(),
-                                       wrap_buffer(&src),
-                                       size,
-                                       BloomPolicy::new(4))
-            .unwrap();
+        let mut table = Table::new_raw(Options::default(), wrap_buffer(src), size).unwrap();
         let mut iter = table.iter();
 
         // behavior test
@@ -598,11 +578,7 @@
         let (src, size) = build_table();
         let data = build_data();
 
-        let mut table = Table::new_raw(Options::default(),
-                                       wrap_buffer(&src),
-                                       size,
-                                       BloomPolicy::new(4))
-            .unwrap();
+        let mut table = Table::new_raw(Options::default(), wrap_buffer(src), size).unwrap();
         let mut iter = table.iter();
         let mut i = 0;
 
@@ -635,11 +611,7 @@
     fn test_table_iterator_seek() {
         let (src, size) = build_table();
 
-        let mut table = Table::new_raw(Options::default(),
-                                       wrap_buffer(&src),
-                                       size,
-                                       BloomPolicy::new(4))
-            .unwrap();
+        let mut table = Table::new_raw(Options::default(), wrap_buffer(src), size).unwrap();
         let mut iter = table.iter();
 
         iter.seek("bcd".as_bytes());
@@ -656,11 +628,7 @@
     fn test_table_get() {
         let (src, size) = build_table();
 
-        let mut table = Table::new_raw(Options::default(),
-                                       wrap_buffer(&src),
-                                       size,
-                                       BloomPolicy::new(4))
-            .unwrap();
+        let mut table = Table::new_raw(Options::default(), wrap_buffer(src), size).unwrap();
         let mut table2 = table.clone();
 
         // Test that all of the table's entries are reachable via get()
@@ -690,11 +658,7 @@
 
         let (src, size) = build_internal_table();
 
-        let mut table = Table::new(Options::default(),
-                                   Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4))
-            .unwrap();
+        let mut table = Table::new(Options::default(), Box::new(Cursor::new(src)), size).unwrap();
         let filter_reader = table.filters.clone().unwrap();
 
         // Check that we're actually using internal keys
@@ -725,11 +689,7 @@
 
         src[10] += 1;
 
-        let mut table = Table::new_raw(Options::default(),
-                                       wrap_buffer(&src),
-                                       size,
-                                       BloomPolicy::new(4))
-            .unwrap();
+        let mut table = Table::new_raw(Options::default(), wrap_buffer(src), size).unwrap();
 
         assert!(table.filters.is_some());
         assert_eq!(table.filters.as_ref().unwrap().num(), 1);