changeset 24:cdfc2e935c8a

Rebase checksum verification on (internal) leveldb work
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 24 Dec 2016 14:51:24 +0000
parents e1bf0485bc96
children aa3136fdfc73
files src/block.rs src/options.rs src/table_builder.rs src/table_reader.rs
diffstat 4 files changed, 64 insertions(+), 38 deletions(-) [+]
line wrap: on
line diff
--- a/src/block.rs	Mon Jan 02 12:53:29 2017 +0100
+++ b/src/block.rs	Sat Dec 24 14:51:24 2016 +0000
@@ -49,8 +49,7 @@
         }
     }
 
-    #[allow(dead_code)]
-    fn contents(&self) -> Rc<BlockContents> {
+    pub fn contents(&self) -> Rc<BlockContents> { // now public; returns a clone of the `Rc` in `self.block`
         self.block.clone()
     }
 
--- a/src/options.rs	Mon Jan 02 12:53:29 2017 +0100
+++ b/src/options.rs	Sat Dec 24 14:51:24 2016 +0000
@@ -6,6 +6,14 @@
     CompressionSnappy = 1,
 }
 
+pub fn int_to_compressiontype(i: u32) -> Option<CompressionType> { // maps an integer code to its variant
+    match i {
+        0 => Some(CompressionType::CompressionNone),
+        1 => Some(CompressionType::CompressionSnappy),
+        _ => None // any other value has no matching variant; the caller picks the fallback
+    }
+}
+
 /// [not all member types implemented yet]
 ///
 #[derive(Clone, Copy)]
--- a/src/table_builder.rs	Mon Jan 02 12:53:29 2017 +0100
+++ b/src/table_builder.rs	Sat Dec 24 14:51:24 2016 +0000
@@ -17,6 +17,9 @@
 pub const MAGIC_FOOTER_NUMBER: u64 = 0xdb4775248b80fb57;
 pub const MAGIC_FOOTER_ENCODED: [u8; 8] = [0x57, 0xfb, 0x80, 0x8b, 0x24, 0x75, 0x47, 0xdb];
 
+pub const TABLE_BLOCK_COMPRESS_LEN: usize = 1; // length in bytes of the compression type that follows a block
+pub const TABLE_BLOCK_CKSUM_LEN: usize = 4; // length in bytes of the checksum that follows the compression type
+
 fn find_shortest_sep<C: Comparator>(c: &C, lo: &[u8], hi: &[u8]) -> Vec<u8> {
     let min;
 
--- a/src/table_reader.rs	Mon Jan 02 12:53:29 2017 +0100
+++ b/src/table_reader.rs	Sat Dec 24 14:51:24 2016 +0000
@@ -1,15 +1,15 @@
-use block::{Block, BlockContents, BlockIter};
+use block::{Block, BlockIter};
 use blockhandle::BlockHandle;
 use table_builder::{self, Footer};
 use iterator::{Comparator, StandardComparator, SSIterator};
-use options::ReadOptions;
+use options::{self, ReadOptions, CompressionType};
 
 use integer_encoding::FixedInt;
 use crc::crc32;
 use crc::Hasher32;
 
 use std::cmp::Ordering;
-use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Result};
+use std::io::{self, Read, Seek, SeekFrom, Result};
 use std::fs::{File, OpenOptions};
 use std::path::Path;
 
@@ -33,9 +33,43 @@
 }
 
 /// Reads a block at location.
