changeset 3:14484cc26b69

Check checksums when reading blocks
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 19 Nov 2016 14:02:15 +0100
parents 97e5170753af
children c0e7fd030c24
files src/block.rs src/key_types.rs src/lib.rs src/options.rs src/table_builder.rs src/table_reader.rs
diffstat 6 files changed, 105 insertions(+), 46 deletions(-) [+]
line wrap: on
line diff
--- a/src/block.rs	Sat Nov 19 12:57:43 2016 +0100
+++ b/src/block.rs	Sat Nov 19 14:02:15 2016 +0100
@@ -49,6 +49,11 @@
         }
     }
 
+    #[allow(dead_code)]
+    fn contents(&self) -> Rc<BlockContents> {
+        self.block.clone()
+    }
+
     pub fn new(contents: BlockContents, cmp: C) -> Block<C> {
         assert!(contents.len() > 4);
         Block {
--- a/src/key_types.rs	Sat Nov 19 12:57:43 2016 +0100
+++ b/src/key_types.rs	Sat Nov 19 14:02:15 2016 +0100
@@ -1,11 +1,1 @@
-// The following typedefs are used to distinguish between the different key formats used internally
-// by different modules.
 
-/// A UserKey is the actual key supplied by the calling application, without any internal
-/// decorations.
-pub type UserKey<'a> = &'a [u8];
-
-/// An InternalKey consists of [key, tag], so it's basically a MemtableKey without the initial
-/// length specification. This type is used as item type of MemtableIterator, and as the key
-/// type of tables.
-pub type InternalKey<'a> = &'a [u8];
--- a/src/lib.rs	Sat Nov 19 12:57:43 2016 +0100
+++ b/src/lib.rs	Sat Nov 19 14:02:15 2016 +0100
@@ -3,10 +3,16 @@
 
 mod block;
 mod blockhandle;
-mod iterator;
 mod key_types;
 
+pub mod iterator;
 pub mod options;
 pub mod table_builder;
 pub mod table_reader;
 
+pub use iterator::StandardComparator;
+pub use iterator::SSIterator;
+pub use options::Options;
+
+pub use table_builder::TableBuilder;
+pub use table_reader::{Table, TableIterator};
--- a/src/options.rs	Sat Nov 19 12:57:43 2016 +0100
+++ b/src/options.rs	Sat Nov 19 14:02:15 2016 +0100
@@ -12,6 +12,7 @@
 pub struct Options {
     pub block_size: usize,
     pub block_restart_interval: usize,
+    // Note: Compression is not implemented.
     pub compression_type: CompressionType,
 }
 
--- a/src/table_builder.rs	Sat Nov 19 12:57:43 2016 +0100
+++ b/src/table_builder.rs	Sat Nov 19 14:02:15 2016 +0100
@@ -1,6 +1,5 @@
 use block::{BlockBuilder, BlockContents};
 use blockhandle::BlockHandle;
-use key_types::InternalKey;
 use options::{CompressionType, Options};
 use iterator::Comparator;
 
@@ -16,10 +15,7 @@
 pub const MAGIC_FOOTER_NUMBER: u64 = 0xdb4775248b80fb57;
 pub const MAGIC_FOOTER_ENCODED: [u8; 8] = [0x57, 0xfb, 0x80, 0x8b, 0x24, 0x75, 0x47, 0xdb];
 
-fn find_shortest_sep<'a, C: Comparator>(c: &C,
-                                        lo: InternalKey<'a>,
-                                        hi: InternalKey<'a>)
-                                        -> Vec<u8> {
+fn find_shortest_sep<C: Comparator>(c: &C, lo: &[u8], hi: &[u8]) -> Vec<u8> {
     let min;
 
     if lo.len() < hi.len() {
@@ -120,7 +116,9 @@
         self.num_entries
     }
 
-    pub fn add<'a>(&mut self, key: InternalKey<'a>, val: &[u8]) {
+    /// Add an entry to this table. The key must be lexicographically greater than the last entry
+    /// written.
+    pub fn add(&mut self, key: &[u8], val: &[u8]) {
         assert!(self.data_block.is_some());
         assert!(self.num_entries == 0 ||
                 self.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less);
@@ -137,7 +135,7 @@
 
     /// Writes an index entry for the current data_block where `next_key` is the first key of the
     /// next block.
-    fn write_data_block<'b>(&mut self, next_key: InternalKey<'b>) {
+    fn write_data_block(&mut self, next_key: &[u8]) {
         assert!(self.data_block.is_some());
 
         let block = self.data_block.take().unwrap();
@@ -156,6 +154,7 @@
         self.write_block(contents, ctype);
     }
 
+    /// Writes a block to disk, with a trailing 4 byte CRC checksum.
     fn write_block(&mut self, c: BlockContents, t: CompressionType) -> BlockHandle {
         // compression is still unimplemented
         assert_eq!(t, CompressionType::CompressionNone);
--- a/src/table_reader.rs	Sat Nov 19 12:57:43 2016 +0100
+++ b/src/table_reader.rs	Sat Nov 19 14:02:15 2016 +0100
@@ -1,10 +1,14 @@
-use block::{Block, BlockIter};
+use block::{Block, BlockContents, BlockIter};
 use blockhandle::BlockHandle;
 use table_builder::{self, Footer};
 use iterator::{Comparator, SSIterator};
 
-use std::io::{Read, Seek, SeekFrom, Result};
+use integer_encoding::FixedInt;
+use crc::crc32;
+use crc::Hasher32;
+
 use std::cmp::Ordering;
+use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Result};
 
 /// Reads the table footer.
 fn read_footer<R: Read + Seek>(f: &mut R, size: usize) -> Result<Footer> {
@@ -26,12 +30,9 @@
 }
 
 /// Reads a block at location.
-fn read_block<R: Read + Seek, C: Comparator>(cmp: &C,
-                                             f: &mut R,
-                                             location: &BlockHandle)
-                                             -> Result<Block<C>> {
+fn read_block<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<BlockContents> {
     let buf = try!(read_bytes(f, location));
-    Ok(Block::new(buf, *cmp))
+    Ok(buf)
 }
 
 pub struct Table<R: Read + Seek, C: Comparator> {
@@ -47,7 +48,7 @@
     pub fn new(mut file: R, size: usize, cmp: C) -> Result<Table<R, C>> {
         let footer = try!(read_footer(&mut file, size));
 
-        let indexblock = try!(read_block(&cmp, &mut file, &footer.index));
+        let indexblock = Block::new(try!(read_block(&mut file, &footer.index)), cmp);
 
         Ok(Table {
             file: file,
@@ -57,8 +58,8 @@
         })
     }
 
-    fn read_block_(&mut self, location: &BlockHandle) -> Result<Block<C>> {
-        read_block(&self.cmp, &mut self.file, location)
+    fn read_block_(&mut self, location: &BlockHandle) -> Result<BlockContents> {
+        read_block(&mut self.file, location)
     }
 
     /// Returns the offset of the block that contains `key`.
@@ -77,13 +78,12 @@
 
     // Iterators read from the file; thus only one iterator can be borrowed (mutably) per scope
     fn iter<'a>(&'a mut self) -> TableIterator<'a, R, C> {
-        let mut iter = TableIterator {
+        let iter = TableIterator {
             current_block: self.indexblock.iter(), // just for filling in here
             index_block: self.indexblock.iter(),
             table: self,
             init: false,
         };
-        iter.skip_to_next_entry();
         iter
     }
 
@@ -112,24 +112,51 @@
 }
 
 impl<'a, C: Comparator, R: Read + Seek> TableIterator<'a, R, C> {
-    // Skips to the entry referenced by the next entry in the index block.
-    // This is called once a block has run out of entries.
-    fn skip_to_next_entry(&mut self) -> bool {
+    /// Skips to the entry referenced by the next entry in the index block.
+    /// This is called once a block has run out of entries.
+    /// Returns Ok(false) if the end has been reached, returns Err(...) if it should be retried.
+    fn skip_to_next_entry(&mut self) -> Result<bool> {
         if let Some((_key, val)) = self.index_block.next() {
-            self.load_block(&val).is_ok()
+            let r = self.load_block(&val);
+
+            if let Err(e) = r { Err(e) } else { Ok(true) }
         } else {
-            false
+            Ok(false)
         }
     }
 
-    // Load the block at `handle` into `self.current_block`
+    /// Verifies the CRC checksum of a block.
+    fn verify_block(&self, block: &BlockContents) -> bool {
+        let payload = &block[0..block.len() - 4];
+        let checksum = &block[block.len() - 4..];
+        let checksum = u32::decode_fixed(checksum);
+
+        let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
+        digest.write(payload);
+
+        digest.sum32() == checksum
+    }
+
+    /// Load the block at `handle` into `self.current_block`
     fn load_block(&mut self, handle: &[u8]) -> Result<()> {
+        const TABLE_BLOCK_FOOTER_SIZE: usize = 5;
         let (new_block_handle, _) = BlockHandle::decode(handle);
 
-        let block = try!(self.table.read_block_(&new_block_handle));
-        self.current_block = block.iter();
+        // Also read checksum and compression! (5B)
+        let full_block_handle = BlockHandle::new(new_block_handle.offset(),
+                                                 new_block_handle.size() + TABLE_BLOCK_FOOTER_SIZE);
+        let mut full_block = try!(self.table.read_block_(&full_block_handle));
 
-        Ok(())
+        if !self.verify_block(&full_block) {
+            Err(Error::new(ErrorKind::InvalidData, "Bad block checksum!".to_string()))
+        } else {
+            // Truncate by 5, so the checksum and compression type are gone
+            full_block.resize(new_block_handle.size(), 0);
+            let block = Block::new(full_block, self.table.cmp);
+            self.current_block = block.iter();
+
+            Ok(())
+        }
     }
 }
 
@@ -137,14 +164,23 @@
     type Item = (Vec<u8>, Vec<u8>);
 
     fn next(&mut self) -> Option<Self::Item> {
-        self.init = true;
+        if !self.init {
+            return match self.skip_to_next_entry() {
+                Ok(true) => {
+                    self.init = true;
+                    self.next()
+                }
+                Ok(false) => None,
+                Err(_) => self.next(),
+            };
+        }
         if let Some((key, val)) = self.current_block.next() {
             Some((key, val))
         } else {
-            if self.skip_to_next_entry() {
-                self.next()
-            } else {
-                None
+            match self.skip_to_next_entry() {
+                Ok(true) => self.next(),
+                Ok(false) => None,
+                Err(_) => self.next(),
             }
         }
     }
@@ -204,7 +240,9 @@
     fn reset(&mut self) {
         self.index_block.reset();
         self.init = false;
-        self.skip_to_next_entry();
+
+        while let Err(_) = self.skip_to_next_entry() {
+        }
     }
 
     // This iterator is special in that it's valid even before the first call to next(). It behaves
@@ -243,7 +281,7 @@
         let mut d = Vec::with_capacity(512);
         let mut opt = Options::default();
         opt.block_restart_interval = 2;
-        opt.block_size = 64;
+        opt.block_size = 32;
 
         {
             let mut b = TableBuilder::new(opt, StandardComparator, &mut d);
@@ -278,6 +316,26 @@
     }
 
     #[test]
+    fn test_table_data_corruption() {
+        let (mut src, size) = build_table();
+
+        // Mess with first block
+        src[28] += 1;
+
+        let mut table = Table::new(Cursor::new(&src as &[u8]), size, StandardComparator).unwrap();
+        let mut iter = table.iter();
+
+        // defective blocks are skipped, i.e. we should start with the second block
+
+        assert!(iter.next().is_some());
+        assert_eq!(iter.current(),
+                   Some(("bsr".as_bytes().to_vec(), "a00".as_bytes().to_vec())));
+        assert!(iter.next().is_some());
+        assert_eq!(iter.current(),
+                   Some(("xyz".as_bytes().to_vec(), "xxx".as_bytes().to_vec())));
+    }
+
+    #[test]
     fn test_table_get() {
         let (src, size) = build_table();