changeset 547:c29852db909a

Update integer-encoding crate to 3.0. There will be some fix-ups to make error handling more useful.
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 05 Mar 2022 12:53:45 +0100
parents 21885bd7b9fc
children cb5111b68cf1
files Cargo.toml src/block.rs src/blockhandle.rs src/key_types.rs src/lib.rs src/table_builder.rs src/table_reader.rs src/write_batch.rs
diffstat 8 files changed, 66 insertions(+), 47 deletions(-) [+]
line wrap: on
line diff
--- a/Cargo.toml	Sat Mar 05 12:15:39 2022 +0100
+++ b/Cargo.toml	Sat Mar 05 12:53:45 2022 +0100
@@ -9,12 +9,12 @@
 keywords = ["LevelDB", "key-value", "database", "SSTable", "Google"]
 license = "MIT"
 publish = true
-
+edition = "2015"
 include = ["src/**/*", "src/*", "Cargo.toml", "LICENSE", "README.md"]
 
 [dependencies]
 crc = "1.8"
-integer-encoding = "1.0"
+integer-encoding = "3.0"
 rand = "0.7"
 snap = "1.0"
 errno = "0.2"
--- a/src/block.rs	Sat Mar 05 12:15:39 2022 +0100
+++ b/src/block.rs	Sat Mar 05 12:53:45 2022 +0100
@@ -103,18 +103,19 @@
 
     /// Seek to restart point `ix`. After the seek, current() will return the entry at that restart
     /// point.
-    fn seek_to_restart_point(&mut self, ix: usize) {
+    fn seek_to_restart_point(&mut self, ix: usize) -> Option<()> {
         let off = self.get_restart_point(ix);
 
         self.offset = off;
         self.current_entry_offset = off;
         self.current_restart_ix = ix;
         // advances self.offset to point to the next entry
-        let (shared, non_shared, _, head_len) = self.parse_entry_and_advance();
+        let (shared, non_shared, _, head_len) = self.parse_entry_and_advance()?;
 
         assert_eq!(shared, 0);
         self.assemble_key(off + head_len, shared, non_shared);
         assert!(self.valid());
+        Some(())
     }
 
     /// Return the offset that restart `ix` points to.
@@ -131,21 +132,21 @@
     /// where 'length spec' is the length of the three values in the entry header, as described
     /// above.
     /// Advances self.offset to the beginning of the next entry.
-    fn parse_entry_and_advance(&mut self) -> (usize, usize, usize, usize) {
+    fn parse_entry_and_advance(&mut self) -> Option<(usize, usize, usize, usize)> {
         let mut i = 0;
-        let (shared, sharedlen) = usize::decode_var(&self.block[self.offset..]);
+        let (shared, sharedlen) = usize::decode_var(&self.block[self.offset..])?;
         i += sharedlen;
 
-        let (non_shared, non_sharedlen) = usize::decode_var(&self.block[self.offset + i..]);
+        let (non_shared, non_sharedlen) = usize::decode_var(&self.block[self.offset + i..])?;
         i += non_sharedlen;
 
-        let (valsize, valsizelen) = usize::decode_var(&self.block[self.offset + i..]);
+        let (valsize, valsizelen) = usize::decode_var(&self.block[self.offset + i..])?;
         i += valsizelen;
 
         self.val_offset = self.offset + i + non_shared;
         self.offset = self.val_offset + valsize;
 
-        (shared, non_shared, valsize, i)
+        Some((shared, non_shared, valsize, i))
     }
 
     /// Assemble the current key from shared and non-shared parts (an entry usually contains only
@@ -161,10 +162,10 @@
             .extend_from_slice(&self.block[off..off + non_shared]);
     }
 
-    pub fn seek_to_last(&mut self) {
+    pub fn seek_to_last(&mut self) -> Option<()> {
         if self.number_restarts() > 0 {
             let num_restarts = self.number_restarts();
-            self.seek_to_restart_point(num_restarts - 1);
+            self.seek_to_restart_point(num_restarts - 1)?;
         } else {
             self.reset();
         }
@@ -177,6 +178,7 @@
             self.advance();
         }
         assert!(self.valid());
+        Some(())
     }
 }
 
@@ -191,17 +193,23 @@
 
         let current_off = self.current_entry_offset;
 
