changeset 426:8514c86969b7

table_block: Move file I/O and low-level block logic into dedicated module.
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 21 Oct 2017 14:06:57 +0200
parents 8b6ecfd60c85
children 9b3e2c834aac
files src/lib.rs src/options.rs src/table_block.rs src/table_reader.rs
diffstat 4 files changed, 115 insertions(+), 89 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib.rs	Fri Oct 20 20:26:54 2017 +0200
+++ b/src/lib.rs	Sat Oct 21 14:06:57 2017 +0200
@@ -22,7 +22,6 @@
 //!
 
 #![allow(dead_code)]
-#![allow(unused_imports)]
 
 extern crate crc;
 extern crate integer_encoding;
@@ -55,6 +54,7 @@
 mod options;
 mod skipmap;
 mod snapshot;
+mod table_block;
 mod table_builder;
 mod table_cache;
 mod table_reader;
--- a/src/options.rs	Fri Oct 20 20:26:54 2017 +0200
+++ b/src/options.rs	Sat Oct 21 14:06:57 2017 +0200
@@ -1,3 +1,5 @@
+
+use block::Block;
 use cache::Cache;
 use cmp::{Cmp, DefaultCmp};
 use disk_env;
@@ -5,7 +7,6 @@
 use filter;
 use infolog::{self, Logger};
 use mem_env::MemEnv;
-use table_reader::TableBlock;
 use types::{share, SequenceNumber, Shared};
 
 use std::default::Default;
@@ -50,7 +51,7 @@
     pub write_buffer_size: usize,
     pub max_open_files: usize,
     pub max_file_size: usize,
