changeset 41:d2559e2729e4

Completely rebase sstable code on rusty_leveldb 0.2.2
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 11 Mar 2018 16:00:32 +0100
parents 2452c9af0021
children 9cf43f11d5a6
files Cargo.toml src/block.rs src/blockhandle.rs src/filter.rs src/filter_block.rs src/lib.rs src/options.rs src/table_builder.rs src/table_reader.rs src/types.rs
diffstat 10 files changed, 1044 insertions(+), 757 deletions(-) [+]
line wrap: on
line diff
--- a/Cargo.toml	Sun Mar 11 11:29:39 2018 +0100
+++ b/Cargo.toml	Sun Mar 11 16:00:32 2018 +0100
@@ -12,3 +12,7 @@
 [dependencies]
 crc = "1.2"
 integer-encoding = "1.0"
+snap = "0.2"
+
+[dev-dependencies]
+time-test = "0.2"
--- a/src/block.rs	Sun Mar 11 11:29:39 2018 +0100
+++ b/src/block.rs	Sun Mar 11 16:00:32 2018 +0100
@@ -3,13 +3,17 @@
 use std::rc::Rc;
 
 use options::Options;
-use iterator::SSIterator;
+use types::SSIterator;
 
 use integer_encoding::FixedInt;
 use integer_encoding::VarInt;
 
 pub type BlockContents = Vec<u8>;
 
+/// A Block is an immutable ordered set of key/value entries.
+///
+/// The structure internally looks like follows:
+///
 /// A block is a list of ENTRIES followed by a list of RESTARTS, terminated by a fixed u32
 /// N_RESTARTS.
 ///
@@ -26,6 +30,7 @@
 /// A RESTART is a fixed u32 pointing to the beginning of an ENTRY.
 ///
 /// N_RESTARTS contains the number of restarts.
+#[derive(Clone)]
 pub struct Block {
     block: Rc<BlockContents>,
     opt: Options,
@@ -34,7 +39,7 @@
 impl Block {
     /// Return an iterator over this block.
     /// Note that the iterator isn't bound to the block's lifetime; the iterator uses the same
-    /// refcounted block contents as this block. (meaning also that if the iterator isn't released,
+    /// refcounted block contents as this block, meaning that if the iterator isn't released,
     /// the memory occupied by the block isn't, either)
     pub fn iter(&self) -> BlockIter {
         let restarts = u32::decode_fixed(&self.block[self.block.len() - 4..]);
@@ -67,6 +72,8 @@
     }
 }
 
+/// BlockIter is an iterator over the entries in a block. It doesn't depend on the Block's
+/// lifetime, as it uses a refcounted block underneath.
 pub struct BlockIter {
     /// The underlying block contents.
     /// TODO: Maybe (probably...) this needs an Arc.
@@ -106,8 +113,8 @@
         let (shared, non_shared, _, head_len) = self.parse_entry_and_advance();
 
         assert_eq!(shared, 0);
-
         self.assemble_key(off + head_len, shared, non_shared);
+        assert!(self.valid());
     }
 
     /// Return the offset that restart `ix` points to.
@@ -123,7 +130,7 @@
     /// Returns SHARED, NON_SHARED, VALSIZE and [length of length spec] from the current position,
     /// where 'length spec' is the length of the three values in the entry header, as described
     /// above.
-    /// Advances self.offset to point to the beginning of the next entry.
+    /// Advances self.offset to the beginning of the next entry.
     fn parse_entry_and_advance(&mut self) -> (usize, usize, usize, usize) {
         let mut i = 0;
         let (shared, sharedlen) = usize::decode_var(&self.block[self.offset..]);
@@ -136,7 +143,7 @@
         i += valsizelen;
 
         self.val_offset = self.offset + i + non_shared;
-        self.offset = self.offset + i + non_shared + valsize;
+        self.offset = self.val_offset + valsize;
 
         (shared, non_shared, valsize, i)
     }
@@ -149,8 +156,9 @@
     /// respectively non-shared parts of the key.
     /// Only self.key is mutated.
     fn assemble_key(&mut self, off: usize, shared: usize, non_shared: usize) {
-        self.key.resize(shared, 0);
-        self.key.extend_from_slice(&self.block[off..off + non_shared]);
+        self.key.truncate(shared);
+        self.key
+            .extend_from_slice(&self.block[off..off + non_shared]);
     }
 
     pub fn seek_to_last(&mut self) {
@@ -161,40 +169,41 @@
             self.reset();
         }
 
-        while let Some((_, _)) = self.next() {
+        // Stop at last entry, before the iterator becomes invalid.
+        //
+        // We're checking the position before calling advance; if a restart point points to the
+        // last entry, calling advance() will directly reset the iterator.
+        while self.offset < self.restarts_off {
+            self.advance();
         }
+        assert!(self.valid());
     }
 }
 
-impl Iterator for BlockIter {
-    type Item = (Vec<u8>, Vec<u8>);
-
-    fn next(&mut self) -> Option<Self::Item> {
+impl SSIterator for BlockIter {
+    fn advance(&mut self) -> bool {
         if self.offset >= self.restarts_off {
-            self.offset = self.restarts_off;
-            // current_entry_offset is left at the offset of the last entry
-            return None;
+            self.reset();
+            return false;
         } else {
             self.current_entry_offset = self.offset;
         }
 
         let current_off = self.current_entry_offset;
 
-        let (shared, non_shared, valsize, entry_head_len) = self.parse_entry_and_advance();
+        let (shared, non_shared, _valsize, entry_head_len) = self.parse_entry_and_advance();
         self.assemble_key(current_off + entry_head_len, shared, non_shared);
 
         // Adjust current_restart_ix
         let num_restarts = self.number_restarts();
-        while self.current_restart_ix + 1 < num_restarts &&
-              self.get_restart_point(self.current_restart_ix + 1) < self.current_entry_offset {
+        while self.current_restart_ix + 1 < num_restarts
+            && self.get_restart_point(self.current_restart_ix + 1) < self.current_entry_offset
+        {
             self.current_restart_ix += 1;
         }
-
-        Some((self.key.clone(), Vec::from(&self.block[self.val_offset..self.val_offset + valsize])))
+        true
     }
-}
 
-impl SSIterator for BlockIter {
     fn reset(&mut self) {
         self.offset = 0;
         self.val_offset = 0;
@@ -202,14 +211,14 @@
         self.key.clear();
     }
 
-    fn prev(&mut self) -> Option<Self::Item> {
+    fn prev(&mut self) -> bool {
         // as in the original implementation -- seek to last restart point, then look for key
         let orig_offset = self.current_entry_offset;
 
         // At the beginning, can't go further back
         if orig_offset == 0 {
             self.reset();
-            return None;
+            return false;
         }
 
         while self.get_restart_point(self.current_restart_ix) >= orig_offset {
@@ -230,8 +239,7 @@
         // Stop if the next entry would be the original one (self.offset always points to the start
         // of the next entry)
         loop {
-            result = self.next();
-
+            result = self.advance();
             if self.offset >= orig_offset {
                 break;
             }
@@ -276,159 +284,62 @@
     }
 
     fn valid(&self) -> bool {
-        !self.key.is_empty() && self.val_offset > 0 && self.val_offset < self.restarts_off
+        !self.key.is_empty() && self.val_offset > 0 && self.val_offset <= self.restarts_off
     }
 
-    fn current(&self) -> Option<Self::Item> {
+    fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool {
         if self.valid() {
-            Some((self.key.clone(), Vec::from(&self.block[self.val_offset..self.offset])))
+            key.clear();
+            val.clear();
+            key.extend_from_slice(&self.key);
+            val.extend_from_slice(&self.block[self.val_offset..self.offset]);
+            true
         } else {
-            None
+            false
         }
     }
 }
 
-pub struct BlockBuilder {
-    opt: Options,
-    buffer: Vec<u8>,
-    restarts: Vec<u32>,
-
-    last_key: Vec<u8>,
-    counter: usize,
-}
-
-impl BlockBuilder {
-    pub fn new(o: Options) -> BlockBuilder {
-        let mut restarts = vec![0];
-        restarts.reserve(1023);
-
-        BlockBuilder {
-            buffer: Vec::with_capacity(o.block_size),
-            opt: o,
-            restarts: restarts,
-            last_key: Vec::new(),
-            counter: 0,
-        }
-    }
-
-    pub fn entries(&self) -> usize {
-        self.counter
-    }
-
-    pub fn last_key<'a>(&'a self) -> &'a [u8] {
-        &self.last_key
-    }
-
-    pub fn size_estimate(&self) -> usize {
-        self.buffer.len() + self.restarts.len() * 4 + 4
-    }
-
-    #[allow(dead_code)]
-    pub fn reset(&mut self) {
-        self.buffer.clear();
-        self.restarts.clear();
-        self.last_key.clear();
-        self.counter = 0;
-    }
-
-    pub fn add(&mut self, key: &[u8], val: &[u8]) {
-        assert!(self.counter <= self.opt.block_restart_interval);
-        assert!(self.buffer.is_empty() ||
-                self.opt.cmp.cmp(self.last_key.as_slice(), key) == Ordering::Less);
-
-        let mut shared = 0;
-
-        if self.counter < self.opt.block_restart_interval {
-            let smallest = if self.last_key.len() < key.len() {
-                self.last_key.len()
-            } else {
-                key.len()
-            };
-
-            while shared < smallest && self.last_key[shared] == key[shared] {
-                shared += 1;
-            }
-        } else {
-            self.restarts.push(self.buffer.len() as u32);
-            self.last_key.resize(0, 0);
-            self.counter = 0;
-        }
-
-        let non_shared = key.len() - shared;
-
-        let mut buf = [0 as u8; 4];
-
-        let mut sz = shared.encode_var(&mut buf[..]);
-        self.buffer.extend_from_slice(&buf[0..sz]);
-        sz = non_shared.encode_var(&mut buf[..]);
-        self.buffer.extend_from_slice(&buf[0..sz]);
-        sz = val.len().encode_var(&mut buf[..]);
-        self.buffer.extend_from_slice(&buf[0..sz]);
-
-        self.buffer.extend_from_slice(&key[shared..]);
-        self.buffer.extend_from_slice(val);
-
-        // Update key
-        self.last_key.resize(shared, 0);
-        self.last_key.extend_from_slice(&key[shared..]);
-
-        self.counter += 1;
-    }
-
-    pub fn finish(mut self) -> BlockContents {
-        // 1. Append RESTARTS
-        let mut i = self.buffer.len();
-        self.buffer.resize(i + self.restarts.len() * 4 + 4, 0);
-
-        for r in self.restarts.iter() {
-            r.encode_fixed(&mut self.buffer[i..i + 4]);
-            i += 4;
-        }
-
-        // 2. Append N_RESTARTS
-        (self.restarts.len() as u32).encode_fixed(&mut self.buffer[i..i + 4]);
-
-        // done
-        self.buffer
-    }
-}
-
-
 #[cfg(test)]
 mod tests {
     use super::*;
-    use iterator::SSIterator;
-    use options::*;
+    use block_builder::BlockBuilder;
+    use options;
+    use test_util::{test_iterator_properties, SSIteratorIter};
+    use types::{current_key_val, SSIterator};
 
     fn get_data() -> Vec<(&'static [u8], &'static [u8])> {
-        vec![("key1".as_bytes(), "value1".as_bytes()),
-             ("loooooooooooooooooooooooooooooooooongerkey1".as_bytes(), "shrtvl1".as_bytes()),
-             ("medium length key 1".as_bytes(), "some value 2".as_bytes()),
-             ("prefix_key1".as_bytes(), "value".as_bytes()),
-             ("prefix_key2".as_bytes(), "value".as_bytes()),
-             ("prefix_key3".as_bytes(), "value".as_bytes())]
+        vec![
+            ("key1".as_bytes(), "value1".as_bytes()),
+            (
+                "loooooooooooooooooooooooooooooooooongerkey1".as_bytes(),
+                "shrtvl1".as_bytes(),
+            ),
+            ("medium length key 1".as_bytes(), "some value 2".as_bytes()),
+            ("prefix_key1".as_bytes(), "value".as_bytes()),
+            ("prefix_key2".as_bytes(), "value".as_bytes()),
+            ("prefix_key3".as_bytes(), "value".as_bytes()),
+        ]
     }
 
     #[test]
-    fn test_block_builder() {
-        let mut o = Options::default();
-        o.block_restart_interval = 3;
-
-        let mut builder = BlockBuilder::new(o);
+    fn test_block_iterator_properties() {
+        let o = options::for_test();
+        let mut builder = BlockBuilder::new(o.clone());
+        let mut data = get_data();
+        data.truncate(4);
+        for &(k, v) in data.iter() {
+            builder.add(k, v);
+        }
+        let block_contents = builder.finish();
 
-        for &(k, v) in get_data().iter() {
-            builder.add(k, v);
-            assert!(builder.counter <= 3);
-            assert_eq!(builder.last_key(), k);
-        }
-
-        let block = builder.finish();
-        assert_eq!(block.len(), 149);
+        let block = Block::new(o.clone(), block_contents).iter();
+        test_iterator_properties(block);
     }
 
     #[test]
     fn test_block_empty() {
-        let mut o = Options::default();
+        let mut o = options::for_test();
         o.block_restart_interval = 16;
         let builder = BlockBuilder::new(o);
 
@@ -436,9 +347,9 @@
         assert_eq!(blockc.len(), 8);
         assert_eq!(blockc, vec![0, 0, 0, 0, 1, 0, 0, 0]);
 
-        let block = Block::new(Options::default(), blockc);
+        let block = Block::new(options::for_test(), blockc);
 
-        for _ in block.iter() {
+        for _ in SSIteratorIter::wrap(&mut block.iter()) {
             panic!("expected 0 iterations");
         }
     }
@@ -446,19 +357,19 @@
     #[test]
     fn test_block_build_iterate() {
         let data = get_data();
-        let mut builder = BlockBuilder::new(Options::default());
+        let mut builder = BlockBuilder::new(options::for_test());
 
         for &(k, v) in data.iter() {
             builder.add(k, v);
         }
 
         let block_contents = builder.finish();
-        let block = Block::new(Options::default(), block_contents).iter();
+        let mut block = Block::new(options::for_test(), block_contents).iter();
         let mut i = 0;
 
         assert!(!block.valid());
 
-        for (k, v) in block {
+        for (k, v) in SSIteratorIter::wrap(&mut block) {
             assert_eq!(&k[..], data[i].0);
             assert_eq!(v, data[i].1);
             i += 1;
@@ -468,7 +379,7 @@
 
     #[test]
     fn test_block_iterate_reverse() {
-        let mut o = Options::default();
+        let mut o = options::for_test();
         o.block_restart_interval = 3;
         let data = get_data();
         let mut builder = BlockBuilder::new(o.clone());
@@ -481,33 +392,40 @@
         let mut block = Block::new(o.clone(), block_contents).iter();
 
         assert!(!block.valid());
-        assert_eq!(block.next(),
-                   Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())));
+        assert_eq!(
+            block.next(),
+            Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec()))
+        );
         assert!(block.valid());
         block.next();
         assert!(block.valid());
         block.prev();
         assert!(block.valid());
-        assert_eq!(block.current(),
-                   Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())));
+        assert_eq!(
+            current_key_val(&block),
+            Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec()))
+        );
         block.prev();
         assert!(!block.valid());
 
         // Verify that prev() from the last entry goes to the prev-to-last entry
         // (essentially, that next() returning None doesn't advance anything)
-
-        while let Some(_) = block.next() {
-        }
+        while let Some(_) = block.next() {}
 
         block.prev();
         assert!(block.valid());
-        assert_eq!(block.current(),
-                   Some(("prefix_key2".as_bytes().to_vec(), "value".as_bytes().to_vec())));
+        assert_eq!(
+            current_key_val(&block),
+            Some((
+                "prefix_key2".as_bytes().to_vec(),
+                "value".as_bytes().to_vec()
+            ))
+        );
     }
 
     #[test]
     fn test_block_seek() {
-        let mut o = Options::default();
+        let mut o = options::for_test();
         o.block_restart_interval = 3;
 
         let data = get_data();
@@ -523,33 +441,49 @@
 
         block.seek(&"prefix_key2".as_bytes());
         assert!(block.valid());
-        assert_eq!(block.current(),
-                   Some(("prefix_key2".as_bytes().to_vec(), "value".as_bytes().to_vec())));
+        assert_eq!(
+            current_key_val(&block),
+            Some((
+                "prefix_key2".as_bytes().to_vec(),
+                "value".as_bytes().to_vec()
+            ))
+        );
 
         block.seek(&"prefix_key0".as_bytes());
         assert!(block.valid());
-        assert_eq!(block.current(),
-                   Some(("prefix_key1".as_bytes().to_vec(), "value".as_bytes().to_vec())));
+        assert_eq!(
+            current_key_val(&block),
+            Some((
+                "prefix_key1".as_bytes().to_vec(),
+                "value".as_bytes().to_vec()
+            ))
+        );
 
         block.seek(&"key1".as_bytes());
         assert!(block.valid());
-        assert_eq!(block.current(),
-                   Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())));
+        assert_eq!(
+            current_key_val(&block),
+            Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec()))
+        );
 
         block.seek(&"prefix_key3".as_bytes());
         assert!(block.valid());