-        let (shared, non_shared, _valsize, entry_head_len) = self.parse_entry_and_advance();
-        self.assemble_key(current_off + entry_head_len, shared, non_shared);
+        if let Some((shared, non_shared, _valsize, entry_head_len)) = self.parse_entry_and_advance() {
+            self.assemble_key(current_off + entry_head_len, shared, non_shared);
 
-        // Adjust current_restart_ix
-        let num_restarts = self.number_restarts();
-        while self.current_restart_ix + 1 < num_restarts
-            && self.get_restart_point(self.current_restart_ix + 1) < self.current_entry_offset
-        {
-            self.current_restart_ix += 1;
+            // Adjust current_restart_ix
+            let num_restarts = self.number_restarts();
+            while self.current_restart_ix + 1 < num_restarts
+                && self.get_restart_point(self.current_restart_ix + 1) < self.current_entry_offset
+                {
+                    self.current_restart_ix += 1;
+                }
+            true
+        } else {
+            #[cfg(debug_assertions)]
+            panic!("parse_entry_and_advance(): couldn't parse entry head at/after {:?}", self.key);
+            #[allow(unreachable_code)]
+            false
         }
-        true
     }
 
     fn reset(&mut self) {
--- a/src/blockhandle.rs	Sat Mar 05 12:15:39 2022 +0100
+++ b/src/blockhandle.rs	Sat Mar 05 12:53:45 2022 +0100
@@ -13,16 +13,16 @@
 impl BlockHandle {
     /// Decodes a block handle from `from` and returns a block handle
     /// together with how many bytes were read from the slice.
-    pub fn decode(from: &[u8]) -> (BlockHandle, usize) {
-        let (off, offsize) = usize::decode_var(from);
-        let (sz, szsize) = usize::decode_var(&from[offsize..]);
+    pub fn decode(from: &[u8]) -> Option<(BlockHandle, usize)> {
+        let (off, offsize) = usize::decode_var(from)?;
+        let (sz, szsize) = usize::decode_var(&from[offsize..])?;
 
-        (
+        Some((
             BlockHandle {
                 offset: off,
                 size: sz,
             },
-            offsize + szsize,
+            offsize + szsize)
         )
     }
 
@@ -57,7 +57,7 @@
         let mut dst = [0 as u8; 128];
         let enc_sz = bh.encode_to(&mut dst[..]);
 
-        let (bh2, dec_sz) = BlockHandle::decode(&dst);
+        let (bh2, dec_sz) = BlockHandle::decode(&dst).unwrap();
 
         assert_eq!(enc_sz, dec_sz);
         assert_eq!(bh.size(), bh2.size());
--- a/src/key_types.rs	Sat Mar 05 12:15:39 2022 +0100
+++ b/src/key_types.rs	Sat Mar 05 12:53:45 2022 +0100
@@ -135,14 +135,14 @@
 /// If the key only contains (keylen, key, tag), the vallen and val offset return values will be
 /// meaningless.
 pub fn parse_memtable_key(mkey: MemtableKey) -> (usize, usize, u64, usize, usize) {
-    let (keylen, mut i): (usize, usize) = VarInt::decode_var(&mkey);
+    let (keylen, mut i): (usize, usize) = VarInt::decode_var(&mkey).unwrap();
     let keyoff = i;
     i += keylen - 8;
 
     if mkey.len() > i {
         let tag = FixedInt::decode_fixed(&mkey[i..i + 8]);
         i += 8;
-        let (vallen, j): (usize, usize) = VarInt::decode_var(&mkey[i..]);
+        let (vallen, j): (usize, usize) = VarInt::decode_var(&mkey[i..]).unwrap();
         i += j;
         let valoff = i;
         (keylen - 8, keyoff, tag, vallen, valoff)
@@ -157,8 +157,8 @@
     a: MemtableKey<'a>,
     b: MemtableKey<'b>,
 ) -> Ordering {
-    let (alen, aoff): (usize, usize) = VarInt::decode_var(&a);
-    let (blen, boff): (usize, usize) = VarInt::decode_var(&b);
+    let (alen, aoff): (usize, usize) = VarInt::decode_var(&a).unwrap();
+    let (blen, boff): (usize, usize) = VarInt::decode_var(&b).unwrap();
     let userkey_a = &a[aoff..aoff + alen - 8];
     let userkey_b = &b[boff..boff + blen - 8];
 
@@ -229,7 +229,7 @@
         assert_eq!(lk1.key.capacity(), 14);
 
         assert_eq!(lk1.user_key(), "abcde".as_bytes());
-        assert_eq!(u32::decode_var(lk1.memtable_key()), (13, 1));
+        assert_eq!(u32::decode_var(lk1.memtable_key()).unwrap(), (13, 1));
         assert_eq!(
             lk2.internal_key(),
             vec![120, 121, 97, 98, 120, 121, 1, 97, 0, 0, 0, 0, 0, 0].as_slice()
--- a/src/lib.rs	Sat Mar 05 12:15:39 2022 +0100
+++ b/src/lib.rs	Sat Mar 05 12:53:45 2022 +0100
@@ -34,6 +34,9 @@
 #[macro_use]
 extern crate time_test;
 
+#[macro_use]
+mod infolog;
+
 mod block;
 mod block_builder;
 mod blockhandle;
@@ -45,8 +48,6 @@
 mod error;
 mod filter;
 mod filter_block;
-#[macro_use]
-mod infolog;
 mod key_types;
 mod log;
 mod mem_env;
--- a/src/table_builder.rs	Sat Mar 05 12:15:39 2022 +0100
+++ b/src/table_builder.rs	Sat Mar 05 12:53:45 2022 +0100
@@ -45,16 +45,16 @@
         }
     }
 
-    pub fn decode(from: &[u8]) -> Footer {
+    pub fn decode(from: &[u8]) -> Option<Footer> {
         assert!(from.len() >= FULL_FOOTER_LENGTH);
         assert_eq!(&from[FOOTER_LENGTH..], &MAGIC_FOOTER_ENCODED);
-        let (meta, metalen) = BlockHandle::decode(&from[0..]);
-        let (ix, _) = BlockHandle::decode(&from[metalen..]);
+        let (meta, metalen) = BlockHandle::decode(&from[0..])?;
+        let (ix, _) = BlockHandle::decode(&from[metalen..])?;
 
-        Footer {
+        Some(Footer {
             meta_index: meta,
             index: ix,
-        }
+        })
     }
 
     pub fn encode(&self, to: &mut [u8]) {
@@ -286,7 +286,7 @@
         let mut buf = [0; 48];
         f.encode(&mut buf[..]);
 
-        let f2 = Footer::decode(&buf);
+        let f2 = Footer::decode(&buf).unwrap();
         assert_eq!(f2.meta_index.offset(), 44);
         assert_eq!(f2.meta_index.size(), 4);
         assert_eq!(f2.index.offset(), 55);
--- a/src/table_reader.rs	Sat Mar 05 12:15:39 2022 +0100
+++ b/src/table_reader.rs	Sat Mar 05 12:53:45 2022 +0100
@@ -3,7 +3,7 @@
 use cache;
 use cmp::InternalKeyCmp;
 use env::RandomAccess;
-use error::Result;
+use error::{self, err, Result};
 use filter;
 use filter_block::FilterBlockReader;
 use key_types::InternalKey;
@@ -21,7 +21,10 @@
 fn read_footer(f: &dyn RandomAccess, size: usize) -> Result<Footer> {
     let mut buf = vec![0; table_builder::FULL_FOOTER_LENGTH];
     f.read_at(size - table_builder::FULL_FOOTER_LENGTH, &mut buf)?;
-    Ok(Footer::decode(&buf))
+    match Footer::decode(&buf) {
+        Some(ok) => Ok(ok),
+        None => err(error::StatusCode::Corruption, &format!("Couldn't decode damaged footer {:?}", &buf))
+    }
 }
 
 #[derive(Clone)]
@@ -75,7 +78,11 @@
         metaindexiter.seek(&filter_name);
 
         if let Some((_key, val)) = current_key_val(&metaindexiter) {
-            let filter_block_location = BlockHandle::decode(&val).0;
+            let fbl = BlockHandle::decode(&val);
+            let filter_block_location = match fbl {
+                None => return err(error::StatusCode::Corruption, &format!("Couldn't decode corrupt blockhandle {:?}", &val)),
+                Some(ok) => ok.0
+            };
             if filter_block_location.size() > 0 {
                 return Ok(Some(table_block::read_filter_block(
                     file,
@@ -139,7 +146,7 @@
         iter.seek(key);
 
         if let Some((_, val)) = current_key_val(&iter) {
-            let location = BlockHandle::decode(&val).0;
+            let location = BlockHandle::decode(&val).unwrap().0;
             return location.offset();
         }
 
@@ -174,7 +181,7 @@
         let handle;
         if let Some((last_in_block, h)) = current_key_val(&index_iter) {
             if self.opt.cmp.cmp(key, &last_in_block) == Ordering::Less {
-                handle = BlockHandle::decode(&h).0;
+                handle = BlockHandle::decode(&h).unwrap().0;
             } else {
                 return Ok(None);
             }
@@ -236,7 +243,10 @@
 
     // Load the block at `handle` into `self.current_block`
     fn load_block(&mut self, handle: &[u8]) -> Result<()> {
-        let (new_block_handle, _) = BlockHandle::decode(handle);
+        let (new_block_handle, _) = match BlockHandle::decode(handle) {
+            None => return err(error::StatusCode::Corruption, "Couldn't decode corrupt block handle"),
+            Some(ok) => ok
+        };
         let block = self.table.read_block(&new_block_handle)?;
 
         self.current_block = Some(block.iter());
--- a/src/write_batch.rs	Sat Mar 05 12:15:39 2022 +0100
+++ b/src/write_batch.rs	Sat Mar 05 12:53:45 2022 +0100
@@ -125,13 +125,13 @@
         let tag = self.batch.entries[self.ix];
         self.ix += 1;
 
-        let (klen, l) = usize::decode_var(&self.batch.entries[self.ix..]);
+        let (klen, l) = usize::decode_var(&self.batch.entries[self.ix..])?;
         self.ix += l;
         let k = &self.batch.entries[self.ix..self.ix + klen];
         self.ix += klen;
 
         if tag == ValueType::TypeValue as u8 {
-            let (vlen, m) = usize::decode_var(&self.batch.entries[self.ix..]);
+            let (vlen, m) = usize::decode_var(&self.batch.entries[self.ix..])?;
             self.ix += m;
             let v = &self.batch.entries[self.ix..self.ix + vlen];
             self.ix += vlen;