-    pub block_cache: Shared<Cache<TableBlock>>,
+    pub block_cache: Shared<Cache<Block>>,
     pub block_size: usize,
     pub block_restart_interval: usize,
     pub compression_type: CompressionType,
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/table_block.rs	Sat Oct 21 14:06:57 2017 +0200
@@ -0,0 +1,75 @@
+use block::Block;
+use blockhandle::BlockHandle;
+use env::RandomAccess;
+use error::{err, StatusCode, Result};
+use filter;
+use filter_block::FilterBlockReader;
+use log::unmask_crc;
+use options::{self, CompressionType, Options};
+use table_builder;
+
+use crc::crc32::{self, Hasher32};
+use integer_encoding::FixedInt;
+use snap::Decoder;
+
+/// Reads the bytes that `location` (offset and size) refers to from `f`.
+///
+/// On success the returned buffer is `location.size()` bytes long; an error from `read_at` is
+/// returned unchanged.
+///
+/// NOTE(review): the value returned by `read_at` is discarded, so a short read is presumably
+/// not detected here -- confirm against the `RandomAccess` contract.
+fn read_bytes(f: &RandomAccess, location: &BlockHandle) -> Result<Vec<u8>> {
+    let mut contents = vec![0; location.size()];
+    f.read_at(location.offset(), &mut contents)?;
+    Ok(contents)
+}
+
+/// Loads the serialized filter block at `location` from `src` and wraps it, together with
+/// `policy`, in a `FilterBlockReader` that owns the data.
+///
+/// # Errors
+///
+/// Returns `StatusCode::InvalidArgument` if `location` has size 0; read errors are passed on.
+pub fn read_filter_block(src: &RandomAccess,
+                         location: &BlockHandle,
+                         policy: filter::BoxedFilterPolicy)
+                         -> Result<FilterBlockReader> {
+    // An empty handle means that there is nothing to read.
+    if location.size() == 0 {
+        return err(StatusCode::InvalidArgument,
+                   "no filter block in empty location");
+    }
+    read_bytes(src, location).map(|raw| FilterBlockReader::new_owned(policy, raw))
+}
+
+/// Reads a table block from a random-access source and returns it as a `Block`.
+///
+/// A table block consists of [bytes..., compress (1B), checksum (4B)]; the handle only refers to
+/// the location and length of [bytes...]. The checksum covers the stored (possibly compressed)
+/// bytes followed by the compression byte, and is verified before anything is decompressed.
+///
+/// # Errors
+///
+/// * `StatusCode::Corruption` if the handle's offset and size overflow while locating the
+///   trailer, or if the checksum does not match.
+/// * `StatusCode::InvalidData` if the compression byte is not a known compression type.
+/// * Errors from reading the file or from Snappy decompression are passed on.
+pub fn read_table_block(opt: Options, f: &RandomAccess, location: &BlockHandle) -> Result<Block> {
+    // The handle is decoded from file contents. A corrupt handle must not overflow the offset
+    // arithmetic (panic in debug builds, silent wrap-around in release builds).
+    let compress_offset = checked_trailer_offset(location.offset(), location.size())?;
+    let cksum_offset = checked_trailer_offset(compress_offset,
+                                              table_builder::TABLE_BLOCK_COMPRESS_LEN)?;
+
+    // The block is followed by 1B compression type and 4B checksum.
+    let buf = read_bytes(f, location)?;
+    let compress = read_bytes(f,
+                              &BlockHandle::new(compress_offset,
+                                                table_builder::TABLE_BLOCK_COMPRESS_LEN))?;
+    let cksum = read_bytes(f,
+                           &BlockHandle::new(cksum_offset,
+                                             table_builder::TABLE_BLOCK_CKSUM_LEN))?;
+
+    // The checksum refers to the compressed contents, so check it on the raw bytes.
+    if !verify_table_block(&buf, compress[0], unmask_crc(u32::decode_fixed(&cksum))) {
+        return err(StatusCode::Corruption,
+                   &format!("checksum verification failed for block at {}",
+                            location.offset()));
+    }
+
+    match options::int_to_compressiontype(compress[0] as u32) {
+        Some(CompressionType::CompressionNone) => Ok(Block::new(opt, buf)),
+        Some(CompressionType::CompressionSnappy) => {
+            let decoded = Decoder::new().decompress_vec(&buf)?;
+            Ok(Block::new(opt, decoded))
+        }
+        None => err(StatusCode::InvalidData, "invalid compression type"),
+    }
+}
+
+/// Adds `len` to `offset`; an overflowing result is reported as a corrupt block handle.
+fn checked_trailer_offset(offset: usize, len: usize) -> Result<usize> {
+    match offset.checked_add(len) {
+        Some(end) => Ok(end),
+        None => {
+            err(StatusCode::Corruption,
+                &format!("block handle at {} exceeds the addressable range", offset))
+        }
+    }
+}
+
+/// Checks a block's checksum: computes the CRC32 (Castagnoli polynomial) over `data` followed
+/// by the single `compression` byte and returns whether it equals `want`.
+fn verify_table_block(data: &[u8], compression: u8, want: u32) -> bool {
+    let mut hasher = crc32::Digest::new(crc32::CASTAGNOLI);
+    hasher.write(data);
+    hasher.write(&[compression]);
+    let computed = hasher.sum32();
+    computed == want
+}
--- a/src/table_reader.rs	Fri Oct 20 20:26:54 2017 +0200
+++ b/src/table_reader.rs	Sat Oct 21 14:06:57 2017 +0200
@@ -3,12 +3,13 @@
 use cache;
 use cmp::{Cmp, InternalKeyCmp};
 use env::RandomAccess;
-use error::{err, StatusCode, Result};
+use error::Result;
 use filter;
 use filter_block::FilterBlockReader;
 use key_types::InternalKey;
 use log::unmask_crc;
 use options::{self, CompressionType, Options};
+use table_block;
 use table_builder::{self, Footer};
 use types::{current_key_val, LdbIterator};
 
@@ -17,7 +18,6 @@
 
 use crc::crc32::{self, Hasher32};
 use integer_encoding::{FixedInt, FixedIntWriter};