-        assert_eq!(block.current(),
-                   Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec())));
+        assert_eq!(
+            current_key_val(&block),
+            Some((
+                "prefix_key3".as_bytes().to_vec(),
+                "value".as_bytes().to_vec()
+            ))
+        );
 
         block.seek(&"prefix_key8".as_bytes());
-        assert!(block.valid());
-        assert_eq!(block.current(),
-                   Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec())));
+        assert!(!block.valid());
+        assert_eq!(current_key_val(&block), None);
     }
 
     #[test]
     fn test_block_seek_to_last() {
-        let mut o = Options::default();
+        let mut o = options::for_test();
 
         // Test with different number of restarts
         for block_restart_interval in vec![2, 6, 10] {
@@ -568,8 +502,13 @@
 
             block.seek_to_last();
             assert!(block.valid());
-            assert_eq!(block.current(),
-                       Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec())));
+            assert_eq!(
+                current_key_val(&block),
+                Some((
+                    "prefix_key3".as_bytes().to_vec(),
+                    "value".as_bytes().to_vec()
+                ))
+            );
         }
     }
 }
--- a/src/blockhandle.rs	Sun Mar 11 11:29:39 2018 +0100
+++ b/src/blockhandle.rs	Sun Mar 11 16:00:32 2018 +0100
@@ -4,7 +4,7 @@
 /// used typically as file-internal pointer in table (SSTable) files. For example, the index block
 /// in an SSTable is a block of (key = largest key in block) -> (value = encoded blockhandle of
 /// block).
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct BlockHandle {
     offset: usize,
     size: usize,
@@ -17,11 +17,13 @@
         let (off, offsize) = usize::decode_var(from);
         let (sz, szsize) = usize::decode_var(&from[offsize..]);
 
-        (BlockHandle {
-            offset: off,
-            size: sz,
-        },
-         offsize + szsize)
+        (
+            BlockHandle {
+                offset: off,
+                size: sz,
+            },
+            offsize + szsize,
+        )
     }
 
     pub fn new(offset: usize, size: usize) -> BlockHandle {
--- a/src/filter.rs	Sun Mar 11 11:29:39 2018 +0100
+++ b/src/filter.rs	Sun Mar 11 16:00:32 2018 +0100
@@ -1,9 +1,4 @@
-#![allow(dead_code)]
-
-//! This module is yet to be used, but will speed up table lookups.
-//!
-
-use std::sync::Arc;
+use std::rc::Rc;
 
 use integer_encoding::FixedInt;
 
@@ -13,23 +8,36 @@
 pub trait FilterPolicy {
     /// Returns a string identifying this policy.
     fn name(&self) -> &'static str;
-    /// Create a filter matching the given keys.
-    fn create_filter(&self, keys: &Vec<&[u8]>) -> Vec<u8>;
+    /// Create a filter matching the given keys. Keys are given as a long byte array that is
+    /// indexed by the offsets contained in key_offsets.
+    fn create_filter(&self, keys: &[u8], key_offsets: &[usize]) -> Vec<u8>;
     /// Check whether the given key may match the filter.
     fn key_may_match(&self, key: &[u8], filter: &[u8]) -> bool;
 }
 
 /// A boxed and refcounted filter policy (reference-counted because a Box with unsized content
 /// couldn't be cloned otherwise)
-pub type BoxedFilterPolicy = Arc<Box<FilterPolicy>>;
+pub type BoxedFilterPolicy = Rc<Box<FilterPolicy>>;
+
+impl FilterPolicy for BoxedFilterPolicy {
+    fn name(&self) -> &'static str {
+        (**self).name()
+    }
+    fn create_filter(&self, keys: &[u8], key_offsets: &[usize]) -> Vec<u8> {
+        (**self).create_filter(keys, key_offsets)
+    }
+    fn key_may_match(&self, key: &[u8], filter: &[u8]) -> bool {
+        (**self).key_may_match(key, filter)
+    }
+}
 
 /// Used for tables that don't have filter blocks but need a type parameter.
 #[derive(Clone)]
 pub struct NoFilterPolicy;
 
 impl NoFilterPolicy {
-    pub fn new() -> BoxedFilterPolicy {
-        Arc::new(Box::new(NoFilterPolicy))
+    pub fn new() -> NoFilterPolicy {
+        NoFilterPolicy
     }
 }
 
@@ -37,7 +45,7 @@
     fn name(&self) -> &'static str {
         "_"
     }
