Mercurial > lbo > hg > sstable
changeset 3:14484cc26b69
Check checksums when reading blocks
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 19 Nov 2016 14:02:15 +0100 |
parents | 97e5170753af |
children | c0e7fd030c24 |
files | src/block.rs src/key_types.rs src/lib.rs src/options.rs src/table_builder.rs src/table_reader.rs |
diffstat | 6 files changed, 105 insertions(+), 46 deletions(-) [+] |
line wrap: on
line diff
--- a/src/block.rs Sat Nov 19 12:57:43 2016 +0100 +++ b/src/block.rs Sat Nov 19 14:02:15 2016 +0100 @@ -49,6 +49,11 @@ } } + #[allow(dead_code)] + fn contents(&self) -> Rc<BlockContents> { + self.block.clone() + } + pub fn new(contents: BlockContents, cmp: C) -> Block<C> { assert!(contents.len() > 4); Block {
--- a/src/key_types.rs Sat Nov 19 12:57:43 2016 +0100 +++ b/src/key_types.rs Sat Nov 19 14:02:15 2016 +0100 @@ -1,11 +1,1 @@ -// The following typedefs are used to distinguish between the different key formats used internally -// by different modules. -/// A UserKey is the actual key supplied by the calling application, without any internal -/// decorations. -pub type UserKey<'a> = &'a [u8]; - -/// An InternalKey consists of [key, tag], so it's basically a MemtableKey without the initial -/// length specification. This type is used as item type of MemtableIterator, and as the key -/// type of tables. -pub type InternalKey<'a> = &'a [u8];
--- a/src/lib.rs Sat Nov 19 12:57:43 2016 +0100 +++ b/src/lib.rs Sat Nov 19 14:02:15 2016 +0100 @@ -3,10 +3,16 @@ mod block; mod blockhandle; -mod iterator; mod key_types; +pub mod iterator; pub mod options; pub mod table_builder; pub mod table_reader; +pub use iterator::StandardComparator; +pub use iterator::SSIterator; +pub use options::Options; + +pub use table_builder::TableBuilder; +pub use table_reader::{Table, TableIterator};
--- a/src/options.rs Sat Nov 19 12:57:43 2016 +0100 +++ b/src/options.rs Sat Nov 19 14:02:15 2016 +0100 @@ -12,6 +12,7 @@ pub struct Options { pub block_size: usize, pub block_restart_interval: usize, + // Note: Compression is not implemented. pub compression_type: CompressionType, }
--- a/src/table_builder.rs Sat Nov 19 12:57:43 2016 +0100 +++ b/src/table_builder.rs Sat Nov 19 14:02:15 2016 +0100 @@ -1,6 +1,5 @@ use block::{BlockBuilder, BlockContents}; use blockhandle::BlockHandle; -use key_types::InternalKey; use options::{CompressionType, Options}; use iterator::Comparator; @@ -16,10 +15,7 @@ pub const MAGIC_FOOTER_NUMBER: u64 = 0xdb4775248b80fb57; pub const MAGIC_FOOTER_ENCODED: [u8; 8] = [0x57, 0xfb, 0x80, 0x8b, 0x24, 0x75, 0x47, 0xdb]; -fn find_shortest_sep<'a, C: Comparator>(c: &C, - lo: InternalKey<'a>, - hi: InternalKey<'a>) - -> Vec<u8> { +fn find_shortest_sep<C: Comparator>(c: &C, lo: &[u8], hi: &[u8]) -> Vec<u8> { let min; if lo.len() < hi.len() { @@ -120,7 +116,9 @@ self.num_entries } - pub fn add<'a>(&mut self, key: InternalKey<'a>, val: &[u8]) { + /// Add an entry to this table. The key must be lexicographically greater than the last entry + /// written. + pub fn add(&mut self, key: &[u8], val: &[u8]) { assert!(self.data_block.is_some()); assert!(self.num_entries == 0 || self.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less); @@ -137,7 +135,7 @@ /// Writes an index entry for the current data_block where `next_key` is the first key of the /// next block. - fn write_data_block<'b>(&mut self, next_key: InternalKey<'b>) { + fn write_data_block(&mut self, next_key: &[u8]) { assert!(self.data_block.is_some()); let block = self.data_block.take().unwrap(); @@ -156,6 +154,7 @@ self.write_block(contents, ctype); } + /// Writes a block to disk, with a trailing 4 byte CRC checksum. fn write_block(&mut self, c: BlockContents, t: CompressionType) -> BlockHandle { // compression is still unimplemented assert_eq!(t, CompressionType::CompressionNone);
--- a/src/table_reader.rs Sat Nov 19 12:57:43 2016 +0100 +++ b/src/table_reader.rs Sat Nov 19 14:02:15 2016 +0100 @@ -1,10 +1,14 @@ -use block::{Block, BlockIter}; +use block::{Block, BlockContents, BlockIter}; use blockhandle::BlockHandle; use table_builder::{self, Footer}; use iterator::{Comparator, SSIterator}; -use std::io::{Read, Seek, SeekFrom, Result}; +use integer_encoding::FixedInt; +use crc::crc32; +use crc::Hasher32; + use std::cmp::Ordering; +use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Result}; /// Reads the table footer. fn read_footer<R: Read + Seek>(f: &mut R, size: usize) -> Result<Footer> { @@ -26,12 +30,9 @@ } /// Reads a block at location. -fn read_block<R: Read + Seek, C: Comparator>(cmp: &C, - f: &mut R, - location: &BlockHandle) - -> Result<Block<C>> { +fn read_block<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<BlockContents> { let buf = try!(read_bytes(f, location)); - Ok(Block::new(buf, *cmp)) + Ok(buf) } pub struct Table<R: Read + Seek, C: Comparator> { @@ -47,7 +48,7 @@ pub fn new(mut file: R, size: usize, cmp: C) -> Result<Table<R, C>> { let footer = try!(read_footer(&mut file, size)); - let indexblock = try!(read_block(&cmp, &mut file, &footer.index)); + let indexblock = Block::new(try!(read_block(&mut file, &footer.index)), cmp); Ok(Table { file: file, @@ -57,8 +58,8 @@ }) } - fn read_block_(&mut self, location: &BlockHandle) -> Result<Block<C>> { - read_block(&self.cmp, &mut self.file, location) + fn read_block_(&mut self, location: &BlockHandle) -> Result<BlockContents> { + read_block(&mut self.file, location) } /// Returns the offset of the block that contains `key`. @@ -77,13 +78,12 @@ // Iterators read from the file; thus only one iterator can be borrowed (mutably) per scope fn iter<'a>(&'a mut self) -> TableIterator<'a, R, C> { - let mut iter = TableIterator { + let iter = TableIterator { current_block: self.indexblock.iter(), // just for filling in here index_block: self.indexblock.iter(), table: self, init: false, }; - iter.skip_to_next_entry(); iter } @@ -112,24 +112,51 @@ } impl<'a, C: Comparator, R: Read + Seek> TableIterator<'a, R, C> { - // Skips to the entry referenced by the next entry in the index block. - // This is called once a block has run out of entries. - fn skip_to_next_entry(&mut self) -> bool { + /// Skips to the entry referenced by the next entry in the index block. + /// This is called once a block has run out of entries. + /// Returns Ok(false) if the end has been reached, returns Err(...) if it should be retried. + fn skip_to_next_entry(&mut self) -> Result<bool> { if let Some((_key, val)) = self.index_block.next() { - self.load_block(&val).is_ok() + let r = self.load_block(&val); + + if let Err(e) = r { Err(e) } else { Ok(true) } } else { - false + Ok(false) } } - // Load the block at `handle` into `self.current_block` + /// Verifies the CRC checksum of a block. + fn verify_block(&self, block: &BlockContents) -> bool { + let payload = &block[0..block.len() - 4]; + let checksum = &block[block.len() - 4..]; + let checksum = u32::decode_fixed(checksum); + + let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); + digest.write(payload); + + digest.sum32() == checksum + } + + /// Load the block at `handle` into `self.current_block` fn load_block(&mut self, handle: &[u8]) -> Result<()> { + const TABLE_BLOCK_FOOTER_SIZE: usize = 5; let (new_block_handle, _) = BlockHandle::decode(handle); - let block = try!(self.table.read_block_(&new_block_handle)); - self.current_block = block.iter(); + // Also read checksum and compression! (5B) + let full_block_handle = BlockHandle::new(new_block_handle.offset(), + new_block_handle.size() + TABLE_BLOCK_FOOTER_SIZE); + let mut full_block = try!(self.table.read_block_(&full_block_handle)); - Ok(()) + if !self.verify_block(&full_block) { + Err(Error::new(ErrorKind::InvalidData, "Bad block checksum!".to_string())) + } else { + // Truncate by 5, so the checksum and compression type are gone + full_block.resize(new_block_handle.size(), 0); + let block = Block::new(full_block, self.table.cmp); + self.current_block = block.iter(); + + Ok(()) + } } } @@ -137,14 +164,23 @@ type Item = (Vec<u8>, Vec<u8>); fn next(&mut self) -> Option<Self::Item> { - self.init = true; + if !self.init { + return match self.skip_to_next_entry() { + Ok(true) => { + self.init = true; + self.next() + } + Ok(false) => None, + Err(_) => self.next(), + }; + } if let Some((key, val)) = self.current_block.next() { Some((key, val)) } else { - if self.skip_to_next_entry() { - self.next() - } else { - None + match self.skip_to_next_entry() { + Ok(true) => self.next(), + Ok(false) => None, + Err(_) => self.next(), } } } @@ -204,7 +240,9 @@ fn reset(&mut self) { self.index_block.reset(); self.init = false; - self.skip_to_next_entry(); + + while let Err(_) = self.skip_to_next_entry() { + } } // This iterator is special in that it's valid even before the first call to next(). It behaves @@ -243,7 +281,7 @@ let mut d = Vec::with_capacity(512); let mut opt = Options::default(); opt.block_restart_interval = 2; - opt.block_size = 64; + opt.block_size = 32; { let mut b = TableBuilder::new(opt, StandardComparator, &mut d); @@ -278,6 +316,26 @@ } #[test] + fn test_table_data_corruption() { + let (mut src, size) = build_table(); + + // Mess with first block + src[28] += 1; + + let mut table = Table::new(Cursor::new(&src as &[u8]), size, StandardComparator).unwrap(); + let mut iter = table.iter(); + + // defective blocks are skipped, i.e. we should start with the second block + + assert!(iter.next().is_some()); + assert_eq!(iter.current(), + Some(("bsr".as_bytes().to_vec(), "a00".as_bytes().to_vec()))); + assert!(iter.next().is_some()); + assert_eq!(iter.current(), + Some(("xyz".as_bytes().to_vec(), "xxx".as_bytes().to_vec()))); + } + + #[test] fn test_table_get() { let (src, size) = build_table();