Mercurial > lbo > hg > sstable
changeset 24:cdfc2e935c8a
Rebase checksum verification on (internal) leveldb work
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 24 Dec 2016 14:51:24 +0000 |
parents | e1bf0485bc96 |
children | aa3136fdfc73 |
files | src/block.rs src/options.rs src/table_builder.rs src/table_reader.rs |
diffstat | 4 files changed, 64 insertions(+), 38 deletions(-) [+] |
line wrap: on
line diff
--- a/src/block.rs Mon Jan 02 12:53:29 2017 +0100 +++ b/src/block.rs Sat Dec 24 14:51:24 2016 +0000 @@ -49,8 +49,7 @@ } } - #[allow(dead_code)] - fn contents(&self) -> Rc<BlockContents> { + pub fn contents(&self) -> Rc<BlockContents> { self.block.clone() }
--- a/src/options.rs Mon Jan 02 12:53:29 2017 +0100 +++ b/src/options.rs Sat Dec 24 14:51:24 2016 +0000 @@ -6,6 +6,14 @@ CompressionSnappy = 1, } +pub fn int_to_compressiontype(i: u32) -> Option<CompressionType> { + match i { + 0 => Some(CompressionType::CompressionNone), + 1 => Some(CompressionType::CompressionSnappy), + _ => None + } +} + /// [not all member types implemented yet] /// #[derive(Clone, Copy)]
--- a/src/table_builder.rs Mon Jan 02 12:53:29 2017 +0100 +++ b/src/table_builder.rs Sat Dec 24 14:51:24 2016 +0000 @@ -17,6 +17,9 @@ pub const MAGIC_FOOTER_NUMBER: u64 = 0xdb4775248b80fb57; pub const MAGIC_FOOTER_ENCODED: [u8; 8] = [0x57, 0xfb, 0x80, 0x8b, 0x24, 0x75, 0x47, 0xdb]; +pub const TABLE_BLOCK_COMPRESS_LEN: usize = 1; +pub const TABLE_BLOCK_CKSUM_LEN: usize = 4; + fn find_shortest_sep<C: Comparator>(c: &C, lo: &[u8], hi: &[u8]) -> Vec<u8> { let min;
--- a/src/table_reader.rs Mon Jan 02 12:53:29 2017 +0100 +++ b/src/table_reader.rs Sat Dec 24 14:51:24 2016 +0000 @@ -1,15 +1,15 @@ -use block::{Block, BlockContents, BlockIter}; +use block::{Block, BlockIter}; use blockhandle::BlockHandle; use table_builder::{self, Footer}; use iterator::{Comparator, StandardComparator, SSIterator}; -use options::ReadOptions; +use options::{self, ReadOptions, CompressionType}; use integer_encoding::FixedInt; use crc::crc32; use crc::Hasher32; use std::cmp::Ordering; -use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Result}; +use std::io::{self, Read, Seek, SeekFrom, Result}; use std::fs::{File, OpenOptions}; use std::path::Path; @@ -33,9 +33,43 @@ } /// Reads a block at location. -fn read_block<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<BlockContents> { +fn read_block<R: Read + Seek, C: Comparator>(cmp: &C, + f: &mut R, + location: &BlockHandle) + -> Result<TableBlock<C>> { + // The block is denoted by offset and length in BlockHandle. A block in an encoded + // table is followed by 1B compression type and 4B checksum. let buf = try!(read_bytes(f, location)); - Ok(buf) + let compress = try!(read_bytes(f, + &BlockHandle::new(location.offset() + location.size(), + table_builder::TABLE_BLOCK_COMPRESS_LEN))); + let cksum = try!(read_bytes(f, + &BlockHandle::new(location.offset() + location.size() + + table_builder::TABLE_BLOCK_COMPRESS_LEN, + table_builder::TABLE_BLOCK_CKSUM_LEN))); + Ok(TableBlock { + block: Block::new(buf, *cmp), + checksum: u32::decode_fixed(&cksum), + compression: options::int_to_compressiontype(compress[0] as u32) + .unwrap_or(CompressionType::CompressionNone), + }) +} + +struct TableBlock<C: Comparator> { + block: Block<C>, + checksum: u32, + compression: CompressionType, +} + +impl<C: Comparator> TableBlock<C> { + /// Verify checksum of block + fn verify(&self) -> bool { + let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); + digest.write(&self.block.contents()); + digest.write(&[self.compression as u8; 1]); + + digest.sum32() == self.checksum + } } pub struct Table<R: Read + Seek, C: Comparator> { @@ -71,19 +105,25 @@ pub fn new(mut file: R, size: usize, opt: ReadOptions, cmp: C) -> Result<Table<R, C>> { let footer = try!(read_footer(&mut file, size)); - let indexblock = Block::new(try!(read_block(&mut file, &footer.index)), cmp); + let indexblock = try!(read_block(&cmp, &mut file, &footer.index)); Ok(Table { file: file, file_size: size, opt: opt, cmp: cmp, - indexblock: indexblock, + indexblock: indexblock.block, }) } - fn read_block_(&mut self, location: &BlockHandle) -> Result<BlockContents> { - read_block(&mut self.file, location) + fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock<C>> { + let b = try!(read_block(&self.cmp, &mut self.file, location)); + + if !b.verify() && self.opt.skip_bad_blocks { + Err(io::Error::new(io::ErrorKind::InvalidData, "Data block failed verification")) + } else { + Ok(b) + } } /// Returns the offset of the block that contains `key`. @@ -153,38 +193,14 @@ } } - /// Verifies the CRC checksum of a block. - fn verify_block(&self, block: &BlockContents) -> bool { - let payload = &block[0..block.len() - 4]; - let checksum = &block[block.len() - 4..]; - let checksum = u32::decode_fixed(checksum); - - let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); - digest.write(payload); - - digest.sum32() == checksum - } - /// Load the block at `handle` into `self.current_block` fn load_block(&mut self, handle: &[u8]) -> Result<()> { - const TABLE_BLOCK_FOOTER_SIZE: usize = 5; let (new_block_handle, _) = BlockHandle::decode(handle); - // Also read checksum and compression! (5B) - let full_block_handle = BlockHandle::new(new_block_handle.offset(), - new_block_handle.size() + TABLE_BLOCK_FOOTER_SIZE); - let mut full_block = try!(self.table.read_block_(&full_block_handle)); + let block = try!(self.table.read_block(&new_block_handle)); + self.current_block = block.block.iter(); - if !self.verify_block(&full_block) && self.table.opt.skip_bad_blocks { - Err(Error::new(ErrorKind::InvalidData, "Bad block checksum!".to_string())) - } else { - // Truncate by 5, so the checksum and compression type are gone - full_block.resize(new_block_handle.size(), 0); - let block = Block::new(full_block, self.table.cmp); - self.current_block = block.iter(); - - Ok(()) - } + Ok(()) } }