-    fn create_filter(&self, _: &Vec<&[u8]>) -> Vec<u8> {
+    fn create_filter(&self, _: &[u8], _: &[usize]) -> Vec<u8> {
         vec![]
     }
     fn key_may_match(&self, _: &[u8], _: &[u8]) -> bool {
@@ -57,8 +65,8 @@
 /// Beware the magic numbers...
 impl BloomPolicy {
     /// Returns a new boxed BloomPolicy.
-    pub fn new(bits_per_key: u32) -> BoxedFilterPolicy {
-        Arc::new(Box::new(BloomPolicy::new_unwrapped(bits_per_key)))
+    pub fn new(bits_per_key: u32) -> BloomPolicy {
+        BloomPolicy::new_unwrapped(bits_per_key)
     }
 
     /// Returns a new BloomPolicy with the given parameter.
@@ -102,7 +110,7 @@
             let mut i = 0;
 
             for b in data[ix..].iter() {
-                h += (*b as u32) << (8 * i);
+                h = h.overflowing_add((*b as u32) << (8 * i)).0;
                 i += 1;
             }
 
@@ -117,17 +125,15 @@
     fn name(&self) -> &'static str {
         "leveldb.BuiltinBloomFilter2"
     }
-    fn create_filter(&self, keys: &Vec<&[u8]>) -> Vec<u8> {
-        let filter_bits = keys.len() * self.bits_per_key as usize;
-        let mut filter = Vec::new();
+    fn create_filter(&self, keys: &[u8], key_offsets: &[usize]) -> Vec<u8> {
+        let filter_bits = key_offsets.len() * self.bits_per_key as usize;
+        let mut filter: Vec<u8>;
 
         if filter_bits < 64 {
-            // Preallocate, then resize
-            filter.reserve(8 + 1);
-            filter.resize(8, 0 as u8);
+            filter = Vec::with_capacity(8 + 1);
+            filter.resize(8, 0);
         } else {
-            // Preallocate, then resize
-            filter.reserve(1 + ((filter_bits + 7) / 8));
+            filter = Vec::with_capacity(1 + ((filter_bits + 7) / 8));
             filter.resize((filter_bits + 7) / 8, 0);
         }
 
@@ -136,16 +142,16 @@
         // Encode k at the end of the filter.
         filter.push(self.k as u8);
 
-        for key in keys {
+        // Add all keys to the filter.
+        offset_data_iterate(keys, key_offsets, |key| {
             let mut h = self.bloom_hash(key);
             let delta = (h >> 17) | (h << 15);
-
             for _ in 0..self.k {
                 let bitpos = (h % adj_filter_bits) as usize;
                 filter[bitpos / 8] |= 1 << (bitpos % 8);
                 h = (h as u64 + delta as u64) as u32;
             }
-        }
+        });
 
         filter
     }
@@ -180,30 +186,30 @@
 /// A User Key is u8*.
 /// An Internal Key is u8* u8{8} (where the second part encodes a tag and a sequence number).
 #[derive(Clone)]
-pub struct InternalFilterPolicy {
-    internal: BoxedFilterPolicy,
+pub struct InternalFilterPolicy<FP: FilterPolicy> {
+    internal: FP,
 }
 
-impl InternalFilterPolicy {
-    pub fn new(inner: BoxedFilterPolicy) -> BoxedFilterPolicy {
-        Arc::new(Box::new(InternalFilterPolicy { internal: inner }))
+impl<FP: FilterPolicy> InternalFilterPolicy<FP> {
+    pub fn new(inner: FP) -> InternalFilterPolicy<FP> {
+        InternalFilterPolicy { internal: inner }
     }
 }
 
-impl FilterPolicy for InternalFilterPolicy {
+impl<FP: FilterPolicy> FilterPolicy for InternalFilterPolicy<FP> {
     fn name(&self) -> &'static str {
         self.internal.name()
     }
 
-    fn create_filter(&self, keys: &Vec<&[u8]>) -> Vec<u8> {
-        let mut mod_keys = keys.clone();
-        let mut i = 0;
+    fn create_filter(&self, keys: &[u8], key_offsets: &[usize]) -> Vec<u8> {
+        let mut mod_keys = Vec::with_capacity(keys.len() - key_offsets.len() * 8);
+        let mut mod_key_offsets = Vec::with_capacity(key_offsets.len());
 
-        for key in keys {
-            mod_keys[i] = &key[0..key.len() - 8];
-            i += 1;
-        }
-        self.internal.create_filter(&mod_keys)
+        offset_data_iterate(keys, key_offsets, |key| {
+            mod_key_offsets.push(mod_keys.len());
+            mod_keys.extend_from_slice(&key[0..key.len() - 8]);
+        });
+        self.internal.create_filter(&mod_keys, &mod_key_offsets)
     }
 
     fn key_may_match(&self, key: &[u8], filter: &[u8]) -> bool {
@@ -211,6 +217,19 @@
     }
 }
 
+/// offset_data_iterate iterates over the entries in data that are indexed by the offsets given in
+/// offsets. This is e.g. the internal format of a FilterBlock.
+fn offset_data_iterate<F: FnMut(&[u8])>(data: &[u8], offsets: &[usize], mut f: F) {
+    for offix in 0..offsets.len() {
+        let upper = if offix == offsets.len() - 1 {
+            data.len()
+        } else {
+            offsets[offix + 1]
+        };
+        let piece = &data[offsets[offix]..upper];
+        f(piece);
+    }
+}
 
 #[cfg(test)]
 mod tests {
@@ -218,18 +237,28 @@
 
     const _BITS_PER_KEY: u32 = 12;
 
-    fn input_data() -> Vec<&'static [u8]> {
-        let data = vec!["abc123def456".as_bytes(),
-                        "xxx111xxx222".as_bytes(),
-                        "ab00cd00ab".as_bytes(),
-                        "908070605040302010".as_bytes()];
-        data
+    fn input_data() -> (Vec<u8>, Vec<usize>) {
+        let mut concat = vec![];
+        let mut offs = vec![];
+
+        for d in [
+            "abc123def456".as_bytes(),
+            "xxx111xxx222".as_bytes(),
+            "ab00cd00ab".as_bytes(),
+            "908070605040302010".as_bytes(),
+        ].iter()
+        {
+            offs.push(concat.len());
+            concat.extend_from_slice(d);
+        }
+        (concat, offs)
     }
 
     /// Creates a filter using the keys from input_data().
     fn create_filter() -> Vec<u8> {
         let fpol = BloomPolicy::new(_BITS_PER_KEY);
-        let filter = fpol.create_filter(&input_data());
+        let (data, offs) = input_data();
+        let filter = fpol.create_filter(&data, &offs);
 
         assert_eq!(filter, vec![194, 148, 129, 140, 192, 196, 132, 164, 8]);
         filter
@@ -239,10 +268,11 @@
     fn test_filter_bloom() {
         let f = create_filter();
         let fp = BloomPolicy::new(_BITS_PER_KEY);
+        let (data, offs) = input_data();
 
-        for k in input_data().iter() {
-            assert!(fp.key_may_match(k, &f));
-        }
+        offset_data_iterate(&data, &offs, |key| {
+            assert!(fp.key_may_match(key, &f));
+        });
     }
 
     #[test]
--- a/src/filter_block.rs	Sun Mar 11 11:29:39 2018 +0100
+++ b/src/filter_block.rs	Sun Mar 11 16:00:32 2018 +0100
@@ -1,7 +1,3 @@
-#![allow(dead_code)]
-
-//! parts of this module are not yet used (the reading part).
-
 use block::BlockContents;
 use filter::BoxedFilterPolicy;
 
@@ -10,8 +6,6 @@
 use integer_encoding::FixedInt;
 
 const FILTER_BASE_LOG2: u32 = 11;
-
-#[allow(dead_code)]
 const FILTER_BASE: u32 = 1 << FILTER_BASE_LOG2; // 2 KiB
 
 /// For a given byte offset, returns the index of the filter that includes the key at that offset.
@@ -28,33 +22,42 @@
 ///
 /// where offsets are 4 bytes, offset of offsets is 4 bytes, and log2 of FILTER_BASE is 1 byte.
 /// Two consecutive filter offsets may be the same.
-pub struct FilterBlockBuilder<'a> {
+///
+/// TODO: See if we can remove the lifetime parameter.
+pub struct FilterBlockBuilder {
     policy: BoxedFilterPolicy,
     // filters, concatenated
     filters: Vec<u8>,
     filter_offsets: Vec<usize>,
 
     // Reset on every start_block()
-    keys: Vec<&'a [u8]>,
+    key_offsets: Vec<usize>,
+    keys: Vec<u8>,
 }
 
-impl<'a> FilterBlockBuilder<'a> {
-    pub fn new(fp: BoxedFilterPolicy) -> FilterBlockBuilder<'a> {
+impl FilterBlockBuilder {
+    pub fn new(fp: BoxedFilterPolicy) -> FilterBlockBuilder {
         FilterBlockBuilder {
             policy: fp,
             // some pre-allocation
-            filters: Vec::with_capacity(1024 * 40),
+            filters: Vec::with_capacity(1024),
             filter_offsets: Vec::with_capacity(1024),
+            key_offsets: Vec::with_capacity(1024),
             keys: Vec::with_capacity(1024),
         }
     }
 
+    pub fn size_estimate(&self) -> usize {
+        self.filters.len() + 4 * self.filter_offsets.len() + 4 + 1
+    }
+
     pub fn filter_name(&self) -> &'static str {
         self.policy.name()
     }
 
-    pub fn add_key(&mut self, key: &'a [u8]) {
-        self.keys.push(key);
+    pub fn add_key(&mut self, key: &[u8]) {
+        self.key_offsets.push(self.keys.len());
+        self.keys.extend_from_slice(key);
     }
 
     pub fn start_block(&mut self, offset: usize) {
@@ -68,15 +71,15 @@
 
     fn generate_filter(&mut self) {
         self.filter_offsets.push(self.filters.len());
-
         if self.keys.is_empty() {
             return;
         }
 
-        let filter = self.policy.create_filter(&self.keys);
+        let filter = self.policy.create_filter(&self.keys, &self.key_offsets);
         self.filters.extend_from_slice(&filter);
 
         self.keys.clear();
+        self.key_offsets.clear();
     }
 
     pub fn finish(mut self) -> Vec<u8> {
@@ -87,7 +90,6 @@
         let mut result = self.filters;
         let offsets_offset = result.len();
         let mut ix = result.len();
-
         result.resize(ix + 4 * self.filter_offsets.len() + 5, 0);
 
         // Put filter offsets at the end
@@ -98,7 +100,6 @@
 
         (offsets_offset as u32).encode_fixed(&mut result[ix..ix + 4]);
         ix += 4;
-
         result[ix] = FILTER_BASE_LOG2 as u8;
 
         result
@@ -157,7 +158,8 @@
         assert!(filter_begin < filter_end);
         assert!(filter_end <= self.offsets_offset);
 
-        self.policy.key_may_match(key, &self.block[filter_begin..filter_end])
+        self.policy
+            .key_may_match(key, &self.block[filter_begin..filter_end])
     }
 }
 
@@ -175,12 +177,17 @@
     }
 
     fn get_keys() -> Vec<&'static [u8]> {
-        vec!["abcd".as_bytes(), "efgh".as_bytes(), "ijkl".as_bytes(), "mnopqrstuvwxyz".as_bytes()]
+        vec![
+            "abcd".as_bytes(),
+            "efgh".as_bytes(),
+            "ijkl".as_bytes(),
+            "mnopqrstuvwxyz".as_bytes(),
+        ]
     }
 
     fn produce_filter_block() -> Vec<u8> {
         let keys = get_keys();
-        let mut bld = FilterBlockBuilder::new(BloomPolicy::new(32));
+        let mut bld = FilterBlockBuilder::new(Rc::new(Box::new(BloomPolicy::new(32))));
 
         bld.start_block(0);
 
@@ -204,26 +211,38 @@
         // 2 blocks of 4 filters of 4 bytes plus 1B for `k`; plus three filter offsets (because of
         //   the block offsets of 0 and 5000); plus footer
         assert_eq!(result.len(), 2 * (get_keys().len() * 4 + 1) + (3 * 4) + 5);
-        assert_eq!(result,
-                   vec![234, 195, 25, 155, 61, 141, 173, 140, 221, 28, 222, 92, 220, 112, 234,
-                        227, 22, 234, 195, 25, 155, 61, 141, 173, 140, 221, 28, 222, 92, 220,
-                        112, 234, 227, 22, 0, 0, 0, 0, 17, 0, 0, 0, 17, 0, 0, 0, 34, 0, 0, 0, 11]);
+        assert_eq!(
+            result,
+            vec![
+                234, 195, 25, 155, 61, 141, 173, 140, 221, 28, 222, 92, 220, 112, 234, 227, 22,
+                234, 195, 25, 155, 61, 141, 173, 140, 221, 28, 222, 92, 220, 112, 234, 227, 22, 0,
+                0, 0, 0, 17, 0, 0, 0, 17, 0, 0, 0, 34, 0, 0, 0, 11,
+            ]
+        );
     }
 
     #[test]
     fn test_filter_block_build_read() {
         let result = produce_filter_block();
-        let reader = FilterBlockReader::new_owned(BloomPolicy::new(32), result);
+        let reader = FilterBlockReader::new_owned(Rc::new(Box::new(BloomPolicy::new(32))), result);
 
-        assert_eq!(reader.offset_of(get_filter_index(5121, FILTER_BASE_LOG2)),
-                   17); // third block in third filter
+        assert_eq!(
+            reader.offset_of(get_filter_index(5121, FILTER_BASE_LOG2)),
+            17
+        ); // third block in third filter
 
-        let unknown_keys = vec!["xsb".as_bytes(), "9sad".as_bytes(), "assssaaaass".as_bytes()];
+        let unknown_keys = vec![
+            "xsb".as_bytes(),
+            "9sad".as_bytes(),
+            "assssaaaass".as_bytes(),
+        ];
 
         for block_offset in vec![0, 1024, 5000, 6025].into_iter() {
             for key in get_keys().iter() {
-                assert!(reader.key_may_match(block_offset, key),
-                        format!("{} {:?} ", block_offset, key));
+                assert!(
+                    reader.key_may_match(block_offset, key),
+                    format!("{} {:?} ", block_offset, key)
+                );
             }
             for key in unknown_keys.iter() {
                 assert!(!reader.key_may_match(block_offset, key));
--- a/src/lib.rs	Sun Mar 11 11:29:39 2018 +0100
+++ b/src/lib.rs	Sun Mar 11 16:00:32 2018 +0100
@@ -1,10 +1,19 @@
 extern crate crc;
 extern crate integer_encoding;
+extern crate snap;
+#[cfg(test)]
+#[macro_use]
+extern crate time_test;
 
 mod block;
+mod block_builder;
 mod blockhandle;
+mod cache;
+pub mod error;
 pub mod filter;
 mod filter_block;
+mod table_block;
+mod test_util;
 mod types;
 
 pub mod cmp;
@@ -16,7 +25,7 @@
 pub use cmp::Cmp;
 pub use iterator::StandardComparator;
 pub use iterator::SSIterator;
-pub use options::{Options, ReadOptions};
+pub use options::Options;
 
 pub use table_builder::TableBuilder;
 pub use table_reader::{Table, TableIterator};
--- a/src/options.rs	Sun Mar 11 11:29:39 2018 +0100
+++ b/src/options.rs	Sun Mar 11 16:00:32 2018 +0100
@@ -1,14 +1,19 @@
+use block::Block;
+use cache::Cache;
 use cmp::{Cmp, DefaultCmp};
-use types::SequenceNumber;
+use filter;
+use types::{share, Shared};
 
 use std::default::Default;
-use std::sync::Arc;
+use std::rc::Rc;
 
 const KB: usize = 1 << 10;
 const MB: usize = KB * KB;
 
 const BLOCK_MAX_SIZE: usize = 4 * KB;
+const BLOCK_CACHE_CAPACITY: usize = 8 * MB;
 const WRITE_BUFFER_SIZE: usize = 4 * MB;
+const DEFAULT_BITS_PER_KEY: u32 = 10; // NOTE: This may need to be optimized.
 
 #[derive(Clone, Copy, PartialEq, Debug)]
 pub enum CompressionType {
@@ -24,62 +29,36 @@
     }
 }
 
-/// [not all member types implemented yet]
+/// Options contains general parameters for a LevelDB instance. Most of the names are
+/// self-explanatory; the defaults are defined in the `Default` implementation.
 ///
+/// Note: Compression is not yet implemented.
 #[derive(Clone)]
 pub struct Options {
-    pub cmp: Arc<Box<Cmp>>,
-    pub create_if_missing: bool,
-    pub error_if_exists: bool,
-    pub paranoid_checks: bool,
-    // pub logger: Logger,
+    pub cmp: Rc<Box<Cmp>>,
     pub write_buffer_size: usize,
-    pub max_open_files: usize,
+    pub block_cache: Shared<Cache<Block>>,
     pub block_size: usize,
     pub block_restart_interval: usize,
     pub compression_type: CompressionType,
-    pub reuse_logs: bool,
+    pub filter_policy: filter::BoxedFilterPolicy,
 }
 
 impl Default for Options {
     fn default() -> Options {
         Options {
-            cmp: Arc::new(Box::new(DefaultCmp)),
-            create_if_missing: true,
-            error_if_exists: false,
-            paranoid_checks: false,
+            cmp: Rc::new(Box::new(DefaultCmp)),
             write_buffer_size: WRITE_BUFFER_SIZE,
-            max_open_files: 1 << 10,
+            // 2000 elements by default
+            block_cache: share(Cache::new(BLOCK_CACHE_CAPACITY / BLOCK_MAX_SIZE)),
             block_size: BLOCK_MAX_SIZE,
             block_restart_interval: 16,
-            reuse_logs: false,
             compression_type: CompressionType::CompressionNone,
+            filter_policy: Rc::new(Box::new(filter::BloomPolicy::new(DEFAULT_BITS_PER_KEY))),
         }
     }
 }
 
-/// Supplied to DB read operations.
-pub struct ReadOptions {
-    pub verify_checksums: bool,
-    pub snapshot: Option<SequenceNumber>,
+pub fn for_test() -> Options {
+    Options::default()
 }
-
-impl Default for ReadOptions {
-    fn default() -> Self {
-        ReadOptions {
-            verify_checksums: false,
-            snapshot: None,
-        }
-    }
-}
-
-/// Supplied to write operations
-pub struct WriteOptions {
-    pub sync: bool,
-}
-
-impl Default for WriteOptions {
-    fn default() -> Self {
-        WriteOptions { sync: false }
-    }
-}
--- a/src/table_builder.rs	Sun Mar 11 11:29:39 2018 +0100
+++ b/src/table_builder.rs	Sun Mar 11 16:00:32 2018 +0100
@@ -1,15 +1,20 @@
-use block::{BlockBuilder, BlockContents};
+use block::BlockContents;
+use block_builder::BlockBuilder;
 use blockhandle::BlockHandle;
-use filter::{BoxedFilterPolicy, NoFilterPolicy};
+use error::Result;
+use filter::NoFilterPolicy;
 use filter_block::FilterBlockBuilder;
 use options::{CompressionType, Options};
+use types::{mask_crc, unmask_crc};
 
+use std::cmp::Ordering;
 use std::io::Write;
-use std::cmp::Ordering;
+use std::rc::Rc;
 
 use crc::crc32;
 use crc::Hasher32;
-use integer_encoding::FixedInt;
+use integer_encoding::FixedIntWriter;
+use snap::Encoder;
 
 pub const FOOTER_LENGTH: usize = 40;
 pub const FULL_FOOTER_LENGTH: usize = FOOTER_LENGTH + 8;
@@ -20,7 +25,7 @@
 pub const TABLE_BLOCK_CKSUM_LEN: usize = 4;
 
 /// Footer is a helper for encoding/decoding a table footer.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Footer {
     pub meta_index: BlockHandle,
     pub index: BlockHandle,
@@ -70,11 +75,11 @@
 /// DATA BLOCKs, META BLOCKs, INDEX BLOCK and METAINDEX BLOCK are built using the code in
 /// the `block` module.
 ///
-/// The FOOTER consists of a BlockHandle wthat points to the metaindex block, another pointing to
+/// The FOOTER consists of a BlockHandle that points to the metaindex block, another pointing to
 /// the index block, padding to fill up to 40 B and at the end the 8B magic number
 /// 0xdb4775248b80fb57.
 
-pub struct TableBuilder<'a, Dst: Write> {
+pub struct TableBuilder<Dst: Write> {
     opt: Options,
     dst: Dst,
 
@@ -84,21 +89,21 @@
 
     data_block: Option<BlockBuilder>,
     index_block: Option<BlockBuilder>,
-    filter_block: Option<FilterBlockBuilder<'a>>,
+    filter_block: Option<FilterBlockBuilder>,
 }
 
-impl<'a, Dst: Write> TableBuilder<'a, Dst> {
-    pub fn new_no_filter(opt: Options, dst: Dst) -> TableBuilder<'a, Dst> {
-        TableBuilder::new(opt, dst, NoFilterPolicy::new())
+impl<Dst: Write> TableBuilder<Dst> {
+    pub fn new_no_filter(mut opt: Options, dst: Dst) -> TableBuilder<Dst> {
+        opt.filter_policy = Rc::new(Box::new(NoFilterPolicy::new()));
+        TableBuilder::new(opt, dst)
     }
 }
 
 /// TableBuilder is used for building a new SSTable. It groups entries into blocks,
 /// calculating checksums and bloom filters.
-impl<'a, Dst: Write> TableBuilder<'a, Dst> {
-    /// Create a new TableBuilder. Currently the best choice for `fpol` is `NoFilterPolicy` (mod
-    /// filter; or use new_no_filter())
-    pub fn new(opt: Options, dst: Dst, fpol: BoxedFilterPolicy) -> TableBuilder<'a, Dst> {
+impl<Dst: Write> TableBuilder<Dst> {
+    /// Create a new table builder.
+    pub fn new(opt: Options, dst: Dst) -> TableBuilder<Dst> {
         TableBuilder {
             opt: opt.clone(),
             dst: dst,
@@ -106,8 +111,8 @@
             prev_block_last_key: vec![],
             num_entries: 0,
             data_block: Some(BlockBuilder::new(opt.clone())),
+            filter_block: Some(FilterBlockBuilder::new(opt.filter_policy.clone())),
             index_block: Some(BlockBuilder::new(opt)),
-            filter_block: Some(FilterBlockBuilder::new(fpol)),
         }
     }
 
@@ -115,8 +120,22 @@
         self.num_entries
     }
 
+    pub fn size_estimate(&self) -> usize {
+        let mut size = 0;
+        if let Some(ref b) = self.data_block {
+            size += b.size_estimate();
+        }
+        if let Some(ref b) = self.index_block {
+            size += b.size_estimate();
+        }
+        if let Some(ref b) = self.filter_block {
+            size += b.size_estimate();
+        }
+        size + self.offset + FULL_FOOTER_LENGTH
+    }
+
     /// Add a key to the table. The key as to be lexically greater or equal to the last one added.
-    pub fn add(&mut self, key: &'a [u8], val: &[u8]) {
+    pub fn add(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
         assert!(self.data_block.is_some());
 
         if !self.prev_block_last_key.is_empty() {
@@ -124,7 +143,7 @@
         }
 
         if self.data_block.as_ref().unwrap().size_estimate() > self.opt.block_size {
-            self.write_data_block(key);
+            self.write_data_block(key)?;
         }
 
         let dblock = &mut self.data_block.as_mut().unwrap();
@@ -135,12 +154,13 @@
 
         self.num_entries += 1;
         dblock.add(key, val);
+        Ok(())
     }
 
     /// Writes an index entry for the current data_block where `next_key` is the first key of the
     /// next block.
     /// Calls write_block() for writing the block to disk.
-    fn write_data_block(&mut self, next_key: &[u8]) {
+    fn write_data_block(&mut self, next_key: &[u8]) -> Result<()> {
         assert!(self.data_block.is_some());
 
         let block = self.data_block.take().unwrap();
@@ -148,56 +168,59 @@
         self.prev_block_last_key = Vec::from(block.last_key());
         let contents = block.finish();
 
-        let handle = BlockHandle::new(self.offset, contents.len());
+        let ctype = self.opt.compression_type;
+        let handle = self.write_block(contents, ctype)?;
+
         let mut handle_enc = [0 as u8; 16];
         let enc_len = handle.encode_to(&mut handle_enc);
 
-        self.index_block.as_mut().unwrap().add(&sep, &handle_enc[0..enc_len]);
+        self.index_block
+            .as_mut()
+            .unwrap()
+            .add(&sep, &handle_enc[0..enc_len]);
         self.data_block = Some(BlockBuilder::new(self.opt.clone()));
 
-        let ctype = self.opt.compression_type;
-
-        self.write_block(contents, ctype);
-
         if let Some(ref mut fblock) = self.filter_block {
             fblock.start_block(self.offset);
         }
+
+        Ok(())
     }
 
     /// Calculates the checksum, writes the block to disk and updates the offset.
-    fn write_block(&mut self, block: BlockContents, t: CompressionType) -> BlockHandle {
-        // compression is still unimplemented
-        assert_eq!(t, CompressionType::CompressionNone);
+    fn write_block(&mut self, block: BlockContents, ctype: CompressionType) -> Result<BlockHandle> {
+        let mut data = block;
+        if ctype == CompressionType::CompressionSnappy {
+            let mut encoder = Encoder::new();
+            data = encoder.compress_vec(&data)?;
+        }
 
-        let mut buf = [0 as u8; TABLE_BLOCK_CKSUM_LEN];
         let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
 
-        digest.write(&block);
-        digest.write(&[self.opt.compression_type as u8; TABLE_BLOCK_COMPRESS_LEN]);
-        digest.sum32().encode_fixed(&mut buf);
+        digest.write(&data);
+        digest.write(&[ctype as u8; TABLE_BLOCK_COMPRESS_LEN]);
 
-        // TODO: Handle errors here.
-        let _ = self.dst.write(&block);
-        let _ = self.dst.write(&[t as u8; TABLE_BLOCK_COMPRESS_LEN]);
-        let _ = self.dst.write(&buf);
+        self.dst.write(&data)?;
+        self.dst.write(&[ctype as u8; TABLE_BLOCK_COMPRESS_LEN])?;
+        self.dst.write_fixedint(mask_crc(digest.sum32()))?;
 
-        let handle = BlockHandle::new(self.offset, block.len());
+        let handle = BlockHandle::new(self.offset, data.len());
+        self.offset += data.len() + TABLE_BLOCK_COMPRESS_LEN + TABLE_BLOCK_CKSUM_LEN;
 
-        self.offset += block.len() + TABLE_BLOCK_COMPRESS_LEN + TABLE_BLOCK_CKSUM_LEN;
-
-        handle
+        Ok(handle)
     }
 
-    pub fn finish(mut self) {
+    pub fn finish(mut self) -> Result<usize> {
         assert!(self.data_block.is_some());
         let ctype = self.opt.compression_type;
 
         // If there's a pending data block, write it
         if self.data_block.as_ref().unwrap().entries() > 0 {
             // Find a key reliably past the last key
-            let key_past_last =
-                self.opt.cmp.find_short_succ(self.data_block.as_ref().unwrap().last_key());
-            self.write_data_block(&key_past_last);
+            let key_past_last = self.opt
+                .cmp
+                .find_short_succ(self.data_block.as_ref().unwrap().last_key());
+            self.write_data_block(&key_past_last)?;
         }
 
         // Create metaindex block
@@ -208,7 +231,7 @@
             let fblock = self.filter_block.take().unwrap();
             let filter_key = format!("filter.{}", fblock.filter_name());
             let fblock_data = fblock.finish();
-            let fblock_handle = self.write_block(fblock_data, CompressionType::CompressionNone);
+            let fblock_handle = self.write_block(fblock_data, CompressionType::CompressionNone)?;
 
             let mut handle_enc = [0 as u8; 16];
             let enc_len = fblock_handle.encode_to(&mut handle_enc);
@@ -218,27 +241,28 @@
 
         // write metaindex block
         let meta_ix = meta_ix_block.finish();
-        let meta_ix_handle = self.write_block(meta_ix, ctype);
+        let meta_ix_handle = self.write_block(meta_ix, ctype)?;
 
         // write index block
         let index_cont = self.index_block.take().unwrap().finish();
-        let ix_handle = self.write_block(index_cont, ctype);
+        let ix_handle = self.write_block(index_cont, ctype)?;
 
         // write footer.
         let footer = Footer::new(meta_ix_handle, ix_handle);
         let mut buf = [0; FULL_FOOTER_LENGTH];
         footer.encode(&mut buf);
 
-        self.offset += self.dst.write(&buf[..]).unwrap();
+        self.offset += self.dst.write(&buf[..])?;
+        self.dst.flush()?;
+        Ok(self.offset)
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use super::{Footer, TableBuilder};
+    use super::*;
     use blockhandle::BlockHandle;
-    use filter::BloomPolicy;
-    use options::Options;
+    use options;
 
     #[test]
     fn test_footer() {
@@ -251,39 +275,63 @@
         assert_eq!(f2.meta_index.size(), 4);
         assert_eq!(f2.index.offset(), 55);
         assert_eq!(f2.index.size(), 5);
-
     }
 
     #[test]
     fn test_table_builder() {
         let mut d = Vec::with_capacity(512);
-        let mut opt = Options::default();
+        let mut opt = options::for_test();
         opt.block_restart_interval = 3;
-        let mut b = TableBuilder::new(opt, &mut d, BloomPolicy::new(4));
+        opt.compression_type = CompressionType::CompressionSnappy;
+        let mut b = TableBuilder::new(opt, &mut d);
 
-        let data = vec![("abc", "def"), ("abd", "dee"), ("bcd", "asa"), ("bsr", "a00")];
+        let data = vec![
+            ("abc", "def"),
+            ("abe", "dee"),
+            ("bcd", "asa"),
+            ("dcc", "a00"),
+        ];
+        let data2 = vec![
+            ("abd", "def"),
+            ("abf", "dee"),
+            ("ccd", "asa"),
+            ("dcd", "a00"),
+        ];
 
-        for &(k, v) in data.iter() {
-            b.add(k.as_bytes(), v.as_bytes());
+        for i in 0..data.len() {
+            b.add(&data[i].0.as_bytes(), &data[i].1.as_bytes()).unwrap();
+            b.add(&data2[i].0.as_bytes(), &data2[i].1.as_bytes())
+                .unwrap();
         }
 
+        let estimate = b.size_estimate();
+
+        assert_eq!(143, estimate);
         assert!(b.filter_block.is_some());
-        b.finish();
+
+        let actual = b.finish().unwrap();
+        assert_eq!(223, actual);
     }
 
     #[test]
     #[should_panic]
     fn test_bad_input() {
         let mut d = Vec::with_capacity(512);
-        let mut opt = Options::default();
+        let mut opt = options::for_test();
         opt.block_restart_interval = 3;
-        let mut b = TableBuilder::new(opt, &mut d, BloomPolicy::new(4));
+        let mut b = TableBuilder::new(opt, &mut d);
 
         // Test two equal consecutive keys
-        let data = vec![("abc", "def"), ("abc", "dee"), ("bcd", "asa"), ("bsr", "a00")];
+        let data = vec![
+            ("abc", "def"),
+            ("abc", "dee"),
+            ("bcd", "asa"),
+            ("bsr", "a00"),
+        ];
 
         for &(k, v) in data.iter() {
-            b.add(k.as_bytes(), v.as_bytes());
+            b.add(k.as_bytes(), v.as_bytes()).unwrap();
         }
+        b.finish().unwrap();
     }
 }
--- a/src/table_reader.rs	Sun Mar 11 11:29:39 2018 +0100
+++ b/src/table_reader.rs	Sun Mar 11 16:00:32 2018 +0100
@@ -1,138 +1,130 @@
-#![allow(dead_code)]
-
 use block::{Block, BlockIter};
 use blockhandle::BlockHandle;
-use filter::BoxedFilterPolicy;
+use cache;
+use error::Result;
+use filter;
 use filter_block::FilterBlockReader;
-use options::{self, CompressionType, Options};
+use options::Options;
+use table_block;
 use table_builder::{self, Footer};
-use iterator::SSIterator;
+use types::{current_key_val, RandomAccess, SSIterator};
 
 use std::cmp::Ordering;
-use std::io::{self, Read, Seek, SeekFrom, Result};
+use std::rc::Rc;
 
-use integer_encoding::FixedInt;
-use crc::crc32::{self, Hasher32};
+use integer_encoding::FixedIntWriter;
 
 /// Reads the table footer.
-fn read_footer<R: Read + Seek>(f: &mut R, size: usize) -> Result<Footer> {
-    try!(f.seek(SeekFrom::Start((size - table_builder::FULL_FOOTER_LENGTH) as u64)));
-    let mut buf = [0; table_builder::FULL_FOOTER_LENGTH];
-    try!(f.read_exact(&mut buf));
+fn read_footer(f: &RandomAccess, size: usize) -> Result<Footer> {
+    let mut buf = vec![0; table_builder::FULL_FOOTER_LENGTH];
+    f.read_at(size - table_builder::FULL_FOOTER_LENGTH, &mut buf)?;
     Ok(Footer::decode(&buf))
 }
 
-fn read_bytes<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<Vec<u8>> {
-    try!(f.seek(SeekFrom::Start(location.offset() as u64)));
-
-    let mut buf = Vec::new();
-    buf.resize(location.size(), 0);
-
-    try!(f.read_exact(&mut buf[0..location.size()]));
-
-    Ok(buf)
-}
-
-struct TableBlock {
-    block: Block,
-    checksum: u32,
-    compression: CompressionType,
-}
+#[derive(Clone)]
+pub struct Table {
+    file: Rc<Box<RandomAccess>>,
+    file_size: usize,
+    cache_id: cache::CacheID,
 
-impl TableBlock {
-    /// Reads a block at location.
-    fn read_block<R: Read + Seek>(opt: Options,
-                                  f: &mut R,
-                                  location: &BlockHandle)
-                                  -> Result<TableBlock> {
-        // The block is denoted by offset and length in BlockHandle. A block in an encoded
-        // table is followed by 1B compression type and 4B checksum.
-        let buf = try!(read_bytes(f, location));
-        let compress = try!(read_bytes(f,
-                                       &BlockHandle::new(location.offset() + location.size(),
-                                                         table_builder::TABLE_BLOCK_COMPRESS_LEN)));
-        let cksum = try!(read_bytes(f,
-                                    &BlockHandle::new(location.offset() + location.size() +
-                                                      table_builder::TABLE_BLOCK_COMPRESS_LEN,
-                                                      table_builder::TABLE_BLOCK_CKSUM_LEN)));
-        Ok(TableBlock {
-            block: Block::new(opt, buf),
-            checksum: u32::decode_fixed(&cksum),
-            compression: options::int_to_compressiontype(compress[0] as u32)
-                .unwrap_or(CompressionType::CompressionNone),
-        })
-    }
-
-    /// Verify checksum of block
-    fn verify(&self) -> bool {
-        let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
-        digest.write(&self.block.contents());
-        digest.write(&[self.compression as u8; 1]);
-
-        digest.sum32() == self.checksum
-    }
-}
-
-pub struct Table<R: Read + Seek> {
-    file: R,
     opt: Options,
 
     footer: Footer,
     indexblock: Block,
-    #[allow(dead_code)]
     filters: Option<FilterBlockReader>,
 }
 
-impl<R: Read + Seek> Table<R> {
-    /// Creates a new table reader.
-    pub fn new(opt: Options, mut file: R, size: usize, fp: BoxedFilterPolicy) -> Result<Table<R>> {
-        let footer = try!(read_footer(&mut file, size));
-
-        let indexblock = try!(TableBlock::read_block(opt.clone(), &mut file, &footer.index));
-        let metaindexblock =
-            try!(TableBlock::read_block(opt.clone(), &mut file, &footer.meta_index));
-
-        if !indexblock.verify() || !metaindexblock.verify() {
-            return Err(io::Error::new(io::ErrorKind::InvalidData,
-                                      "Indexblock/Metaindexblock failed verification"));
-        }
+impl Table {
+    /// Creates a new table reader operating on unformatted keys (i.e., UserKey).
+    fn new(opt: Options, file: Rc<Box<RandomAccess>>, size: usize) -> Result<Table> {
+        let footer = try!(read_footer(file.as_ref().as_ref(), size));
+        let indexblock = try!(table_block::read_table_block(
+            opt.clone(),
+            file.as_ref().as_ref(),
+            &footer.index
+        ));
+        let metaindexblock = try!(table_block::read_table_block(
+            opt.clone(),
+            file.as_ref().as_ref(),
+            &footer.meta_index
+        ));
 
-        // Open filter block for reading
-        let mut filter_block_reader = None;
-        let filter_name = format!("filter.{}", fp.name()).as_bytes().to_vec();
-
-        let mut metaindexiter = metaindexblock.block.iter();
-
-        metaindexiter.seek(&filter_name);
-
-        if let Some((_key, val)) = metaindexiter.current() {
-            let filter_block_location = BlockHandle::decode(&val).0;
-
-            if filter_block_location.size() > 0 {
-                let buf = try!(read_bytes(&mut file, &filter_block_location));
-                filter_block_reader = Some(FilterBlockReader::new_owned(fp, buf));
-            }
-        }
-
-        metaindexiter.reset();
+        let filter_block_reader =
+            Table::read_filter_block(&metaindexblock, file.as_ref().as_ref(), &opt)?;
+        let cache_id = opt.block_cache.borrow_mut().new_cache_id();
 
         Ok(Table {
             file: file,
+            file_size: size,
+            cache_id: cache_id,
             opt: opt,
             footer: footer,
             filters: filter_block_reader,
-            indexblock: indexblock.block,
+            indexblock: indexblock,
         })
     }
 
-    fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock> {
-        let b = try!(TableBlock::read_block(self.opt.clone(), &mut self.file, location));
+    fn read_filter_block(
+        metaix: &Block,
+        file: &RandomAccess,
+        options: &Options,
+    ) -> Result<Option<FilterBlockReader>> {
+        // Open filter block for reading
+        let filter_name = format!("filter.{}", options.filter_policy.name())
+            .as_bytes()
+            .to_vec();
+
+        let mut metaindexiter = metaix.iter();
+        metaindexiter.seek(&filter_name);
+
+        if let Some((_key, val)) = current_key_val(&metaindexiter) {
+            let filter_block_location = BlockHandle::decode(&val).0;
+            if filter_block_location.size() > 0 {
+                return Ok(Some(table_block::read_filter_block(
+                    file,
+                    &filter_block_location,
+                    options.filter_policy.clone(),
+                )?));
+            }
+        }
+        Ok(None)
+    }
 
-        if !b.verify() {
-            Err(io::Error::new(io::ErrorKind::InvalidData, "Data block failed verification"))
-        } else {
-            Ok(b)
+    /// block_cache_handle creates a CacheKey for a block with a given offset to be used in the
+    /// block cache.
+    fn block_cache_handle(&self, block_off: usize) -> cache::CacheKey {
+        let mut dst = [0; 2 * 8];
+        (&mut dst[..8])
+            .write_fixedint(self.cache_id)
+            .expect("error writing to vec");
+        (&mut dst[8..])
+            .write_fixedint(block_off as u64)
+            .expect("error writing to vec");
+        dst
+    }
+
+    /// Read a block from the current table at `location`, and cache it in the options' block
+    /// cache.
+    fn read_block(&self, location: &BlockHandle) -> Result<Block> {
+        let cachekey = self.block_cache_handle(location.offset());
+        if let Some(block) = self.opt.block_cache.borrow_mut().get(&cachekey) {
+            return Ok(block.clone());
         }
+
+        // Two times as_ref(): First time to get a ref from Rc<>, then one from Box<>.
+        let b = try!(table_block::read_table_block(
+            self.opt.clone(),
+            self.file.as_ref().as_ref(),
+            location
+        ));
+
+        // insert a cheap copy (Rc).
+        self.opt
+            .block_cache
+            .borrow_mut()
+            .insert(&cachekey, b.clone());
+
+        Ok(b)
     }
 
     /// Returns the offset of the block that contains `key`.
@@ -141,7 +133,7 @@
 
         iter.seek(key);
 
-        if let Some((_, val)) = iter.current() {
+        if let Some((_, val)) = current_key_val(&iter) {
             let location = BlockHandle::decode(&val).0;
             return location.offset();
         }
@@ -149,82 +141,83 @@
         return self.footer.meta_index.offset();
     }
 
-    // Iterators read from the file; thus only one iterator can be borrowed (mutably) per scope
-    fn iter<'a>(&'a mut self) -> TableIterator<'a, R> {
+    /// Iterators read from the file; thus only one iterator can be borrowed (mutably) per scope
+    pub fn iter(&self) -> TableIterator {
         let iter = TableIterator {
-            current_block: self.indexblock.iter(),
-            init: false,
+            current_block: None,
             current_block_off: 0,
             index_block: self.indexblock.iter(),
-            opt: self.opt.clone(),
-            table: self,
+            table: self.clone(),
         };
         iter
     }
 
-    /// Retrieve value from table. This function uses the attached filters, so is better suited if
-    /// you frequently look for non-existing values (as it will detect the non-existence of an
-    /// entry in a block without having to load the block).
-    pub fn get(&mut self, key: &[u8]) -> Option<Vec<u8>> {
+    /// Retrieve next-biggest entry for key from table. This function uses the attached filters, so
+    /// is better suited if you frequently look for non-existing values (as it will detect the
+    /// non-existence of an entry in a block without having to load the block).
+    ///
+    /// The caller must check if the returned key, which is the raw key (not e.g. the user portion
+    /// of an InternalKey) is acceptable (e.g. correct value type or sequence number), as it may
+    /// not be an exact match for key.
+    ///
+    /// This is done this way because some key types, like internal keys, will not result in an
+    /// exact match; it depends on other comparators than the one that the table reader knows
+    /// whether a match is acceptable.
+    pub fn get(&self, key: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
         let mut index_iter = self.indexblock.iter();
         index_iter.seek(key);
 
         let handle;
-        if let Some((last_in_block, h)) = index_iter.current() {
+        if let Some((last_in_block, h)) = current_key_val(&index_iter) {
             if self.opt.cmp.cmp(key, &last_in_block) == Ordering::Less {
                 handle = BlockHandle::decode(&h).0;
             } else {
-                return None;
+                return Ok(None);
             }
         } else {
-            return None;
+            return Ok(None);
         }
 
         // found correct block.
-        // Check bloom filter
+
+        // Check bloom (or whatever) filter
         if let Some(ref filters) = self.filters {
             if !filters.key_may_match(handle.offset(), key) {
-                return None;
+                return Ok(None);
             }
         }
 
         // Read block (potentially from cache)
-        let mut iter;
-        if let Ok(tb) = self.read_block(&handle) {
-            iter = tb.block.iter();
-        } else {
-            return None;
-        }
+        let tb = self.read_block(&handle)?;
+        let mut iter = tb.iter();
 
         // Go to entry and check if it's the wanted entry.
         iter.seek(key);
-        if let Some((k, v)) = iter.current() {
-            if self.opt.cmp.cmp(key, &k) == Ordering::Equal {
-                Some(v)
-            } else {
-                None
+        if let Some((k, v)) = current_key_val(&iter) {
+            if self.opt.cmp.cmp(&k, key) >= Ordering::Equal {
+                return Ok(Some((k, v)));
             }
-        } else {
-            None
         }
+        Ok(None)
     }
 }
 
 /// This iterator is a "TwoLevelIterator"; it uses an index block in order to get an offset hint
 /// into the data blocks.
-pub struct TableIterator<'a, R: 'a + Read + Seek> {
-    table: &'a mut Table<R>,
-    opt: Options,
-    // We're not using Option<BlockIter>, but instead a separate `init` field. That makes it easier
-    // working with the current block in the iterator methods (no borrowing annoyance as with
-    // Option<>)
-    current_block: BlockIter,
+pub struct TableIterator {
+    // A TableIterator is independent of its table (on the syntax level -- it doesn't know its
+    // Table's lifetime). This is mainly required by the dynamic iterators used everywhere, where a
+    // lifetime makes things like returning an iterator from a function neigh-impossible.
+    //
+    // Instead, reference-counted pointers and locks inside the Table ensure that all
+    // TableIterators still share a table.
+    table: Table,
+    current_block: Option<BlockIter>,
     current_block_off: usize,
-    init: bool,
     index_block: BlockIter,
 }
 
-impl<'a, R: Read + Seek> TableIterator<'a, R> {
+impl TableIterator {
     // Skips to the entry referenced by the next entry in the index block.
     // This is called once a block has run out of entries.
     // Err means corruption or I/O error; Ok(true) means a new block was loaded; Ok(false) means
@@ -240,112 +233,114 @@
     // Load the block at `handle` into `self.current_block`
     fn load_block(&mut self, handle: &[u8]) -> Result<()> {
         let (new_block_handle, _) = BlockHandle::decode(handle);
+        let block = self.table.read_block(&new_block_handle)?;
 
-        let block = try!(self.table.read_block(&new_block_handle));
-
-        self.current_block = block.block.iter();
+        self.current_block = Some(block.iter());
         self.current_block_off = new_block_handle.offset();
 
         Ok(())
     }
 }
 
-impl<'a, R: Read + Seek> Iterator for TableIterator<'a, R> {
-    type Item = (Vec<u8>, Vec<u8>);
-
-    fn next(&mut self) -> Option<Self::Item> {
-        // init essentially means that `current_block` is a data block (it's initially filled with
-        // an index block as filler).
-        if self.init {
-            if let Some((key, val)) = self.current_block.next() {
-                Some((key, val))
-            } else {
-                match self.skip_to_next_entry() {
-                    Ok(true) => self.next(),
-                    Ok(false) => None,
-                    // try next block, this might be corruption
-                    Err(_) => self.next(),
+impl SSIterator for TableIterator {
+    fn advance(&mut self) -> bool {
+        // Uninitialized case.
+        if self.current_block.is_none() {
+            match self.skip_to_next_entry() {
+                Ok(true) => return self.advance(),
+                Ok(false) => {
+                    self.reset();
+                    return false;
                 }
-            }
-        } else {
-            match self.skip_to_next_entry() {
-                Ok(true) => {
-                    // Only initialize if we got an entry
-                    self.init = true;
-                    self.next()
-                }
-                Ok(false) => None,
                 // try next block from index, this might be corruption
-                Err(_) => self.next(),
+                Err(_) => return self.advance(),
             }
         }
+
+        // Initialized case -- does the current block have more entries?
+        if let Some(ref mut cb) = self.current_block {
+            if cb.advance() {
+                return true;
+            }
+        }
+
+        // If the current block is exhausted, try loading the next block.
+        self.current_block = None;
+        match self.skip_to_next_entry() {
+            Ok(true) => self.advance(),
+            Ok(false) => {
+                self.reset();
+                false
+            }
+            // try next block, this might be corruption
+            Err(_) => self.advance(),
+        }
     }
-}
 
-impl<'a, R: Read + Seek> SSIterator for TableIterator<'a, R> {
     // A call to valid() after seeking is necessary to ensure that the seek worked (e.g., no error
     // while reading from disk)
     fn seek(&mut self, to: &[u8]) {
         // first seek in index block, rewind by one entry (so we get the next smaller index entry),
         // then set current_block and seek there
-
         self.index_block.seek(to);
 
-        if let Some((past_block, handle)) = self.index_block.current() {
-            if self.opt.cmp.cmp(to, &past_block) <= Ordering::Equal {
+        // It's possible that this is a seek past-last; reset in that case.
+        if let Some((past_block, handle)) = current_key_val(&self.index_block) {
+            if self.table.opt.cmp.cmp(to, &past_block) <= Ordering::Equal {
                 // ok, found right block: continue
                 if let Ok(()) = self.load_block(&handle) {
-                    self.current_block.seek(to);
-                    self.init = true;
+                    // current_block is always set if load_block() returned Ok.
+                    self.current_block.as_mut().unwrap().seek(to);
+                    return;
+                }
+            }
+        }
+        // Reached in case of failure.
+        self.reset();
+    }
+
+    fn prev(&mut self) -> bool {
+        // happy path: current block contains previous entry
+        if let Some(ref mut cb) = self.current_block {
+            if cb.prev() {
+                return true;
+            }
+        }
+
+        // Go back one block and look for the last entry in the previous block
+        if self.index_block.prev() {
+            if let Some((_, handle)) = current_key_val(&self.index_block) {
+                if self.load_block(&handle).is_ok() {
+                    self.current_block.as_mut().unwrap().seek_to_last();
+                    self.current_block.as_ref().unwrap().valid()
                 } else {
                     self.reset();
-                    return;
+                    false
                 }
             } else {
-                self.reset();
-                return;
+                false
             }
         } else {
-            panic!("Unexpected None from current() (bug)");
-        }
-    }
-
-    fn prev(&mut self) -> Option<Self::Item> {
-        // happy path: current block contains previous entry
-        if let Some(result) = self.current_block.prev() {
-            Some(result)
-        } else {
-            // Go back one block and look for the last entry in the previous block
-            if let Some((_, handle)) = self.index_block.prev() {
-                if self.load_block(&handle).is_ok() {
-                    self.current_block.seek_to_last();
-                    self.current_block.current()
-                } else {
-                    self.reset();
-                    None
-                }
-            } else {
-                None
-            }
+            false
         }
     }
 
     fn reset(&mut self) {
         self.index_block.reset();
-        self.init = false;
+        self.current_block = None;
     }
 
-    // This iterator is special in that it's valid even before the first call to next(). It behaves
-    // correctly, though.
+    // This iterator is special in that it's valid even before the first call to advance(). It
+    // behaves correctly, though.
     fn valid(&self) -> bool {
-        self.init && (self.current_block.valid() || self.index_block.valid())
+        self.current_block.is_some() && (self.current_block.as_ref().unwrap().valid())
     }
 
-    fn current(&self) -> Option<Self::Item> {
-        if self.init {
-            self.current_block.current()
+    fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool {
+        if let Some(ref cb) = self.current_block {
+            cb.current(key, val)
         } else {
-            None
+            false
         }
     }
 }
@@ -353,45 +348,71 @@
 #[cfg(test)]
 mod tests {
     use filter::BloomPolicy;
-    use options::Options;
+    use options::{self, CompressionType};
     use table_builder::TableBuilder;
-    use iterator::SSIterator;
-
-    use std::io::Cursor;
+    use test_util::{test_iterator_properties, SSIteratorIter};
+    use types::{current_key_val, SSIterator};
 
     use super::*;
 
     fn build_data() -> Vec<(&'static str, &'static str)> {
-        vec![// block 1
-             ("abc", "def"),
-             ("abd", "dee"),
-             ("bcd", "asa"),
-             // block 2
-             ("bsr", "a00"),
-             ("xyz", "xxx"),
-             ("xzz", "yyy"),
-             // block 3
-             ("zzz", "111")]
+        vec![
+            // block 1
+            ("abc", "def"),
+            ("abd", "dee"),
+            ("bcd", "asa"),
+            // block 2
+            ("bsr", "a00"),
+            ("xyz", "xxx"),
+            ("xzz", "yyy"),
+            // block 3
+            ("zzz", "111"),
+        ]
     }
 
-    // Build a table containing raw keys (no format)
-    fn build_table() -> (Vec<u8>, usize) {
+    // Build a table containing raw keys (no format). It returns (vector, length) for convenience
+    // reason, a call f(v, v.len()) doesn't work for borrowing reasons.
+    fn build_table(data: Vec<(&'static str, &'static str)>) -> (Vec<u8>, usize) {
         let mut d = Vec::with_capacity(512);
-        let mut opt = Options::default();
+        let mut opt = options::for_test();
         opt.block_restart_interval = 2;
         opt.block_size = 32;
+        opt.compression_type = CompressionType::CompressionSnappy;
 
         {
             // Uses the standard comparator in opt.
-            let mut b = TableBuilder::new(opt, &mut d, BloomPolicy::new(4));
-            let data = build_data();
+            let mut b = TableBuilder::new(opt, &mut d);
 
             for &(k, v) in data.iter() {
-                b.add(k.as_bytes(), v.as_bytes());
+                b.add(k.as_bytes(), v.as_bytes()).unwrap();
             }
 
-            b.finish();
+            b.finish().unwrap();
+        }
+
+        let size = d.len();
+        (d, size)
+    }
 
+    // Build a table containing keys.
+    fn build_internal_table() -> (Vec<u8>, usize) {
+        let mut d = Vec::with_capacity(512);
+        let mut opt = options::for_test();
+        opt.block_restart_interval = 1;
+        opt.block_size = 32;
+        opt.filter_policy = Rc::new(Box::new(BloomPolicy::new(4)));
+
+        let mut i = 1 as u64;
+        let data = build_data();
+
+        {
+            let mut b = TableBuilder::new(opt, &mut d);
+
+            for &(ref k, ref v) in data.iter() {
+                b.add(k.as_bytes(), v.as_bytes()).unwrap();
+            }
+
+            b.finish().unwrap();
         }
 
         let size = d.len();
@@ -399,71 +420,94 @@
         (d, size)
     }
 
-    #[test]
-    fn test_table_reader_checksum() {
-        let (mut src, size) = build_table();
-        println!("{}", size);
-
-        src[10] += 1;
+    fn wrap_buffer(src: Vec<u8>) -> Rc<Box<RandomAccess>> {
+        Rc::new(Box::new(src))
+    }
 
-        let mut table = Table::new(Options::default(),
-                                   Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4))
-            .unwrap();
+    #[test]
+    fn test_table_approximate_offset() {
+        let (src, size) = build_table(build_data());
+        let mut opt = options::for_test();
+        opt.block_size = 32;
+        let table = Table::new(opt, wrap_buffer(src), size).unwrap();
+        let mut iter = table.iter();
 
-        assert!(table.filters.is_some());
-        assert_eq!(table.filters.as_ref().unwrap().num(), 1);
-
-        {
-            let iter = table.iter();
-            // first block is skipped
-            assert_eq!(iter.count(), 4);
+        let expected_offsets = vec![0, 0, 0, 44, 44, 44, 89];
+        let mut i = 0;
+        for (k, _) in SSIteratorIter::wrap(&mut iter) {
+            assert_eq!(expected_offsets[i], table.approx_offset_of(&k));
+            i += 1;
         }
 
-        {
-            let iter = table.iter();
+        // Key-past-last returns offset of metaindex block.
+        assert_eq!(137, table.approx_offset_of("{aa".as_bytes()));
+    }
+
+    #[test]
+    fn test_table_block_cache_use() {
+        let (src, size) = build_table(build_data());
+        let mut opt = options::for_test();
+        opt.block_size = 32;
 
-            for (k, _) in iter {
-                if k == build_data()[5].0.as_bytes() {
-                    return;
-                }
-            }
+        let table = Table::new(opt.clone(), wrap_buffer(src), size).unwrap();
+        let mut iter = table.iter();
 
-            panic!("Should have hit 5th record in table!");
-        }
+        // index/metaindex blocks are not cached. That'd be a waste of memory.
+        assert_eq!(opt.block_cache.borrow().count(), 0);
+        iter.next();
+        assert_eq!(opt.block_cache.borrow().count(), 1);
+        // This may fail if block parameters or data change. In that case, adapt it.
+        iter.next();
+        iter.next();
+        iter.next();
+        iter.next();
+        assert_eq!(opt.block_cache.borrow().count(), 2);
     }
 
     #[test]
     fn test_table_iterator_fwd_bwd() {
-        let (src, size) = build_table();
+        let (src, size) = build_table(build_data());
         let data = build_data();
 
-        let mut table = Table::new(Options::default(),
-                                   Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4))
-            .unwrap();
+        let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap();
         let mut iter = table.iter();
         let mut i = 0;
 
         while let Some((k, v)) = iter.next() {
-            assert_eq!((data[i].0.as_bytes(), data[i].1.as_bytes()),
-                       (k.as_ref(), v.as_ref()));
+            assert_eq!(
+                (data[i].0.as_bytes(), data[i].1.as_bytes()),
+                (k.as_ref(), v.as_ref())
+            );
             i += 1;
         }
 
         assert_eq!(i, data.len());
-        assert!(iter.next().is_none());
+        assert!(!iter.valid());
 
+        // Go forward again, to last entry.
+        while let Some((key, _)) = iter.next() {
+            if key.as_slice() == b"zzz" {
+                break;
+            }
+        }
+
+        assert!(iter.valid());
         // backwards count
         let mut j = 0;
 
-        while let Some((k, v)) = iter.prev() {
-            j += 1;
-            assert_eq!((data[data.len() - 1 - j].0.as_bytes(),
-                        data[data.len() - 1 - j].1.as_bytes()),
-                       (k.as_ref(), v.as_ref()));
+        while iter.prev() {
+            if let Some((k, v)) = current_key_val(&iter) {
+                j += 1;
+                assert_eq!(
+                    (
+                        data[data.len() - 1 - j].0.as_bytes(),
+                        data[data.len() - 1 - j].1.as_bytes()
+                    ),
+                    (k.as_ref(), v.as_ref())
+                );
+            } else {
+                break;
+            }
         }
 
         // expecting 7 - 1, because the last entry that the iterator stopped on is the last entry
@@ -473,21 +517,17 @@
 
     #[test]
     fn test_table_iterator_filter() {
-        let (src, size) = build_table();
+        let (src, size) = build_table(build_data());
 
-        let mut table = Table::new(Options::default(),
-                                   Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4))
-            .unwrap();
+        let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap();
+        assert!(table.filters.is_some());
         let filter_reader = table.filters.clone().unwrap();
         let mut iter = table.iter();
 
         loop {
             if let Some((k, _)) = iter.next() {
                 assert!(filter_reader.key_may_match(iter.current_block_off, &k));
-                assert!(!filter_reader.key_may_match(iter.current_block_off,
-                                                     "somerandomkey".as_bytes()));
+                assert!(!filter_reader.key_may_match(iter.current_block_off, b"somerandomkey"));
             } else {
                 break;
             }
@@ -496,47 +536,48 @@
 
     #[test]
     fn test_table_iterator_state_behavior() {
-        let (src, size) = build_table();
+        let (src, size) = build_table(build_data());
 
-        let mut table = Table::new(Options::default(),
-                                   Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4))
-            .unwrap();
+        let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap();
         let mut iter = table.iter();
 
         // behavior test
 
         // See comment on valid()
         assert!(!iter.valid());
-        assert!(iter.current().is_none());
-        assert!(iter.prev().is_none());
+        assert!(current_key_val(&iter).is_none());
+        assert!(!iter.prev());
 
-        assert!(iter.next().is_some());
-        let first = iter.current();
+        assert!(iter.advance());
+        let first = current_key_val(&iter);
         assert!(iter.valid());
-        assert!(iter.current().is_some());
+        assert!(current_key_val(&iter).is_some());
 
-        assert!(iter.next().is_some());
-        assert!(iter.prev().is_some());
-        assert!(iter.current().is_some());
+        assert!(iter.advance());
+        assert!(iter.prev());
+        assert!(iter.valid());
 
         iter.reset();
         assert!(!iter.valid());
-        assert!(iter.current().is_none());
+        assert!(current_key_val(&iter).is_none());
         assert_eq!(first, iter.next());
     }
 
     #[test]
+    fn test_table_iterator_behavior_standard() {
+        let mut data = build_data();
+        data.truncate(4);
+        let (src, size) = build_table(data);
+        let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap();
+        test_iterator_properties(table.iter());
+    }
+
+    #[test]
     fn test_table_iterator_values() {
-        let (src, size) = build_table();
+        let (src, size) = build_table(build_data());
         let data = build_data();
 
-        let mut table = Table::new(Options::default(),
-                                   Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4))
-            .unwrap();
+        let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap();
         let mut iter = table.iter();
         let mut i = 0;
 
@@ -548,9 +589,11 @@
         loop {
             iter.prev();
 
-            if let Some((k, v)) = iter.current() {
-                assert_eq!((data[i].0.as_bytes(), data[i].1.as_bytes()),
-                           (k.as_ref(), v.as_ref()));
+            if let Some((k, v)) = current_key_val(&iter) {
+                assert_eq!(
+                    (data[i].0.as_bytes(), data[i].1.as_bytes()),
+                    (k.as_ref(), v.as_ref())
+                );
             } else {
                 break;
             }
@@ -567,40 +610,88 @@
 
     #[test]
     fn test_table_iterator_seek() {
-        let (src, size) = build_table();
+        let (src, size) = build_table(build_data());
 
-        let mut table = Table::new(Options::default(),
-                                   Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4))
-            .unwrap();
+        let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap();
         let mut iter = table.iter();
 
-        iter.seek("bcd".as_bytes());
+        iter.seek(b"bcd");
+        assert!(iter.valid());
+        assert_eq!(
+            current_key_val(&iter),
+            Some((b"bcd".to_vec(), b"asa".to_vec()))
+        );
+        iter.seek(b"abc");
         assert!(iter.valid());
-        assert_eq!(iter.current(),
-                   Some(("bcd".as_bytes().to_vec(), "asa".as_bytes().to_vec())));
-        iter.seek("abc".as_bytes());
+        assert_eq!(
+            current_key_val(&iter),
+            Some((b"abc".to_vec(), b"def".to_vec()))
+        );
+
+        // Seek-past-last invalidates.
+        iter.seek("{{{".as_bytes());
+        assert!(!iter.valid());
+        iter.seek(b"bbb");
         assert!(iter.valid());
-        assert_eq!(iter.current(),
-                   Some(("abc".as_bytes().to_vec(), "def".as_bytes().to_vec())));
     }
 
     #[test]
     fn test_table_get() {
-        let (src, size) = build_table();
+        let (src, size) = build_table(build_data());
+
+        let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap();
+        let table2 = table.clone();
+
+        let mut _iter = table.iter();
+        // Test that all of the table's entries are reachable via get()
+        for (k, v) in SSIteratorIter::wrap(&mut _iter) {
+            let r = table2.get(&k);
+            assert_eq!(Ok(Some((k, v))), r);
+        }
+
+        assert_eq!(table.opt.block_cache.borrow().count(), 3);
 
-        let mut table = Table::new(Options::default(),
-                                   Cursor::new(&src as &[u8]),
-                                   size,
-                                   BloomPolicy::new(4))
-            .unwrap();
+        // test that filters work and don't return anything at all.
+        assert!(table.get(b"aaa").unwrap().is_none());
+        assert!(table.get(b"aaaa").unwrap().is_none());
+        assert!(table.get(b"aa").unwrap().is_none());
+        assert!(table.get(b"abcd").unwrap().is_none());
+        assert!(table.get(b"abb").unwrap().is_none());
+        assert!(table.get(b"zzy").unwrap().is_none());
+        assert!(table.get(b"zz1").unwrap().is_none());
+        assert!(table.get("zz{".as_bytes()).unwrap().is_none());
+    }
+
+    #[test]
+    fn test_table_reader_checksum() {
+        let (mut src, size) = build_table(build_data());
 
-        assert!(table.get("aaa".as_bytes()).is_none());
-        assert_eq!(table.get("abc".as_bytes()), Some("def".as_bytes().to_vec()));
-        assert!(table.get("abcd".as_bytes()).is_none());
-        assert_eq!(table.get("bcd".as_bytes()), Some("asa".as_bytes().to_vec()));
-        assert_eq!(table.get("zzz".as_bytes()), Some("111".as_bytes().to_vec()));
-        assert!(table.get("zz1".as_bytes()).is_none());
+        src[10] += 1;
+
+        let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap();
+
+        assert!(table.filters.is_some());
+        assert_eq!(table.filters.as_ref().unwrap().num(), 1);
+
+        {
+            let mut _iter = table.iter();
+            let iter = SSIteratorIter::wrap(&mut _iter);
+            // first block is skipped
+            assert_eq!(iter.count(), 4);
+        }
+
+        {
+            let mut _iter = table.iter();
+            let iter = SSIteratorIter::wrap(&mut _iter);
+
+            for (k, _) in iter {
+                if k == build_data()[5].0.as_bytes() {
+                    return;
+                }
+            }
+
+            panic!("Should have hit 5th record in table!");
+        }
     }
+
 }
--- a/src/types.rs	Sun Mar 11 11:29:39 2018 +0100
+++ b/src/types.rs	Sun Mar 11 16:00:32 2018 +0100
@@ -1,75 +1,241 @@
-#![allow(dead_code)]
+//! A collection of fundamental and/or simple types used by other modules. A bit of a grab bag :-)
+
+use error::{err, Result, StatusCode};
+
+use std::cell::RefCell;
+use std::rc::Rc;
 
-//! A collection of fundamental and/or simple types used by other modules
+pub trait RandomAccess {
+    fn read_at(&self, off: usize, dst: &mut [u8]) -> Result<usize>;
+}
+
+/// BufferBackedFile is a simple type implementing RandomAccess on a Vec<u8>.
+pub type BufferBackedFile = Vec<u8>;
 
-use std::error::Error;
-use std::fmt::{self, Display, Formatter};
-use std::io;
-use std::result;
+impl RandomAccess for BufferBackedFile {
+    fn read_at(&self, off: usize, dst: &mut [u8]) -> Result<usize> {
+        if off > self.len() {
+            return Ok(0);
+        }
+        let remaining = self.len() - off;
+        let to_read = if dst.len() > remaining {
+            remaining
+        } else {
+            dst.len()
+        };
+        (&mut dst[0..to_read]).copy_from_slice(&self[off..off + to_read]);
+        Ok(to_read)
+    }
+}
 
-#[derive(Debug, PartialOrd, PartialEq)]
-pub enum ValueType {
-    TypeDeletion = 0,
-    TypeValue = 1,
-}
+pub const NUM_LEVELS: usize = 7;
 
 /// Represents a sequence number of a single entry.
 pub type SequenceNumber = u64;
 
 pub const MAX_SEQUENCE_NUMBER: SequenceNumber = (1 << 56) - 1;
 
-#[derive(Clone, Debug)]
-#[allow(dead_code)]
-pub enum Status {
-    OK,
-    NotFound(String),
-    Corruption(String),
-    NotSupported(String),
-    InvalidArgument(String),
-    PermissionDenied(String),
-    IOError(String),
-    Unknown(String),
+/// A shared thingy with interior mutability.
+pub type Shared<T> = Rc<RefCell<T>>;
+
+pub fn share<T>(t: T) -> Rc<RefCell<T>> {
+    Rc::new(RefCell::new(t))
+}
+
+#[derive(PartialEq)]
+pub enum Direction {
+    Forward,
+    Reverse,
+}
+
+/// Denotes a key range
+pub struct Range<'a> {
+    pub start: &'a [u8],
+    pub limit: &'a [u8],
 }
 
-impl Display for Status {
-    fn fmt(&self, fmt: &mut Formatter) -> result::Result<(), fmt::Error> {
-        fmt.write_str(self.description())
+/// An extension of the standard `Iterator` trait that supports some methods necessary for LevelDB.
+/// This works because the iterators used are stateful and keep the last returned element.
+///
+/// Note: Implementing types are expected to hold `!valid()` before the first call to `advance()`.
+///
+/// test_util::test_iterator_properties() verifies that all properties hold.
+pub trait SSIterator {
+    /// Advances the position of the iterator by one element (which can be retrieved using
+    /// current(). If no more elements are available, advance() returns false, and the iterator
+    /// becomes invalid (i.e. as if reset() had been called).
+    fn advance(&mut self) -> bool;
+    /// Return the current item (i.e. the item most recently returned by `next()`).
+    fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool;
+    /// Seek the iterator to `key` or the next bigger key. If the seek is invalid (past last
+    /// element, or before first element), the iterator is `reset()` and not valid.
+    fn seek(&mut self, key: &[u8]);
+    /// Resets the iterator to be `!valid()`, i.e. positioned before the first element.
+    fn reset(&mut self);
+    /// Returns true if the iterator is not positioned before the first or after the last element,
+    /// i.e. if `current()` would succeed.
+    fn valid(&self) -> bool;
+    /// Go to the previous item; if the iterator is moved beyond the first element, `prev()`
+    /// returns false and it will be `!valid()`. This is inefficient for most iterator
+    /// implementations.
+    fn prev(&mut self) -> bool;
+
+    // default implementations.
+
+    /// next is like Iterator::next(). It's implemented here because Rust disallows implementing a
+    /// foreign trait for any type, thus we can't do `impl<T: SSIterator> Iterator<Item=Vec<u8>>
+    /// for T {}`.
+    fn next(&mut self) -> Option<(Vec<u8>, Vec<u8>)> {
+        if !self.advance() {
+            return None;
+        }
+        let (mut key, mut val) = (vec![], vec![]);
+        if self.current(&mut key, &mut val) {
+            Some((key, val))
+        } else {
+            None
+        }
+    }
+
+    /// seek_to_first seeks to the first element.
+    fn seek_to_first(&mut self) {
+        self.reset();
+        self.advance();
+    }
+}
+
+/// current_key_val is a helper allocating two vectors and filling them with the current key/value
+/// of the specified iterator.
+pub fn current_key_val<It: SSIterator + ?Sized>(it: &It) -> Option<(Vec<u8>, Vec<u8>)> {
+    let (mut k, mut v) = (vec![], vec![]);
+    if it.current(&mut k, &mut v) {
+        Some((k, v))
+    } else {
+        None
+    }
+}
+
+impl SSIterator for Box<SSIterator> {
+    fn advance(&mut self) -> bool {
+        self.as_mut().advance()
+    }
+    fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool {
+        self.as_ref().current(key, val)
+    }
+    fn seek(&mut self, key: &[u8]) {
+        self.as_mut().seek(key)
+    }
+    fn reset(&mut self) {
+        self.as_mut().reset()
+    }
+    fn valid(&self) -> bool {
+        self.as_ref().valid()
+    }
+    fn prev(&mut self) -> bool {
+        self.as_mut().prev()
     }
 }
 
-impl Error for Status {
-    fn description(&self) -> &str {
-        match *self {
-            Status::OK => "ok",
-            Status::NotFound(ref s) => s,
-            Status::Corruption(ref s) => s,
-            Status::NotSupported(ref s) => s,
-            Status::InvalidArgument(ref s) => s,
-            Status::PermissionDenied(ref s) => s,
-            Status::IOError(ref s) => s,
-            Status::Unknown(ref s) => s,
-        }
-    }
+/// The unique (sequential) number of a file.
+pub type FileNum = u64;
+
+/// Describes a file on disk.
+#[derive(Clone, Debug, Default, PartialEq)]
+pub struct FileMetaData {
+    // default: size / 16384.
+    pub allowed_seeks: usize,
+    pub num: FileNum,
+    pub size: usize,
+    // these are in InternalKey format:
+    pub smallest: Vec<u8>,
+    pub largest: Vec<u8>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum FileType {
+    Log,
+    DBLock,
+    Table,
+    Descriptor,
+    Current,
+    Temp,
+    InfoLog,
 }
 
-/// LevelDB's result type
-pub type Result<T> = result::Result<T, Status>;
-
-pub fn from_io_result<T>(e: io::Result<T>) -> Result<T> {
-    match e {
-        Ok(r) => result::Result::Ok(r),
-        Err(e) => {
-            let err = e.description().to_string();
+pub fn parse_file_name(f: &str) -> Result<(FileNum, FileType)> {
+    if f == "CURRENT" {
+        return Ok((0, FileType::Current));
+    } else if f == "LOCK" {
+        return Ok((0, FileType::DBLock));
+    } else if f == "LOG" || f == "LOG.old" {
+        return Ok((0, FileType::InfoLog));
+    } else if f.starts_with("MANIFEST-") {
+        if let Some(ix) = f.find('-') {
+            if let Ok(num) = FileNum::from_str_radix(&f[ix + 1..], 10) {
+                return Ok((num, FileType::Descriptor));
+            }
+            return err(
+                StatusCode::InvalidArgument,
+                "manifest file number is invalid",
+            );
+        }
+        return err(StatusCode::InvalidArgument, "manifest file has no dash");
+    } else if let Some(ix) = f.find('.') {
+        // 00012345.log 00123.sst ...
+        if let Ok(num) = FileNum::from_str_radix(&f[0..ix], 10) {
+            let typ = match &f[ix + 1..] {
+                "log" => FileType::Log,
+                "sst" | "ldb" => FileType::Table,
+                "dbtmp" => FileType::Temp,
+                _ => {
+                    return err(
+                        StatusCode::InvalidArgument,
+                        "unknown numbered file extension",
+                    )
+                }
+            };
+            return Ok((num, typ));
+        }
+        return err(
+            StatusCode::InvalidArgument,
+            "invalid file number for table or temp file",
+        );
+    }
+    err(StatusCode::InvalidArgument, "unknown file type")
+}
 
-            let r = match e.kind() {
-                io::ErrorKind::NotFound => Err(Status::NotFound(err)),
-                io::ErrorKind::InvalidData => Err(Status::Corruption(err)),
-                io::ErrorKind::InvalidInput => Err(Status::InvalidArgument(err)),
-                io::ErrorKind::PermissionDenied => Err(Status::PermissionDenied(err)),
-                _ => Err(Status::IOError(err)),
-            };
+const MASK_DELTA: u32 = 0xa282ead8;
+
+pub fn mask_crc(c: u32) -> u32 {
+    (c.wrapping_shr(15) | c.wrapping_shl(17)).wrapping_add(MASK_DELTA)
+}
+
+pub fn unmask_crc(mc: u32) -> u32 {
+    let rot = mc.wrapping_sub(MASK_DELTA);
+    (rot.wrapping_shr(17) | rot.wrapping_shl(15))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
 
-            r
+    #[test]
+    fn test_types_parse_file_name() {
+        for c in &[
+            ("CURRENT", (0, FileType::Current)),
+            ("LOCK", (0, FileType::DBLock)),
+            ("LOG", (0, FileType::InfoLog)),
+            ("LOG.old", (0, FileType::InfoLog)),
+            ("MANIFEST-01234", (1234, FileType::Descriptor)),
+            ("001122.sst", (1122, FileType::Table)),
+            ("001122.ldb", (1122, FileType::Table)),
+            ("001122.dbtmp", (1122, FileType::Temp)),
+        ] {
+            assert_eq!(parse_file_name(c.0).unwrap(), c.1);
         }
+        assert!(parse_file_name("xyz.LOCK").is_err());
+        assert!(parse_file_name("01a.sst").is_err());
+        assert!(parse_file_name("0011.abc").is_err());
+        assert!(parse_file_name("MANIFEST-trolol").is_err());
     }
 }