-use snap::{decompress_len, Decoder};
 
 /// Reads the table footer.
 fn read_footer(f: &RandomAccess, size: usize) -> Result<Footer> {
@@ -26,63 +26,6 @@
     Ok(Footer::decode(&buf))
 }
 
-fn read_bytes(f: &RandomAccess, location: &BlockHandle) -> Result<Vec<u8>> {
-    let mut buf = vec![0; location.size()];
-    f.read_at(location.offset(), &mut buf).map(|_| buf)
-}
-
-/// A TableBlock consists of [bytes..., compress (1B), checksum (4B)].
-#[derive(Clone)]
-pub struct TableBlock {
-    block: Block,
-}
-
-impl TableBlock {
-    /// Reads a block at location.
-    fn read_block(opt: Options, f: &RandomAccess, location: &BlockHandle) -> Result<TableBlock> {
-        // The block is denoted by offset and length in BlockHandle. A block in an encoded
-        // table is followed by 1B compression type and 4B checksum.
-        // The checksum refers to the compressed contents.
-        let buf = try!(read_bytes(f, location));
-        let compress = try!(read_bytes(f,
-                                       &BlockHandle::new(location.offset() + location.size(),
-                                                         table_builder::TABLE_BLOCK_COMPRESS_LEN)));
-        let cksum = try!(read_bytes(f,
-                                    &BlockHandle::new(location.offset() + location.size() +
-                                                      table_builder::TABLE_BLOCK_COMPRESS_LEN,
-                                                      table_builder::TABLE_BLOCK_CKSUM_LEN)));
-
-        let mut decomp_buf = Vec::new();
-        if let Some(ctype) = options::int_to_compressiontype(compress[0] as u32) {
-            if !TableBlock::verify(&buf, ctype as u8, unmask_crc(u32::decode_fixed(&cksum))) {
-                return err(StatusCode::Corruption,
-                           &format!("checksum verification failed for block at {}",
-                                    location.offset()));
-            }
-
-            if ctype == CompressionType::CompressionNone {
-                decomp_buf = buf;
-            } else if ctype == CompressionType::CompressionSnappy {
-                let mut decoder = Decoder::new();
-                decomp_buf = decoder.decompress_vec(&buf)?;
-            }
-        } else {
-            return err(StatusCode::InvalidData, "invalid compression type");
-        }
-
-        assert!(!decomp_buf.is_empty());
-        Ok(TableBlock { block: Block::new(opt, decomp_buf) })
-    }
-
-    /// Verify checksum of block
-    fn verify(data: &[u8], compression: u8, want: u32) -> bool {
-        let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
-        digest.write(data);
-        digest.write(&[compression; 1]);
-        digest.sum32() == want
-    }
-}
-
 #[derive(Clone)]
 pub struct Table {
     file: Rc<Box<RandomAccess>>,
@@ -101,28 +44,13 @@
     fn new_raw(opt: Options, file: Rc<Box<RandomAccess>>, size: usize) -> Result<Table> {
         let footer = try!(read_footer(file.as_ref().as_ref(), size));
         let indexblock =
-            try!(TableBlock::read_block(opt.clone(), file.as_ref().as_ref(), &footer.index));
-        let metaindexblock =
-            try!(TableBlock::read_block(opt.clone(), file.as_ref().as_ref(), &footer.meta_index));
-
-        // Open filter block for reading
-        let mut filter_block_reader = None;
-        let filter_name = format!("filter.{}", opt.filter_policy.name()).as_bytes().to_vec();
-
-        let mut metaindexiter = metaindexblock.block.iter();
+            try!(table_block::read_table_block(opt.clone(), file.as_ref().as_ref(), &footer.index));
+        let metaindexblock = try!(table_block::read_table_block(opt.clone(),
+                                                                file.as_ref().as_ref(),
+                                                                &footer.meta_index));
 
-        metaindexiter.seek(&filter_name);
-
-        if let Some((_key, val)) = current_key_val(&metaindexiter) {
-            let filter_block_location = BlockHandle::decode(&val).0;
-
-            if filter_block_location.size() > 0 {
-                let buf = try!(read_bytes(file.as_ref().as_ref(), &filter_block_location));
-                filter_block_reader = Some(FilterBlockReader::new_owned(opt.filter_policy.clone(),
-                                                                        buf));
-            }
-        }
-        metaindexiter.reset();
+        let filter_block_reader =
+            Table::read_filter_block(&metaindexblock, file.as_ref().as_ref(), &opt)?;
         let cache_id = opt.block_cache.borrow_mut().new_cache_id();
 
         Ok(Table {
@@ -132,10 +60,31 @@
             opt: opt,
             footer: footer,
             filters: filter_block_reader,
-            indexblock: indexblock.block,
+            indexblock: indexblock,
         })
     }
 
+    /// Looks up the filter block for the configured filter policy in the metaindex block and
+    /// opens it for reading.
+    ///
+    /// The metaindex entry is looked up under the key "filter.<policy name>"; its value is the
+    /// encoded handle of the filter block. Returns `Ok(None)` if there is no entry for exactly
+    /// that key or if the handle has size 0. Errors from reading the filter block are passed on.
+    fn read_filter_block(metaix: &Block,
+                         file: &RandomAccess,
+                         options: &Options)
+                         -> Result<Option<FilterBlockReader>> {
+        let filter_name = format!("filter.{}", options.filter_policy.name()).into_bytes();
+
+        let mut metaindexiter = metaix.iter();
+        metaindexiter.seek(&filter_name);
+
+        if let Some((key, val)) = current_key_val(&metaindexiter) {
+            // seek() presumably stops at the first entry at or after the wanted key, so the
+            // entry may belong to a different filter policy. Reading it with the configured
+            // policy would misinterpret the filter data; only accept an exact key match.
+            if key[..] == filter_name[..] {
+                let filter_block_location = BlockHandle::decode(&val).0;
+                if filter_block_location.size() > 0 {
+                    return table_block::read_filter_block(file,
+                                                          &filter_block_location,
+                                                          options.filter_policy.clone())
+                        .map(Some);
+                }
+            }
+        }
+        Ok(None)
+    }
+
     /// Creates a new table reader operating on internal keys (i.e., InternalKey). This means that
     /// a different comparator (internal_key_cmp) and a different filter policy
     /// (InternalFilterPolicy) are used.
@@ -156,15 +105,16 @@
 
     /// Read a block from the current table at `location`, and cache it in the options' block
     /// cache.
-    fn read_block(&self, location: &BlockHandle) -> Result<TableBlock> {
+    fn read_block(&self, location: &BlockHandle) -> Result<Block> {
         let cachekey = self.block_cache_handle(location.offset());
         if let Some(block) = self.opt.block_cache.borrow_mut().get(&cachekey) {
             return Ok(block.clone());
         }
 
         // Two times as_ref(): First time to get a ref from Rc<>, then one from Box<>.
-        let b =
-            try!(TableBlock::read_block(self.opt.clone(), self.file.as_ref().as_ref(), location));
+        let b = try!(table_block::read_table_block(self.opt.clone(),
+                                                   self.file.as_ref().as_ref(),
+                                                   location));
 
         // insert a cheap copy (Rc).
         self.opt.block_cache.borrow_mut().insert(&cachekey, b.clone());
@@ -234,7 +184,7 @@
 
         // Read block (potentially from cache)
         let tb = self.read_block(&handle)?;
-        let mut iter = tb.block.iter();
+        let mut iter = tb.iter();
 
         // Go to entry and check if it's the wanted entry.
         iter.seek(key);
@@ -280,7 +230,7 @@
         let (new_block_handle, _) = BlockHandle::decode(handle);
         let block = self.table.read_block(&new_block_handle)?;
 
-        self.current_block = Some(block.block.iter());
+        self.current_block = Some(block.iter());
         self.current_block_off = new_block_handle.offset();
 
         Ok(())