Mercurial > lbo > hg > leveldb-rs
changeset 426:8514c86969b7
table_block: Move file I/O and low-level block logic into dedicated module.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 21 Oct 2017 14:06:57 +0200 |
parents | 8b6ecfd60c85 |
children | 9b3e2c834aac |
files | src/lib.rs src/options.rs src/table_block.rs src/table_reader.rs |
diffstat | 4 files changed, 115 insertions(+), 89 deletions(-) [+] |
line wrap: on
line diff
--- a/src/lib.rs Fri Oct 20 20:26:54 2017 +0200 +++ b/src/lib.rs Sat Oct 21 14:06:57 2017 +0200 @@ -22,7 +22,6 @@ //! #![allow(dead_code)] -#![allow(unused_imports)] extern crate crc; extern crate integer_encoding; @@ -55,6 +54,7 @@ mod options; mod skipmap; mod snapshot; +mod table_block; mod table_builder; mod table_cache; mod table_reader;
--- a/src/options.rs Fri Oct 20 20:26:54 2017 +0200 +++ b/src/options.rs Sat Oct 21 14:06:57 2017 +0200 @@ -1,3 +1,5 @@ + +use block::Block; use cache::Cache; use cmp::{Cmp, DefaultCmp}; use disk_env; @@ -5,7 +7,6 @@ use filter; use infolog::{self, Logger}; use mem_env::MemEnv; -use table_reader::TableBlock; use types::{share, SequenceNumber, Shared}; use std::default::Default; @@ -50,7 +51,7 @@ pub write_buffer_size: usize, pub max_open_files: usize, pub max_file_size: usize, - pub block_cache: Shared<Cache<TableBlock>>, + pub block_cache: Shared<Cache<Block>>, pub block_size: usize, pub block_restart_interval: usize, pub compression_type: CompressionType,
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/table_block.rs Sat Oct 21 14:06:57 2017 +0200 @@ -0,0 +1,75 @@ +use block::Block; +use blockhandle::BlockHandle; +use env::RandomAccess; +use error::{err, StatusCode, Result}; +use filter; +use filter_block::FilterBlockReader; +use log::unmask_crc; +use options::{self, CompressionType, Options}; +use table_builder; + +use crc::crc32::{self, Hasher32}; +use integer_encoding::FixedInt; +use snap::Decoder; + +/// Reads the data for the specified block handle from a file. +fn read_bytes(f: &RandomAccess, location: &BlockHandle) -> Result<Vec<u8>> { + let mut buf = vec![0; location.size()]; + f.read_at(location.offset(), &mut buf).map(|_| buf) +} + +/// Reads a serialized filter block from a file and returns a FilterBlockReader. +pub fn read_filter_block(src: &RandomAccess, + location: &BlockHandle, + policy: filter::BoxedFilterPolicy) + -> Result<FilterBlockReader> { + if location.size() == 0 { + return err(StatusCode::InvalidArgument, + "no filter block in empty location"); + } + let buf = read_bytes(src, location)?; + Ok(FilterBlockReader::new_owned(policy, buf)) +} + +/// Reads a table block from a random-access source. +/// A table block consists of [bytes..., compress (1B), checksum (4B)]; the handle only refers to +/// the location and length of [bytes...]. +pub fn read_table_block(opt: Options, f: &RandomAccess, location: &BlockHandle) -> Result<Block> { + // The block is denoted by offset and length in BlockHandle. A block in an encoded + // table is followed by 1B compression type and 4B checksum. + // The checksum refers to the compressed contents. + let buf = try!(read_bytes(f, location)); + let compress = try!(read_bytes(f, + &BlockHandle::new(location.offset() + location.size(), + table_builder::TABLE_BLOCK_COMPRESS_LEN))); + let cksum = try!(read_bytes(f, + &BlockHandle::new(location.offset() + location.size() + + table_builder::TABLE_BLOCK_COMPRESS_LEN, + table_builder::TABLE_BLOCK_CKSUM_LEN))); + + if !verify_table_block(&buf, compress[0], unmask_crc(u32::decode_fixed(&cksum))) { + return err(StatusCode::Corruption, + &format!("checksum verification failed for block at {}", + location.offset())); + } + + if let Some(ctype) = options::int_to_compressiontype(compress[0] as u32) { + match ctype { + CompressionType::CompressionNone => Ok(Block::new(opt, buf)), + CompressionType::CompressionSnappy => { + let decoded = Decoder::new().decompress_vec(&buf)?; + Ok(Block::new(opt, decoded)) + } + } + } else { + err(StatusCode::InvalidData, "invalid compression type") + } +} + +/// Verify checksum of block +fn verify_table_block(data: &[u8], compression: u8, want: u32) -> bool { + let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); + digest.write(data); + digest.write(&[compression; 1]); + digest.sum32() == want +}
--- a/src/table_reader.rs Fri Oct 20 20:26:54 2017 +0200 +++ b/src/table_reader.rs Sat Oct 21 14:06:57 2017 +0200 @@ -3,12 +3,13 @@ use cache; use cmp::{Cmp, InternalKeyCmp}; use env::RandomAccess; -use error::{err, StatusCode, Result}; +use error::Result; use filter; use filter_block::FilterBlockReader; use key_types::InternalKey; use log::unmask_crc; use options::{self, CompressionType, Options}; +use table_block; use table_builder::{self, Footer}; use types::{current_key_val, LdbIterator}; @@ -17,7 +18,6 @@ use crc::crc32::{self, Hasher32}; use integer_encoding::{FixedInt, FixedIntWriter}; -use snap::{decompress_len, Decoder}; /// Reads the table footer. fn read_footer(f: &RandomAccess, size: usize) -> Result<Footer> { @@ -26,63 +26,6 @@ Ok(Footer::decode(&buf)) } -fn read_bytes(f: &RandomAccess, location: &BlockHandle) -> Result<Vec<u8>> { - let mut buf = vec![0; location.size()]; - f.read_at(location.offset(), &mut buf).map(|_| buf) -} - -/// A TableBlock consists of [bytes..., compress (1B), checksum (4B)]. -#[derive(Clone)] -pub struct TableBlock { - block: Block, -} - -impl TableBlock { - /// Reads a block at location. - fn read_block(opt: Options, f: &RandomAccess, location: &BlockHandle) -> Result<TableBlock> { - // The block is denoted by offset and length in BlockHandle. A block in an encoded - // table is followed by 1B compression type and 4B checksum. - // The checksum refers to the compressed contents. - let buf = try!(read_bytes(f, location)); - let compress = try!(read_bytes(f, - &BlockHandle::new(location.offset() + location.size(), - table_builder::TABLE_BLOCK_COMPRESS_LEN))); - let cksum = try!(read_bytes(f, - &BlockHandle::new(location.offset() + location.size() + - table_builder::TABLE_BLOCK_COMPRESS_LEN, - table_builder::TABLE_BLOCK_CKSUM_LEN))); - - let mut decomp_buf = Vec::new(); - if let Some(ctype) = options::int_to_compressiontype(compress[0] as u32) { - if !TableBlock::verify(&buf, ctype as u8, unmask_crc(u32::decode_fixed(&cksum))) { - return err(StatusCode::Corruption, - &format!("checksum verification failed for block at {}", - location.offset())); - } - - if ctype == CompressionType::CompressionNone { - decomp_buf = buf; - } else if ctype == CompressionType::CompressionSnappy { - let mut decoder = Decoder::new(); - decomp_buf = decoder.decompress_vec(&buf)?; - } - } else { - return err(StatusCode::InvalidData, "invalid compression type"); - } - - assert!(!decomp_buf.is_empty()); - Ok(TableBlock { block: Block::new(opt, decomp_buf) }) - } - - /// Verify checksum of block - fn verify(data: &[u8], compression: u8, want: u32) -> bool { - let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); - digest.write(data); - digest.write(&[compression; 1]); - digest.sum32() == want - } -} - #[derive(Clone)] pub struct Table { file: Rc<Box<RandomAccess>>, @@ -101,28 +44,13 @@ fn new_raw(opt: Options, file: Rc<Box<RandomAccess>>, size: usize) -> Result<Table> { let footer = try!(read_footer(file.as_ref().as_ref(), size)); let indexblock = - try!(TableBlock::read_block(opt.clone(), file.as_ref().as_ref(), &footer.index)); - let metaindexblock = - try!(TableBlock::read_block(opt.clone(), file.as_ref().as_ref(), &footer.meta_index)); - - // Open filter block for reading - let mut filter_block_reader = None; - let filter_name = format!("filter.{}", opt.filter_policy.name()).as_bytes().to_vec(); - - let mut metaindexiter = metaindexblock.block.iter(); + try!(table_block::read_table_block(opt.clone(), file.as_ref().as_ref(), &footer.index)); + let metaindexblock = try!(table_block::read_table_block(opt.clone(), + file.as_ref().as_ref(), + &footer.meta_index)); - metaindexiter.seek(&filter_name); - - if let Some((_key, val)) = current_key_val(&metaindexiter) { - let filter_block_location = BlockHandle::decode(&val).0; - - if filter_block_location.size() > 0 { - let buf = try!(read_bytes(file.as_ref().as_ref(), &filter_block_location)); - filter_block_reader = Some(FilterBlockReader::new_owned(opt.filter_policy.clone(), - buf)); - } - } - metaindexiter.reset(); + let filter_block_reader = + Table::read_filter_block(&metaindexblock, file.as_ref().as_ref(), &opt)?; let cache_id = opt.block_cache.borrow_mut().new_cache_id(); Ok(Table { @@ -132,10 +60,31 @@ opt: opt, footer: footer, filters: filter_block_reader, - indexblock: indexblock.block, + indexblock: indexblock, }) } + fn read_filter_block(metaix: &Block, + file: &RandomAccess, + options: &Options) + -> Result<Option<FilterBlockReader>> { + // Open filter block for reading + let filter_name = format!("filter.{}", options.filter_policy.name()).as_bytes().to_vec(); + + let mut metaindexiter = metaix.iter(); + metaindexiter.seek(&filter_name); + + if let Some((_key, val)) = current_key_val(&metaindexiter) { + let filter_block_location = BlockHandle::decode(&val).0; + if filter_block_location.size() > 0 { + return Ok(Some(table_block::read_filter_block(file, + &filter_block_location, + options.filter_policy.clone())?)); + } + } + Ok(None) + } + /// Creates a new table reader operating on internal keys (i.e., InternalKey). This means that /// a different comparator (internal_key_cmp) and a different filter policy /// (InternalFilterPolicy) are used. @@ -156,15 +105,16 @@ /// Read a block from the current table at `location`, and cache it in the options' block /// cache. - fn read_block(&self, location: &BlockHandle) -> Result<TableBlock> { + fn read_block(&self, location: &BlockHandle) -> Result<Block> { let cachekey = self.block_cache_handle(location.offset()); if let Some(block) = self.opt.block_cache.borrow_mut().get(&cachekey) { return Ok(block.clone()); } // Two times as_ref(): First time to get a ref from Rc<>, then one from Box<>. - let b = - try!(TableBlock::read_block(self.opt.clone(), self.file.as_ref().as_ref(), location)); + let b = try!(table_block::read_table_block(self.opt.clone(), + self.file.as_ref().as_ref(), + location)); // insert a cheap copy (Rc). self.opt.block_cache.borrow_mut().insert(&cachekey, b.clone()); @@ -234,7 +184,7 @@ // Read block (potentially from cache) let tb = self.read_block(&handle)?; - let mut iter = tb.block.iter(); + let mut iter = tb.iter(); // Go to entry and check if it's the wanted entry. iter.seek(key); @@ -280,7 +230,7 @@ let (new_block_handle, _) = BlockHandle::decode(handle); let block = self.table.read_block(&new_block_handle)?; - self.current_block = Some(block.block.iter()); + self.current_block = Some(block.iter()); self.current_block_off = new_block_handle.offset(); Ok(())