-fn read_block<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<BlockContents> {
+fn read_block<R: Read + Seek, C: Comparator>(cmp: &C,
+                                             f: &mut R,
+                                             location: &BlockHandle)
+                                             -> Result<TableBlock<C>> {
+    // `location` (offset + size) covers only the block payload. In an encoded table the
+    // payload is followed by 1B compression type and 4B checksum, each read separately below.
     let buf = try!(read_bytes(f, location));
-    Ok(buf)
+    let compress = try!(read_bytes(f,
+                                   &BlockHandle::new(location.offset() + location.size(),
+                                                     table_builder::TABLE_BLOCK_COMPRESS_LEN)));
+    let cksum = try!(read_bytes(f,
+                                &BlockHandle::new(location.offset() + location.size() +
+                                                  table_builder::TABLE_BLOCK_COMPRESS_LEN,
+                                                  table_builder::TABLE_BLOCK_CKSUM_LEN)));
+    Ok(TableBlock { // not verified here; callers decide whether to call `verify()`
+        block: Block::new(buf, *cmp),
+        checksum: u32::decode_fixed(&cksum),
+        compression: options::int_to_compressiontype(compress[0] as u32)
+            .unwrap_or(CompressionType::CompressionNone), // unrecognized type byte falls back to CompressionNone
+    })
+}
+
+struct TableBlock<C: Comparator> { // a block plus the trailer values read after it by `read_block`
+    block: Block<C>, // built from the payload bytes at the block handle's offset/size
+    checksum: u32, // decoded from the 4 bytes following the compression-type byte
+    compression: CompressionType, // from the byte after the payload; CompressionNone if unrecognized
+}
+
+impl<C: Comparator> TableBlock<C> {
+    /// True if CRC32 (Castagnoli) over block contents + compression byte equals the stored checksum.
+    fn verify(&self) -> bool {
+        let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
+        digest.write(&self.block.contents());
+        digest.write(&[self.compression as u8; 1]); // the compression-type byte is part of the checksummed data
+
+        digest.sum32() == self.checksum
+    }
 }
 
 pub struct Table<R: Read + Seek, C: Comparator> {
@@ -71,19 +105,25 @@
     pub fn new(mut file: R, size: usize, opt: ReadOptions, cmp: C) -> Result<Table<R, C>> {
         let footer = try!(read_footer(&mut file, size));
 
-        let indexblock = Block::new(try!(read_block(&mut file, &footer.index)), cmp);
+        let indexblock = try!(read_block(&cmp, &mut file, &footer.index)); // NOTE(review): `verify()` is not called on the index block
 
         Ok(Table {
             file: file,
             file_size: size,
             opt: opt,
             cmp: cmp,
-            indexblock: indexblock,
+            indexblock: indexblock.block, // keep only the Block; checksum and compression type are dropped
         })
     }
 
-    fn read_block_(&mut self, location: &BlockHandle) -> Result<BlockContents> {
-        read_block(&mut self.file, location)
+    fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock<C>> { // reads the block, then applies the checksum policy
+        let b = try!(read_block(&self.cmp, &mut self.file, location));
+
+        if !b.verify() && self.opt.skip_bad_blocks { // NOTE(review): a bad checksum is only an error when skip_bad_blocks is set
+            Err(io::Error::new(io::ErrorKind::InvalidData, "Data block failed verification"))
+        } else {
+            Ok(b) // also reached for a failed checksum when skip_bad_blocks is false
+        }
     }
 
     /// Returns the offset of the block that contains `key`.
@@ -153,38 +193,14 @@
         }
     }
 
-    /// Verifies the CRC checksum of a block.
-    fn verify_block(&self, block: &BlockContents) -> bool {
-        let payload = &block[0..block.len() - 4];
-        let checksum = &block[block.len() - 4..];
-        let checksum = u32::decode_fixed(checksum);
-
-        let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
-        digest.write(payload);
-
-        digest.sum32() == checksum
-    }
-
     /// Load the block at `handle` into `self.current_block`
     fn load_block(&mut self, handle: &[u8]) -> Result<()> {
-        const TABLE_BLOCK_FOOTER_SIZE: usize = 5;
         let (new_block_handle, _) = BlockHandle::decode(handle);
 
-        // Also read checksum and compression! (5B)
-        let full_block_handle = BlockHandle::new(new_block_handle.offset(),
-                                                 new_block_handle.size() + TABLE_BLOCK_FOOTER_SIZE);
-        let mut full_block = try!(self.table.read_block_(&full_block_handle));
+        let block = try!(self.table.read_block(&new_block_handle)); // trailer read + checksum policy now live in Table::read_block
+        self.current_block = block.block.iter(); // only an iterator over the inner Block is stored
 
-        if !self.verify_block(&full_block) && self.table.opt.skip_bad_blocks {
-            Err(Error::new(ErrorKind::InvalidData, "Bad block checksum!".to_string()))
-        } else {
-            // Truncate by 5, so the checksum and compression type are gone
-            full_block.resize(new_block_handle.size(), 0);
-            let block = Block::new(full_block, self.table.cmp);
-            self.current_block = block.iter();
-
-            Ok(())
-        }
+        Ok(())
     }
 }