changeset 25:04468480cdcf

Rebase SSTable code on latest LevelDB development (non-public)
author Lewin Bormann <lbo@spheniscida.de>
date Tue, 03 Jan 2017 19:28:14 +0100
parents 620ac9082634
children d380295dba2f
files src/block.rs src/lib.rs src/options.rs src/table_builder.rs src/table_reader.rs
diffstat 5 files changed, 690 insertions(+), 468 deletions(-) [+]
line wrap: on
line diff
--- a/src/block.rs	Mon Jan 02 12:53:29 2017 +0100
+++ b/src/block.rs	Tue Jan 03 19:28:14 2017 +0100
@@ -2,8 +2,8 @@
 
 use std::rc::Rc;
 
-use options::BuildOptions;
-use iterator::{SSIterator, Comparator};
+use options::Options;
+use iterator::SSIterator;
 
 use integer_encoding::FixedInt;
 use integer_encoding::VarInt;
@@ -26,19 +26,24 @@
 /// A RESTART is a fixed u32 pointing to the beginning of an ENTRY.
 ///
 /// N_RESTARTS contains the number of restarts.
-pub struct Block<C: Comparator> {
+pub struct Block {
     block: Rc<BlockContents>,
-    cmp: C,
+    opt: Options,
 }
 
-impl<C: Comparator> Block<C> {
-    pub fn iter(&self) -> BlockIter<C> {
+impl Block {
+    /// Return an iterator over this block.
+    /// Note that the iterator isn't bound to the block's lifetime; the iterator uses the same
+    /// refcounted block contents as this block. (meaning also that if the iterator isn't released,
+    /// the memory occupied by the block isn't, either)
+    pub fn iter(&self) -> BlockIter {
         let restarts = u32::decode_fixed(&self.block[self.block.len() - 4..]);
         let restart_offset = self.block.len() - 4 - 4 * restarts as usize;
 
         BlockIter {
             block: self.block.clone(),
-            cmp: self.cmp,
+            opt: self.opt.clone(),
+
             offset: 0,
             restarts_off: restart_offset,
             current_entry_offset: 0,
@@ -53,47 +58,73 @@
         self.block.clone()
     }
 
-    pub fn new(contents: BlockContents, cmp: C) -> Block<C> {
+    pub fn new(opt: Options, contents: BlockContents) -> Block {
         assert!(contents.len() > 4);
         Block {
             block: Rc::new(contents),
-            cmp: cmp,
+            opt: opt,
         }
     }
 }
 
-
-#[derive(Debug)]
-pub struct BlockIter<C: Comparator> {
+pub struct BlockIter {
+    /// The underlying block contents.
+    /// TODO: Maybe (probably...) this needs an Arc.
     block: Rc<BlockContents>,
-    cmp: C,
-    // start of next entry
+    opt: Options,
+    /// offset of restarts area within the block.
+    restarts_off: usize,
+
+    /// start of next entry to be parsed.
     offset: usize,
-    // offset of restarts area
-    restarts_off: usize,
+    /// offset of the current entry.
     current_entry_offset: usize,
-    // tracks the last restart we encountered
+    /// index of the most recent restart.
     current_restart_ix: usize,
 
-    // We assemble the key from two parts usually, so we keep the current full key here.
+    /// We assemble the key from two parts usually, so we keep the current full key here.
     key: Vec<u8>,
+    /// Offset of the current value within the block.
     val_offset: usize,
 }
 
-impl<C: Comparator> BlockIter<C> {
+impl BlockIter {
+    /// Return the number of restarts in this block.
     fn number_restarts(&self) -> usize {
         u32::decode_fixed(&self.block[self.block.len() - 4..]) as usize
     }
 
+    /// Seek to restart point `ix`. After the seek, current() will return the entry at that restart
+    /// point.
+    fn seek_to_restart_point(&mut self, ix: usize) {
+        let off = self.get_restart_point(ix);
+
+        self.offset = off;
+        self.current_entry_offset = off;
+        self.current_restart_ix = ix;
+        // advances self.offset to point to the next entry
+        let (shared, non_shared, _, head_len) = self.parse_entry_and_advance();
+
+        assert_eq!(shared, 0);
+
+        self.assemble_key(off + head_len, shared, non_shared);
+    }
+
+    /// Return the offset that restart `ix` points to.
     fn get_restart_point(&self, ix: usize) -> usize {
         let restart = self.restarts_off + 4 * ix;
         u32::decode_fixed(&self.block[restart..restart + 4]) as usize
     }
-}
 
-impl<C: Comparator> BlockIter<C> {
-    // Returns SHARED, NON_SHARED and VALSIZE from the current position. Advances self.offset.
-    fn parse_entry(&mut self) -> (usize, usize, usize) {
+    /// The layout of an entry is
+    /// [SHARED varint, NON_SHARED varint, VALSIZE varint, KEY (NON_SHARED bytes),
+    ///  VALUE (VALSIZE bytes)].
+    ///
+    /// Returns SHARED, NON_SHARED, VALSIZE and [length of length spec] from the current position,
+    /// where 'length spec' is the length of the three values in the entry header, as described
+    /// above.
+    /// Advances self.offset to point to the beginning of the next entry.
+    fn parse_entry_and_advance(&mut self) -> (usize, usize, usize, usize) {
         let mut i = 0;
         let (shared, sharedlen) = usize::decode_var(&self.block[self.offset..]);
         i += sharedlen;
@@ -104,89 +135,104 @@
         let (valsize, valsizelen) = usize::decode_var(&self.block[self.offset + i..]);
         i += valsizelen;
 
-        self.offset += i;
+        self.val_offset = self.offset + i + non_shared;
+        self.offset = self.offset + i + non_shared + valsize;
 
-        (shared, non_shared, valsize)
+        (shared, non_shared, valsize, i)
     }
 
-    /// offset is assumed to be at the beginning of the non-shared key part.
-    /// offset is not advanced.
-    fn assemble_key(&mut self, shared: usize, non_shared: usize) {
+    /// Assemble the current key from shared and non-shared parts (an entry usually contains only
+    /// the part of the key that is different from the previous key).
+    ///
+    /// `off` is the offset of the key string within the whole block (self.current_entry_offset
+    /// + entry header length); `shared` and `non_shared` are the lengths of the shared
+    /// respectively non-shared parts of the key.
+    /// Only self.key is mutated.
+    fn assemble_key(&mut self, off: usize, shared: usize, non_shared: usize) {
         self.key.resize(shared, 0);
-        self.key.extend_from_slice(&self.block[self.offset..self.offset + non_shared]);
+        self.key.extend_from_slice(&self.block[off..off + non_shared]);
     }
 
     pub fn seek_to_last(&mut self) {
         if self.number_restarts() > 0 {
-            let restart = self.get_restart_point(self.number_restarts() - 1);
-            self.offset = restart;
-            self.current_entry_offset = restart;
-            self.current_restart_ix = self.number_restarts() - 1;
+            let num_restarts = self.number_restarts();
+            self.seek_to_restart_point(num_restarts - 1);
         } else {
             self.reset();
         }
 
         while let Some((_, _)) = self.next() {
         }
-
-        self.prev();
     }
 }
 
-impl<C: Comparator> Iterator for BlockIter<C> {
+impl Iterator for BlockIter {
     type Item = (Vec<u8>, Vec<u8>);
 
     fn next(&mut self) -> Option<Self::Item> {
-        self.current_entry_offset = self.offset;
-
-        if self.current_entry_offset >= self.restarts_off {
+        if self.offset >= self.restarts_off {
+            self.offset = self.restarts_off;
+            // current_entry_offset is left at the offset of the last entry
             return None;
+        } else {
+            self.current_entry_offset = self.offset;
         }
-        let (shared, non_shared, valsize) = self.parse_entry();
-        self.assemble_key(shared, non_shared);
+
+        let current_off = self.current_entry_offset;
 
-        self.val_offset = self.offset + non_shared;
-        self.offset = self.val_offset + valsize;
+        let (shared, non_shared, valsize, entry_head_len) = self.parse_entry_and_advance();
+        self.assemble_key(current_off + entry_head_len, shared, non_shared);
 
+        // Adjust current_restart_ix
         let num_restarts = self.number_restarts();
         while self.current_restart_ix + 1 < num_restarts &&
               self.get_restart_point(self.current_restart_ix + 1) < self.current_entry_offset {
             self.current_restart_ix += 1;
         }
+
         Some((self.key.clone(), Vec::from(&self.block[self.val_offset..self.val_offset + valsize])))
     }
 }
 
-impl<C: Comparator> SSIterator for BlockIter<C> {
+impl SSIterator for BlockIter {
     fn reset(&mut self) {
         self.offset = 0;
+        self.val_offset = 0;
         self.current_restart_ix = 0;
         self.key.clear();
-        self.val_offset = 0;
     }
+
     fn prev(&mut self) -> Option<Self::Item> {
         // as in the original implementation -- seek to last restart point, then look for key
-        let current_offset = self.current_entry_offset;
+        let orig_offset = self.current_entry_offset;
 
         // At the beginning, can't go further back
-        if current_offset == 0 {
+        if orig_offset == 0 {
             self.reset();
             return None;
         }
 
-        while self.get_restart_point(self.current_restart_ix) >= current_offset {
+        while self.get_restart_point(self.current_restart_ix) >= orig_offset {
+            // todo: double check this
+            if self.current_restart_ix == 0 {
+                self.offset = self.restarts_off;
+                self.current_restart_ix = self.number_restarts();
+                break;
+            }
             self.current_restart_ix -= 1;
         }
 
         self.offset = self.get_restart_point(self.current_restart_ix);
-        assert!(self.offset < current_offset);
+        assert!(self.offset < orig_offset);
 
         let mut result;
 
+        // Stop if the next entry would be the original one (self.offset always points to the start
+        // of the next entry)
         loop {
             result = self.next();
 
-            if self.offset >= current_offset {
+            if self.offset >= orig_offset {
                 break;
             }
         }
@@ -206,19 +252,14 @@
         // Do a binary search over the restart points.
         while left < right {
             let middle = (left + right + 1) / 2;
-            self.offset = self.get_restart_point(middle);
-            // advances self.offset
-            let (shared, non_shared, _) = self.parse_entry();
+            self.seek_to_restart_point(middle);
 
-            // At a restart, the shared part is supposed to be 0.
-            assert_eq!(shared, 0);
+            let c = self.opt.cmp.cmp(&self.key, to);
 
-            let cmp = self.cmp.cmp(to, &self.block[self.offset..self.offset + non_shared]);
-
-            if cmp == Ordering::Less {
+            if c == Ordering::Less {
+                left = middle;
+            } else {
                 right = middle - 1;
-            } else {
-                left = middle;
             }
         }
 
@@ -228,7 +269,7 @@
 
         // Linear search from here on
         while let Some((k, _)) = self.next() {
-            if self.cmp.cmp(k.as_slice(), to) >= Ordering::Equal {
+            if self.opt.cmp.cmp(k.as_slice(), to) >= Ordering::Equal {
                 return;
             }
         }
@@ -247,9 +288,8 @@
     }
 }
 
-pub struct BlockBuilder<C: Comparator> {
-    opt: BuildOptions,
-    cmp: C,
+pub struct BlockBuilder {
+    opt: Options,
     buffer: Vec<u8>,
     restarts: Vec<u32>,
 
@@ -257,15 +297,14 @@
     counter: usize,
 }
 
-impl<C: Comparator> BlockBuilder<C> {
-    pub fn new(o: BuildOptions, cmp: C) -> BlockBuilder<C> {
+impl BlockBuilder {
+    pub fn new(o: Options) -> BlockBuilder {
         let mut restarts = vec![0];
         restarts.reserve(1023);
 
         BlockBuilder {
             buffer: Vec::with_capacity(o.block_size),
             opt: o,
-            cmp: cmp,
             restarts: restarts,
             last_key: Vec::new(),
             counter: 0,
@@ -295,7 +334,7 @@
     pub fn add(&mut self, key: &[u8], val: &[u8]) {
         assert!(self.counter <= self.opt.block_restart_interval);
         assert!(self.buffer.is_empty() ||
-                self.cmp.cmp(self.last_key.as_slice(), key) == Ordering::Less);
+                self.opt.cmp.cmp(self.last_key.as_slice(), key) == Ordering::Less);
 
         let mut shared = 0;
 
@@ -333,8 +372,6 @@
         self.last_key.resize(shared, 0);
         self.last_key.extend_from_slice(&key[shared..]);
 
-        // assert_eq!(&self.last_key[..], key);
-
         self.counter += 1;
     }
 
@@ -360,7 +397,7 @@
 #[cfg(test)]
 mod tests {
     use super::*;
-    use iterator::*;
+    use iterator::SSIterator;
     use options::*;
 
     fn get_data() -> Vec<(&'static [u8], &'static [u8])> {
@@ -374,10 +411,10 @@
 
     #[test]
     fn test_block_builder() {
-        let mut o = BuildOptions::default();
+        let mut o = Options::default();
         o.block_restart_interval = 3;
 
-        let mut builder = BlockBuilder::new(o, StandardComparator);
+        let mut builder = BlockBuilder::new(o);
 
         for &(k, v) in get_data().iter() {
             builder.add(k, v);
@@ -387,33 +424,19 @@
 
         let block = builder.finish();
         assert_eq!(block.len(), 149);
-
-    }
-
-    #[test]
-    fn test_block_builder_reset() {
-        let o = BuildOptions::default();
-
-        let mut builder = BlockBuilder::new(o, StandardComparator);
-
-        builder.add("abc".as_bytes(), "def".as_bytes());
-
-        assert!(builder.size_estimate() > 4);
-        builder.reset();
-        assert_eq!(builder.size_estimate(), 4);
     }
 
     #[test]
     fn test_block_empty() {
-        let mut o = BuildOptions::default();
+        let mut o = Options::default();
         o.block_restart_interval = 16;
-        let builder = BlockBuilder::new(o, StandardComparator);
+        let builder = BlockBuilder::new(o);
 
         let blockc = builder.finish();
         assert_eq!(blockc.len(), 8);
         assert_eq!(blockc, vec![0, 0, 0, 0, 1, 0, 0, 0]);
 
-        let block = Block::new(blockc, StandardComparator);
+        let block = Block::new(Options::default(), blockc);
 
         for _ in block.iter() {
             panic!("expected 0 iterations");
@@ -423,48 +446,39 @@
     #[test]
     fn test_block_build_iterate() {
         let data = get_data();
-        let mut builder = BlockBuilder::new(BuildOptions::default(), StandardComparator);
+        let mut builder = BlockBuilder::new(Options::default());
 
         for &(k, v) in data.iter() {
             builder.add(k, v);
         }
 
         let block_contents = builder.finish();
-        let block = Block::new(block_contents, StandardComparator);
-        let block_iter = block.iter();
+        let block = Block::new(Options::default(), block_contents).iter();
         let mut i = 0;
 
-        assert!(!block_iter.valid());
+        assert!(!block.valid());
 
-        for (k, v) in block_iter {
+        for (k, v) in block {
             assert_eq!(&k[..], data[i].0);
             assert_eq!(v, data[i].1);
             i += 1;
         }
         assert_eq!(i, data.len());
-
-        let mut block_iter = block.iter();
-
-        block_iter.reset();
-        assert!(block_iter.next().is_some());
-        assert!(block_iter.valid());
-        block_iter.reset();
-        assert!(!block_iter.valid());
     }
 
     #[test]
     fn test_block_iterate_reverse() {
-        let mut o = BuildOptions::default();
+        let mut o = Options::default();
         o.block_restart_interval = 3;
         let data = get_data();
-        let mut builder = BlockBuilder::new(o, StandardComparator);
+        let mut builder = BlockBuilder::new(o.clone());
 
         for &(k, v) in data.iter() {
             builder.add(k, v);
         }
 
         let block_contents = builder.finish();
-        let mut block = Block::new(block_contents, StandardComparator).iter();
+        let mut block = Block::new(o.clone(), block_contents).iter();
 
         assert!(!block.valid());
         assert_eq!(block.next(),
@@ -478,15 +492,26 @@
                    Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())));
         block.prev();
         assert!(!block.valid());
+
+        // Verify that prev() from the last entry goes to the prev-to-last entry
+        // (essentially, that next() returning None doesn't advance anything)
+
+        while let Some(_) = block.next() {
+        }
+
+        block.prev();
+        assert!(block.valid());
+        assert_eq!(block.current(),
+                   Some(("prefix_key2".as_bytes().to_vec(), "value".as_bytes().to_vec())));
     }
 
     #[test]
     fn test_block_seek() {
-        let mut o = BuildOptions::default();
+        let mut o = Options::default();
         o.block_restart_interval = 3;
 
         let data = get_data();
-        let mut builder = BlockBuilder::new(o, StandardComparator);
+        let mut builder = BlockBuilder::new(o.clone());
 
         for &(k, v) in data.iter() {
             builder.add(k, v);
@@ -494,7 +519,7 @@
 
         let block_contents = builder.finish();
 
-        let mut block = Block::new(block_contents, StandardComparator).iter();
+        let mut block = Block::new(o.clone(), block_contents).iter();
 
         block.seek(&"prefix_key2".as_bytes());
         assert!(block.valid());
@@ -510,18 +535,28 @@
         assert!(block.valid());
         assert_eq!(block.current(),
                    Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())));
+
+        block.seek(&"prefix_key3".as_bytes());
+        assert!(block.valid());
+        assert_eq!(block.current(),
+                   Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec())));
+
+        block.seek(&"prefix_key8".as_bytes());
+        assert!(block.valid());
+        assert_eq!(block.current(),
+                   Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec())));
     }
 
     #[test]
     fn test_block_seek_to_last() {
-        let mut o = BuildOptions::default();
+        let mut o = Options::default();
 
         // Test with different number of restarts
         for block_restart_interval in vec![2, 6, 10] {
             o.block_restart_interval = block_restart_interval;
 
             let data = get_data();
-            let mut builder = BlockBuilder::new(o, StandardComparator);
+            let mut builder = BlockBuilder::new(o.clone());
 
             for &(k, v) in data.iter() {
                 builder.add(k, v);
@@ -529,7 +564,7 @@
 
             let block_contents = builder.finish();
 
-            let mut block = Block::new(block_contents, StandardComparator).iter();
+            let mut block = Block::new(o.clone(), block_contents).iter();
 
             block.seek_to_last();
             assert!(block.valid());
--- a/src/lib.rs	Mon Jan 02 12:53:29 2017 +0100
+++ b/src/lib.rs	Tue Jan 03 19:28:14 2017 +0100
@@ -3,6 +3,11 @@
 
 mod block;
 mod blockhandle;
+mod cmp;
+mod filter;
+mod filter_block;
+mod key_types;
+mod types;
 
 pub mod iterator;
 pub mod options;
@@ -11,7 +16,7 @@
 
 pub use iterator::StandardComparator;
 pub use iterator::SSIterator;
-pub use options::{BuildOptions, ReadOptions};
+pub use options::{Options, ReadOptions};
 
 pub use table_builder::TableBuilder;
 pub use table_reader::{Table, TableIterator};
--- a/src/options.rs	Mon Jan 02 12:53:29 2017 +0100
+++ b/src/options.rs	Tue Jan 03 19:28:14 2017 +0100
@@ -1,4 +1,14 @@
+use cmp::{Cmp, DefaultCmp};
+use types::SequenceNumber;
+
 use std::default::Default;
+use std::sync::Arc;
+
+const KB: usize = 1 << 10;
+const MB: usize = KB * KB;
+
+const BLOCK_MAX_SIZE: usize = 4 * KB;
+const WRITE_BUFFER_SIZE: usize = 4 * MB;
 
 #[derive(Clone, Copy, PartialEq, Debug)]
 pub enum CompressionType {
@@ -10,37 +20,66 @@
     match i {
         0 => Some(CompressionType::CompressionNone),
         1 => Some(CompressionType::CompressionSnappy),
-        _ => None
+        _ => None,
     }
 }
 
 /// [not all member types implemented yet]
 ///
-#[derive(Clone, Copy)]
-pub struct BuildOptions {
+#[derive(Clone)]
+pub struct Options {
+    pub cmp: Arc<Box<Cmp>>,
+    pub create_if_missing: bool,
+    pub error_if_exists: bool,
+    pub paranoid_checks: bool,
+    // pub logger: Logger,
+    pub write_buffer_size: usize,
+    pub max_open_files: usize,
     pub block_size: usize,
     pub block_restart_interval: usize,
-    // Note: Compression is not implemented.
     pub compression_type: CompressionType,
+    pub reuse_logs: bool,
 }
 
-impl Default for BuildOptions {
-    fn default() -> BuildOptions {
-        BuildOptions {
-            block_size: 4 * (1 << 10),
+impl Default for Options {
+    fn default() -> Options {
+        Options {
+            cmp: Arc::new(Box::new(DefaultCmp)),
+            create_if_missing: true,
+            error_if_exists: false,
+            paranoid_checks: false,
+            write_buffer_size: WRITE_BUFFER_SIZE,
+            max_open_files: 1 << 10,
+            block_size: BLOCK_MAX_SIZE,
             block_restart_interval: 16,
+            reuse_logs: false,
             compression_type: CompressionType::CompressionNone,
         }
     }
 }
 
-#[derive(Clone, Copy)]
+/// Supplied to DB read operations.
 pub struct ReadOptions {
-    pub skip_bad_blocks: bool,
+    pub verify_checksums: bool,
+    pub snapshot: Option<SequenceNumber>,
 }
 
 impl Default for ReadOptions {
-    fn default() -> ReadOptions {
-        ReadOptions { skip_bad_blocks: true }
+    fn default() -> Self {
+        ReadOptions {
+            verify_checksums: false,
+            snapshot: None,
+        }
     }
 }
+
+/// Supplied to write operations
+pub struct WriteOptions {
+    pub sync: bool,
+}
+
+impl Default for WriteOptions {
+    fn default() -> Self {
+        WriteOptions { sync: false }
+    }
+}
--- a/src/table_builder.rs	Mon Jan 02 12:53:29 2017 +0100
+++ b/src/table_builder.rs	Tue Jan 03 19:28:14 2017 +0100
@@ -1,12 +1,14 @@
 use block::{BlockBuilder, BlockContents};
 use blockhandle::BlockHandle;
-use options::{CompressionType, BuildOptions};
-use iterator::{Comparator, StandardComparator};
+use cmp::InternalKeyCmp;
+use filter::{BoxedFilterPolicy, NoFilterPolicy};
+use filter_block::FilterBlockBuilder;
+use key_types::InternalKey;
+use options::{CompressionType, Options};
 
-use std::io::{Result, Write};
-use std::fs::{File, OpenOptions};
-use std::path::Path;
+use std::io::Write;
 use std::cmp::Ordering;
+use std::sync::Arc;
 
 use crc::crc32;
 use crc::Hasher32;
@@ -20,59 +22,44 @@
 pub const TABLE_BLOCK_COMPRESS_LEN: usize = 1;
 pub const TABLE_BLOCK_CKSUM_LEN: usize = 4;
 
-fn find_shortest_sep<C: Comparator>(c: &C, lo: &[u8], hi: &[u8]) -> Vec<u8> {
-    let min;
-
-    if lo.len() < hi.len() {
-        min = lo.len();
-    } else {
-        min = hi.len();
-    }
-
-    let mut diff_at = 0;
-
-    while diff_at < min && lo[diff_at] == hi[diff_at] {
-        diff_at += 1;
-    }
-
-    if diff_at == min {
-        return Vec::from(lo);
-    } else {
-        if lo[diff_at] < 0xff && lo[diff_at] + 1 < hi[diff_at] {
-            let mut result = Vec::from(&lo[0..diff_at + 1]);
-            result[diff_at] += 1;
-            assert_eq!(c.cmp(&result, hi), Ordering::Less);
-            return result;
-        }
-        return Vec::from(lo);
-    }
-}
-
 /// Footer is a helper for encoding/decoding a table footer.
 #[derive(Debug)]
 pub struct Footer {
+    pub meta_index: BlockHandle,
     pub index: BlockHandle,
 }
 
+/// A Table footer contains a pointer to the metaindex block, another pointer to the index block,
+/// and a magic number:
+/// [ { table data ... , METAINDEX blockhandle, INDEX blockhandle, PADDING bytes } = 40 bytes,
+/// MAGIC_FOOTER_ENCODED ]
 impl Footer {
-    pub fn new(index: BlockHandle) -> Footer {
-        Footer { index: index }
+    pub fn new(metaix: BlockHandle, index: BlockHandle) -> Footer {
+        Footer {
+            meta_index: metaix,
+            index: index,
+        }
     }
 
     pub fn decode(from: &[u8]) -> Footer {
         assert!(from.len() >= FULL_FOOTER_LENGTH);
         assert_eq!(&from[FOOTER_LENGTH..], &MAGIC_FOOTER_ENCODED);
-        let (ix, _) = BlockHandle::decode(&from[0..]);
+        let (meta, metalen) = BlockHandle::decode(&from[0..]);
+        let (ix, _) = BlockHandle::decode(&from[metalen..]);
 
-        Footer { index: ix }
+        Footer {
+            meta_index: meta,
+            index: ix,
+        }
     }
 
     pub fn encode(&self, to: &mut [u8]) {
         assert!(to.len() >= FOOTER_LENGTH + 8);
 
-        let s1 = self.index.encode_to(&mut to[0..]);
+        let s1 = self.meta_index.encode_to(to);
+        let s2 = self.index.encode_to(&mut to[s1..]);
 
-        for i in s1..FOOTER_LENGTH {
+        for i in s1 + s2..FOOTER_LENGTH {
             to[i] = 0;
         }
         for i in FOOTER_LENGTH..FULL_FOOTER_LENGTH {
@@ -81,91 +68,94 @@
     }
 }
 
-/// A table consists of DATA BLOCKs, an INDEX BLOCK and a FOOTER.
+/// A table consists of DATA BLOCKs, META BLOCKs, a METAINDEX BLOCK, an INDEX BLOCK and a FOOTER.
 ///
-/// DATA BLOCKs, INDEX BLOCKs, and BLOCKs are built using the code in the `block` module.
+/// DATA BLOCKs, META BLOCKs, INDEX BLOCK and METAINDEX BLOCK are built using the code in
+/// the `block` module.
 ///
-/// DATA BLOCKs contain the actual data, and a footer of [4B crc32; 1B compression];
-/// INDEX BLOCKS contain one entry per block, where the key is a string after the
-/// last key of a block, and the value is a encoded BlockHandle pointing to that
-/// block.
-///
-/// The footer is a pointer pointing to the index block, padding to fill up to 40 B and at the end
-/// the 8B magic number 0xdb4775248b80fb57.
-///
-pub struct TableBuilder<C: Comparator, Dst: Write> {
-    o: BuildOptions,
-    cmp: C,
+/// The FOOTER consists of a BlockHandle wthat points to the metaindex block, another pointing to
+/// the index block, padding to fill up to 40 B and at the end the 8B magic number
+/// 0xdb4775248b80fb57.
+
+pub struct TableBuilder<'a, Dst: Write> {
+    opt: Options,
     dst: Dst,
 
     offset: usize,
     num_entries: usize,
     prev_block_last_key: Vec<u8>,
 
-    data_block: Option<BlockBuilder<C>>,
-    index_block: Option<BlockBuilder<C>>,
+    data_block: Option<BlockBuilder>,
+    index_block: Option<BlockBuilder>,
+    filter_block: Option<FilterBlockBuilder<'a>>,
 }
 
-impl<Dst: Write> TableBuilder<StandardComparator, Dst> {
-    /// Create a new TableBuilder with default comparator and BuildOptions.
-    pub fn new_defaults(dst: Dst) -> TableBuilder<StandardComparator, Dst> {
-        TableBuilder::new(dst, BuildOptions::default(), StandardComparator)
+impl<'a, Dst: Write> TableBuilder<'a, Dst> {
+    pub fn new_no_filter(opt: Options, dst: Dst) -> TableBuilder<'a, Dst> {
+        TableBuilder::new(opt, dst, NoFilterPolicy::new())
     }
 }
 
-impl TableBuilder<StandardComparator, File> {
-    /// Open/create a file for writing a table.
-    /// This will truncate the file, if it exists.
-    pub fn new_to_file(file: &Path) -> Result<TableBuilder<StandardComparator, File>> {
-        let f = try!(OpenOptions::new().create(true).write(true).truncate(true).open(file));
-        Ok(TableBuilder::new(f, BuildOptions::default(), StandardComparator))
+/// TableBuilder is used for building a new SSTable. It groups entries into blocks,
+/// calculating checksums and bloom filters.
+/// It's recommended that you use InternalFilterPolicy as FilterPol, as that policy extracts the
+/// underlying user keys from the InternalKeys used as keys in the table.
+impl<'a, Dst: Write> TableBuilder<'a, Dst> {
+    /// Create a new table builder.
+    /// The comparator in opt will be wrapped in a InternalKeyCmp.
+    pub fn new(mut opt: Options, dst: Dst, fpol: BoxedFilterPolicy) -> TableBuilder<'a, Dst> {
+        opt.cmp = Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
+        TableBuilder::new_raw(opt, dst, fpol)
     }
-}
 
-impl<C: Comparator, Dst: Write> TableBuilder<C, Dst> {
-    /// Create a new TableBuilder.
-    pub fn new(dst: Dst, opt: BuildOptions, cmp: C) -> TableBuilder<C, Dst> {
+    /// Like new(), but doesn't wrap the comparator in an InternalKeyCmp (for testing)
+    pub fn new_raw(opt: Options, dst: Dst, fpol: BoxedFilterPolicy) -> TableBuilder<'a, Dst> {
         TableBuilder {
-            o: opt,
-            cmp: cmp,
+            opt: opt.clone(),
             dst: dst,
             offset: 0,
             prev_block_last_key: vec![],
             num_entries: 0,
-            data_block: Some(BlockBuilder::new(opt, cmp)),
-            index_block: Some(BlockBuilder::new(opt, cmp)),
+            data_block: Some(BlockBuilder::new(opt.clone())),
+            index_block: Some(BlockBuilder::new(opt)),
+            filter_block: Some(FilterBlockBuilder::new(fpol)),
         }
     }
 
-    /// Returns how many entries have been written.
     pub fn entries(&self) -> usize {
         self.num_entries
     }
 
-    /// Add an entry to this table. The key must be lexicographically greater than the last entry
-    /// written.
-    pub fn add(&mut self, key: &[u8], val: &[u8]) {
+    /// Add a key to the table. The key as to be lexically greater or equal to the last one added.
+    pub fn add(&mut self, key: InternalKey<'a>, val: &[u8]) {
         assert!(self.data_block.is_some());
-        assert!(self.num_entries == 0 ||
-                self.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less);
 
-        if self.data_block.as_ref().unwrap().size_estimate() > self.o.block_size {
+        if !self.prev_block_last_key.is_empty() {
+            assert!(self.opt.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less);
+        }
+
+        if self.data_block.as_ref().unwrap().size_estimate() > self.opt.block_size {
             self.write_data_block(key);
         }
 
         let dblock = &mut self.data_block.as_mut().unwrap();
 
+        if let Some(ref mut fblock) = self.filter_block {
+            fblock.add_key(key);
+        }
+
         self.num_entries += 1;
         dblock.add(key, val);
     }
 
     /// Writes an index entry for the current data_block where `next_key` is the first key of the
-    /// next block. Then, write the actual block to disk.
-    fn write_data_block(&mut self, next_key: &[u8]) {
+    /// next block.
+    /// Calls write_block() for writing the block to disk.
+    fn write_data_block<'b>(&mut self, next_key: InternalKey<'b>) {
         assert!(self.data_block.is_some());
 
         let block = self.data_block.take().unwrap();
-        let sep = find_shortest_sep(&self.cmp, block.last_key(), next_key);
+        let sep = self.opt.cmp.find_shortest_sep(&block.last_key(), next_key);
         self.prev_block_last_key = Vec::from(block.last_key());
         let contents = block.finish();
 
@@ -174,54 +164,79 @@
         let enc_len = handle.encode_to(&mut handle_enc);
 
         self.index_block.as_mut().unwrap().add(&sep, &handle_enc[0..enc_len]);
-        self.data_block = Some(BlockBuilder::new(self.o, self.cmp));
+        self.data_block = Some(BlockBuilder::new(self.opt.clone()));
+
+        let ctype = self.opt.compression_type;
 
-        let ctype = self.o.compression_type;
         self.write_block(contents, ctype);
+
+        if let Some(ref mut fblock) = self.filter_block {
+            fblock.start_block(self.offset);
+        }
     }
 
-    /// Writes a block to disk, with a trailing 4 byte CRC checksum.
-    fn write_block(&mut self, c: BlockContents, t: CompressionType) -> BlockHandle {
+    /// Calculates the checksum, writes the block to disk and updates the offset.
+    fn write_block(&mut self, block: BlockContents, t: CompressionType) -> BlockHandle {
         // compression is still unimplemented
         assert_eq!(t, CompressionType::CompressionNone);
 
-        let mut buf = [0 as u8; 4];
+        let mut buf = [0 as u8; TABLE_BLOCK_CKSUM_LEN];
         let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
 
-        digest.write(&c);
-        digest.write(&[self.o.compression_type as u8; 1]);
+        digest.write(&block);
+        digest.write(&[self.opt.compression_type as u8; TABLE_BLOCK_COMPRESS_LEN]);
         digest.sum32().encode_fixed(&mut buf);
 
         // TODO: Handle errors here.
-        let _ = self.dst.write(&c);
-        let _ = self.dst.write(&[t as u8; 1]);
+        let _ = self.dst.write(&block);
+        let _ = self.dst.write(&[t as u8; TABLE_BLOCK_COMPRESS_LEN]);
         let _ = self.dst.write(&buf);
 
-        let handle = BlockHandle::new(self.offset, c.len());
+        let handle = BlockHandle::new(self.offset, block.len());
 
-        self.offset += c.len() + 1 + buf.len();
+        self.offset += block.len() + TABLE_BLOCK_COMPRESS_LEN + TABLE_BLOCK_CKSUM_LEN;
 
         handle
     }
 
-    /// Finish building this table. This *must* be called at the end, otherwise not all data may
-    /// land on disk.
     pub fn finish(mut self) {
         assert!(self.data_block.is_some());
-        let ctype = self.o.compression_type;
+        let ctype = self.opt.compression_type;
+
+        // If there's a pending data block, write it
+        if self.data_block.as_ref().unwrap().entries() > 0 {
+            // Find a key reliably past the last key
+            let key_past_last =
+                self.opt.cmp.find_short_succ(self.data_block.as_ref().unwrap().last_key());
+            self.write_data_block(&key_past_last);
+        }
+
+        // Create metaindex block
+        let mut meta_ix_block = BlockBuilder::new(self.opt.clone());
 
-        // If there's a pending data block, write that one
-        let flush_last_block = self.data_block.as_ref().unwrap().entries() > 0;
-        if flush_last_block {
-            self.write_data_block(&[0xff as u8; 1]);
+        if self.filter_block.is_some() {
+            // if there's a filter block, write the filter block and add it to the metaindex block.
+            let fblock = self.filter_block.take().unwrap();
+            let filter_key = format!("filter.{}", fblock.filter_name());
+            let fblock_data = fblock.finish();
+            let fblock_handle = self.write_block(fblock_data, CompressionType::CompressionNone);
+
+            let mut handle_enc = [0 as u8; 16];
+            let enc_len = fblock_handle.encode_to(&mut handle_enc);
+
+            meta_ix_block.add(filter_key.as_bytes(), &handle_enc[0..enc_len]);
         }
 
+        // write metaindex block
+        let meta_ix = meta_ix_block.finish();
+        let meta_ix_handle = self.write_block(meta_ix, ctype);
+
         // write index block
         let index_cont = self.index_block.take().unwrap().finish();
         let ix_handle = self.write_block(index_cont, ctype);
 
         // write footer.
-        let footer = Footer::new(ix_handle);
+        let footer = Footer::new(meta_ix_handle, ix_handle);
         let mut buf = [0; FULL_FOOTER_LENGTH];
         footer.encode(&mut buf);
 
@@ -231,36 +246,20 @@
 
 #[cfg(test)]
 mod tests {
-    use super::{find_shortest_sep, Footer, TableBuilder};
-    use iterator::StandardComparator;
+    use super::{Footer, TableBuilder};
     use blockhandle::BlockHandle;
-    use options::BuildOptions;
-
-    #[test]
-    fn test_shortest_sep() {
-        assert_eq!(find_shortest_sep(&StandardComparator, "abcd".as_bytes(), "abcf".as_bytes()),
-                   "abce".as_bytes());
-        assert_eq!(find_shortest_sep(&StandardComparator,
-                                     "abcdefghi".as_bytes(),
-                                     "abcffghi".as_bytes()),
-                   "abce".as_bytes());
-        assert_eq!(find_shortest_sep(&StandardComparator, "a".as_bytes(), "a".as_bytes()),
-                   "a".as_bytes());
-        assert_eq!(find_shortest_sep(&StandardComparator, "a".as_bytes(), "b".as_bytes()),
-                   "a".as_bytes());
-        assert_eq!(find_shortest_sep(&StandardComparator, "abc".as_bytes(), "zzz".as_bytes()),
-                   "b".as_bytes());
-        assert_eq!(find_shortest_sep(&StandardComparator, "".as_bytes(), "".as_bytes()),
-                   "".as_bytes());
-    }
+    use filter::BloomPolicy;
+    use options::Options;
 
     #[test]
     fn test_footer() {
-        let f = Footer::new(BlockHandle::new(55, 5));
+        let f = Footer::new(BlockHandle::new(44, 4), BlockHandle::new(55, 5));
         let mut buf = [0; 48];
         f.encode(&mut buf[..]);
 
         let f2 = Footer::decode(&buf);
+        assert_eq!(f2.meta_index.offset(), 44);
+        assert_eq!(f2.meta_index.size(), 4);
         assert_eq!(f2.index.offset(), 55);
         assert_eq!(f2.index.size(), 5);
 
@@ -269,9 +268,9 @@
     #[test]
     fn test_table_builder() {
         let mut d = Vec::with_capacity(512);
-        let mut opt = BuildOptions::default();
+        let mut opt = Options::default();
         opt.block_restart_interval = 3;
-        let mut b = TableBuilder::new(&mut d, opt, StandardComparator);
+        let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4));
 
         let data = vec![("abc", "def"), ("abd", "dee"), ("bcd", "asa"), ("bsr", "a00")];
 
@@ -279,6 +278,7 @@
             b.add(k.as_bytes(), v.as_bytes());
         }
 
+        assert!(b.filter_block.is_some());
         b.finish();
     }
 
@@ -286,9 +286,9 @@
     #[should_panic]
     fn test_bad_input() {
         let mut d = Vec::with_capacity(512);
-        let mut opt = BuildOptions::default();
+        let mut opt = Options::default();
         opt.block_restart_interval = 3;
-        let mut b = TableBuilder::new(&mut d, opt, StandardComparator);
+        let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4));
 
         // Test two equal consecutive keys
         let data = vec![("abc", "def"), ("abc", "dee"), ("bcd", "asa"), ("bsr", "a00")];
--- a/src/table_reader.rs	Mon Jan 02 12:53:29 2017 +0100
+++ b/src/table_reader.rs	Tue Jan 03 19:28:14 2017 +0100
@@ -1,17 +1,19 @@
 use block::{Block, BlockIter};
 use blockhandle::BlockHandle;
+use filter::{BoxedFilterPolicy, InternalFilterPolicy};
+use filter_block::FilterBlockReader;
+use key_types::InternalKey;
+use cmp::InternalKeyCmp;
+use options::{self, CompressionType, Options};
 use table_builder::{self, Footer};
-use iterator::{Comparator, StandardComparator, SSIterator};
-use options::{self, ReadOptions, CompressionType};
-
-use integer_encoding::FixedInt;
-use crc::crc32;
-use crc::Hasher32;
+use iterator::SSIterator;
 
 use std::cmp::Ordering;
 use std::io::{self, Read, Seek, SeekFrom, Result};
-use std::fs::{File, OpenOptions};
-use std::path::Path;
+use std::sync::Arc;
+
+use integer_encoding::FixedInt;
+use crc::crc32::{self, Hasher32};
 
 /// Reads the table footer.
 fn read_footer<R: Read + Seek>(f: &mut R, size: usize) -> Result<Footer> {
@@ -32,36 +34,36 @@
     Ok(buf)
 }
 
-/// Reads a block at location.
-fn read_block<R: Read + Seek, C: Comparator>(cmp: &C,
-                                             f: &mut R,
-                                             location: &BlockHandle)
-                                             -> Result<TableBlock<C>> {
-    // The block is denoted by offset and length in BlockHandle. A block in an encoded
-    // table is followed by 1B compression type and 4B checksum.
-    let buf = try!(read_bytes(f, location));
-    let compress = try!(read_bytes(f,
-                                   &BlockHandle::new(location.offset() + location.size(),
-                                                     table_builder::TABLE_BLOCK_COMPRESS_LEN)));
-    let cksum = try!(read_bytes(f,
-                                &BlockHandle::new(location.offset() + location.size() +
-                                                  table_builder::TABLE_BLOCK_COMPRESS_LEN,
-                                                  table_builder::TABLE_BLOCK_CKSUM_LEN)));
-    Ok(TableBlock {
-        block: Block::new(buf, *cmp),
-        checksum: u32::decode_fixed(&cksum),
-        compression: options::int_to_compressiontype(compress[0] as u32)
-            .unwrap_or(CompressionType::CompressionNone),
-    })
-}
-
-struct TableBlock<C: Comparator> {
-    block: Block<C>,
+struct TableBlock {
+    block: Block,
     checksum: u32,
     compression: CompressionType,
 }
 
-impl<C: Comparator> TableBlock<C> {
+impl TableBlock {
+    /// Reads a block at location.
+    fn read_block<R: Read + Seek>(opt: Options,
+                                  f: &mut R,
+                                  location: &BlockHandle)
+                                  -> Result<TableBlock> {
+        // The block is denoted by offset and length in BlockHandle. A block in an encoded
+        // table is followed by 1B compression type and 4B checksum.
+        let buf = try!(read_bytes(f, location));
+        let compress = try!(read_bytes(f,
+                                       &BlockHandle::new(location.offset() + location.size(),
+                                                         table_builder::TABLE_BLOCK_COMPRESS_LEN)));
+        let cksum = try!(read_bytes(f,
+                                    &BlockHandle::new(location.offset() + location.size() +
+                                                      table_builder::TABLE_BLOCK_COMPRESS_LEN,
+                                                      table_builder::TABLE_BLOCK_CKSUM_LEN)));
+        Ok(TableBlock {
+            block: Block::new(opt, buf),
+            checksum: u32::decode_fixed(&cksum),
+            compression: options::int_to_compressiontype(compress[0] as u32)
+                .unwrap_or(CompressionType::CompressionNone),
+        })
+    }
+
     /// Verify checksum of block
     fn verify(&self) -> bool {
         let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
@@ -72,54 +74,72 @@
     }
 }
 
-pub struct Table<R: Read + Seek, C: Comparator> {
+pub struct Table<R: Read + Seek> {
     file: R,
-    file_size: usize,
 
-    opt: ReadOptions,
-    cmp: C,
+    opt: Options,
 
-    indexblock: Block<C>,
-}
-
-impl<R: Read + Seek> Table<R, StandardComparator> {
-    /// Open a table for reading.
-    pub fn new_defaults(file: R, size: usize) -> Result<Table<R, StandardComparator>> {
-        Table::new(file, size, ReadOptions::default(), StandardComparator)
-    }
+    footer: Footer,
+    indexblock: Block,
+    #[allow(dead_code)]
+    filters: Option<FilterBlockReader>,
 }
 
-impl Table<File, StandardComparator> {
-    /// Directly open a file for reading.
-    pub fn new_from_file(file: &Path) -> Result<Table<File, StandardComparator>> {
-        let f = try!(OpenOptions::new().read(true).open(file));
-        let len = try!(f.metadata()).len() as usize;
-
-        Table::new(f, len, ReadOptions::default(), StandardComparator)
-    }
-}
-
-impl<R: Read + Seek, C: Comparator> Table<R, C> {
-    /// Open a table for reading. Note: The comparator must be the same that was chosen when
-    /// building the table.
-    pub fn new(mut file: R, size: usize, opt: ReadOptions, cmp: C) -> Result<Table<R, C>> {
+impl<R: Read + Seek> Table<R> {
+    /// Creates a new table reader operating on unformatted keys (i.e., UserKey).
+    fn new_raw(opt: Options, mut file: R, size: usize, fp: BoxedFilterPolicy) -> Result<Table<R>> {
         let footer = try!(read_footer(&mut file, size));
 
-        let indexblock = try!(read_block(&cmp, &mut file, &footer.index));
+        let indexblock = try!(TableBlock::read_block(opt.clone(), &mut file, &footer.index));
+        let metaindexblock =
+            try!(TableBlock::read_block(opt.clone(), &mut file, &footer.meta_index));
+
+        if !indexblock.verify() || !metaindexblock.verify() {
+            return Err(io::Error::new(io::ErrorKind::InvalidData,
+                                      "Indexblock/Metaindexblock failed verification"));
+        }
+
+        // Open filter block for reading
+        let mut filter_block_reader = None;
+        let filter_name = format!("filter.{}", fp.name()).as_bytes().to_vec();
+
+        let mut metaindexiter = metaindexblock.block.iter();
+
+        metaindexiter.seek(&filter_name);
+
+        if let Some((_key, val)) = metaindexiter.current() {
+            let filter_block_location = BlockHandle::decode(&val).0;
+
+            if filter_block_location.size() > 0 {
+                let buf = try!(read_bytes(&mut file, &filter_block_location));
+                filter_block_reader = Some(FilterBlockReader::new_owned(fp, buf));
+            }
+        }
+
+        metaindexiter.reset();
 
         Ok(Table {
             file: file,
-            file_size: size,
             opt: opt,
-            cmp: cmp,
+            footer: footer,
+            filters: filter_block_reader,
             indexblock: indexblock.block,
         })
     }
 
-    fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock<C>> {
-        let b = try!(read_block(&self.cmp, &mut self.file, location));
+    /// Creates a new table reader operating on internal keys (i.e., InternalKey). This means that
+    /// a different comparator (internal_key_cmp) and a different filter policy
+    /// (InternalFilterPolicy) are used.
+    pub fn new(mut opt: Options, file: R, size: usize, fp: BoxedFilterPolicy) -> Result<Table<R>> {
+        opt.cmp = Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
+        let t = try!(Table::new_raw(opt, file, size, InternalFilterPolicy::new(fp)));
+        Ok(t)
+    }
 
-        if !b.verify() && self.opt.skip_bad_blocks {
+    fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock> {
+        let b = try!(TableBlock::read_block(self.opt.clone(), &mut self.file, location));
+
+        if !b.verify() {
             Err(io::Error::new(io::ErrorKind::InvalidData, "Data block failed verification"))
         } else {
             Ok(b)
@@ -137,128 +157,138 @@
             return location.offset();
         }
 
-        return self.file_size;
+        return self.footer.meta_index.offset();
     }
 
     // Iterators read from the file; thus only one iterator can be borrowed (mutably) per scope
-    pub fn iter<'a>(&'a mut self) -> TableIterator<'a, R, C> {
+    fn iter<'a>(&'a mut self) -> TableIterator<'a, R> {
         let iter = TableIterator {
-            current_block: self.indexblock.iter(), // just for filling in here
+            current_block: self.indexblock.iter(),
+            init: false,
+            current_block_off: 0,
             index_block: self.indexblock.iter(),
+            opt: self.opt.clone(),
             table: self,
-            init: false,
         };
         iter
     }
 
-    /// Retrieve an entry from the table.
-    ///
-    /// Note: As this doesn't keep state, using a TableIterator and seek() may be more efficient
-    /// when retrieving several entries from the same underlying block.
-    pub fn get(&mut self, key: &[u8]) -> Option<Vec<u8>> {
+    /// Retrieve value from table. This function uses the attached filters, so is better suited if
+    /// you frequently look for non-existing values (as it will detect the non-existence of an
+    /// entry in a block without having to load the block).
+    pub fn get<'a>(&mut self, to: InternalKey<'a>) -> Option<Vec<u8>> {
         let mut iter = self.iter();
 
-        iter.seek(key);
+        iter.seek(to);
 
         if let Some((k, v)) = iter.current() {
-            if k == key {
-                return Some(v);
-            }
+            if k == to { Some(v) } else { None }
+        } else {
+            None
         }
-        return None;
+
+        // TODO: replace this with a more efficient method using filters
     }
 }
 
-/// Iterator over a Table.
-pub struct TableIterator<'a, R: 'a + Read + Seek, C: 'a + Comparator> {
-    table: &'a mut Table<R, C>,
-    current_block: BlockIter<C>,
-    index_block: BlockIter<C>,
-
+/// This iterator is a "TwoLevelIterator"; it uses an index block in order to get an offset hint
+/// into the data blocks.
+pub struct TableIterator<'a, R: 'a + Read + Seek> {
+    table: &'a mut Table<R>,
+    opt: Options,
+    // We're not using Option<BlockIter>, but instead a separate `init` field. That makes it easier
+    // working with the current block in the iterator methods (no borrowing annoyance as with
+    // Option<>)
+    current_block: BlockIter,
+    current_block_off: usize,
     init: bool,
+    index_block: BlockIter,
 }
 
-impl<'a, C: Comparator, R: Read + Seek> TableIterator<'a, R, C> {
-    /// Skips to the entry referenced by the next entry in the index block.
-    /// This is called once a block has run out of entries.
-    /// Returns Ok(false) if the end has been reached, returns Err(...) if it should be retried
-    /// (e.g., because there's a corrupted block)
+impl<'a, R: Read + Seek> TableIterator<'a, R> {
+    // Skips to the entry referenced by the next entry in the index block.
+    // This is called once a block has run out of entries.
+    // Err means corruption or I/O error; Ok(true) means a new block was loaded; Ok(false) means
+    // tht there's no more entries.
     fn skip_to_next_entry(&mut self) -> Result<bool> {
         if let Some((_key, val)) = self.index_block.next() {
-            let r = self.load_block(&val);
-
-            if let Err(e) = r { Err(e) } else { Ok(true) }
+            self.load_block(&val).map(|_| true)
         } else {
             Ok(false)
         }
     }
 
-    /// Load the block at `handle` into `self.current_block`
+    // Load the block at `handle` into `self.current_block`
     fn load_block(&mut self, handle: &[u8]) -> Result<()> {
         let (new_block_handle, _) = BlockHandle::decode(handle);
 
         let block = try!(self.table.read_block(&new_block_handle));
+
         self.current_block = block.block.iter();
+        self.current_block_off = new_block_handle.offset();
 
         Ok(())
     }
 }
 
-impl<'a, C: Comparator, R: Read + Seek> Iterator for TableIterator<'a, R, C> {
+impl<'a, R: Read + Seek> Iterator for TableIterator<'a, R> {
     type Item = (Vec<u8>, Vec<u8>);
 
     fn next(&mut self) -> Option<Self::Item> {
-        if !self.init {
-            return match self.skip_to_next_entry() {
+        // init essentially means that `current_block` is a data block (it's initially filled with
+        // an index block as filler).
+        if self.init {
+            if let Some((key, val)) = self.current_block.next() {
+                Some((key, val))
+            } else {
+                match self.skip_to_next_entry() {
+                    Ok(true) => self.next(),
+                    Ok(false) => None,
+                    // try next block, this might be corruption
+                    Err(_) => self.next(),
+                }
+            }
+        } else {
+            match self.skip_to_next_entry() {
                 Ok(true) => {
+                    // Only initialize if we got an entry
                     self.init = true;
                     self.next()
                 }
                 Ok(false) => None,
-                Err(_) => self.next(),
-            };
-        }
-        if let Some((key, val)) = self.current_block.next() {
-            Some((key, val))
-        } else {
-            match self.skip_to_next_entry() {
-                Ok(true) => self.next(),
-                Ok(false) => None,
+                // try next block from index, this might be corruption
                 Err(_) => self.next(),
             }
         }
     }
 }
 
-impl<'a, C: Comparator, R: Read + Seek> SSIterator for TableIterator<'a, R, C> {
+impl<'a, R: Read + Seek> SSIterator for TableIterator<'a, R> {
     // A call to valid() after seeking is necessary to ensure that the seek worked (e.g., no error
     // while reading from disk)
     fn seek(&mut self, to: &[u8]) {
-        // first seek in index block, then set current_block and seek there
+        // first seek in index block, rewind by one entry (so we get the next smaller index entry),
+        // then set current_block and seek there
 
         self.index_block.seek(to);
 
-        if let Some((k, _)) = self.index_block.current() {
-            if self.table.cmp.cmp(to, &k) <= Ordering::Equal {
-                // ok, found right block: continue below
+        if let Some((past_block, handle)) = self.index_block.current() {
+            if self.opt.cmp.cmp(to, &past_block) <= Ordering::Equal {
+                // ok, found right block: continue
+                if let Ok(()) = self.load_block(&handle) {
+                    self.current_block.seek(to);
+                    self.init = true;
+                } else {
+                    self.reset();
+                    return;
+                }
             } else {
                 self.reset();
+                return;
             }
         } else {
             panic!("Unexpected None from current() (bug)");
         }
-
-        // Read block and seek to entry in that block
-        if let Some((k, handle)) = self.index_block.current() {
-            assert!(self.table.cmp.cmp(to, &k) <= Ordering::Equal);
-
-            if let Ok(()) = self.load_block(&handle) {
-                self.current_block.seek(to);
-                self.init = true;
-            } else {
-                self.reset();
-            }
-        }
     }
 
     fn prev(&mut self) -> Option<Self::Item> {
@@ -284,9 +314,6 @@
     fn reset(&mut self) {
         self.index_block.reset();
         self.init = false;
-
-        while let Err(_) = self.skip_to_next_entry() {
-        }
     }
 
     // This iterator is special in that it's valid even before the first call to next(). It behaves
@@ -296,39 +323,50 @@
     }
 
     fn current(&self) -> Option<Self::Item> {
-        self.current_block.current()
+        if self.init {
+            self.current_block.current()
+        } else {
+            None
+        }
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use options::{BuildOptions, ReadOptions};
+    use filter::BloomPolicy;
+    use filter::InternalFilterPolicy;
+    use options::Options;
     use table_builder::TableBuilder;
-    use iterator::{StandardComparator, SSIterator};
+    use iterator::SSIterator;
+    use key_types::LookupKey;
 
     use std::io::Cursor;
 
     use super::*;
 
     fn build_data() -> Vec<(&'static str, &'static str)> {
-        vec![("abc", "def"),
+        vec![// block 1
+             ("abc", "def"),
              ("abd", "dee"),
              ("bcd", "asa"),
+             // block 2
              ("bsr", "a00"),
              ("xyz", "xxx"),
              ("xzz", "yyy"),
+             // block 3
              ("zzz", "111")]
     }
 
-
+    // Build a table containing raw keys (no format)
     fn build_table() -> (Vec<u8>, usize) {
         let mut d = Vec::with_capacity(512);
-        let mut opt = BuildOptions::default();
+        let mut opt = Options::default();
         opt.block_restart_interval = 2;
         opt.block_size = 32;
 
         {
-            let mut b = TableBuilder::new(&mut d, opt, StandardComparator);
+            // Uses the standard comparator in opt.
+            let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4));
             let data = build_data();
 
             for &(k, v) in data.iter() {
@@ -336,6 +374,41 @@
             }
 
             b.finish();
+
+        }
+
+        let size = d.len();
+
+        (d, size)
+    }
+
+    // Build a table containing keys in InternalKey format.
+    fn build_internal_table() -> (Vec<u8>, usize) {
+        let mut d = Vec::with_capacity(512);
+        let mut opt = Options::default();
+        opt.block_restart_interval = 1;
+        opt.block_size = 32;
+
+        let mut i = 0 as u64;
+        let data: Vec<(Vec<u8>, &'static str)> = build_data()
+            .into_iter()
+            .map(|(k, v)| {
+                i += 1;
+                (LookupKey::new(k.as_bytes(), i).internal_key().to_vec(), v)
+            })
+            .collect();
+
+        {
+            // Uses InternalKeyCmp
+            let mut b =
+                TableBuilder::new(opt, &mut d, InternalFilterPolicy::new(BloomPolicy::new(4)));
+
+            for &(ref k, ref v) in data.iter() {
+                b.add(k.as_slice(), v.as_bytes());
+            }
+
+            b.finish();
+
         }
 
         let size = d.len();
@@ -344,100 +417,108 @@
     }
 
     #[test]
-    fn test_table_iterator_fwd() {
-        let (src, size) = build_table();
-        let data = build_data();
+    fn test_table_reader_checksum() {
+        let (mut src, size) = build_table();
+        println!("{}", size);
+
+        src[10] += 1;
+
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4))
+            .unwrap();
+
+        assert!(table.filters.is_some());
+        assert_eq!(table.filters.as_ref().unwrap().num(), 1);
 
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
-                                   size,
-                                   ReadOptions::default(),
-                                   StandardComparator)
-            .unwrap();
-        let iter = table.iter();
-        let mut i = 0;
+        {
+            let iter = table.iter();
+            // first block is skipped
+            assert_eq!(iter.count(), 4);
+        }
 
-        for (k, v) in iter {
-            assert_eq!((data[i].0.as_bytes(), data[i].1.as_bytes()),
-                       (k.as_ref(), v.as_ref()));
-            i += 1;
+        {
+            let iter = table.iter();
+
+            for (k, _) in iter {
+                if k == build_data()[5].0.as_bytes() {
+                    return;
+                }
+            }
+
+            panic!("Should have hit 5th record in table!");
         }
     }
 
     #[test]
-    fn test_table_data_corruption() {
-        let (mut src, size) = build_table();
+    fn test_table_iterator_fwd_bwd() {
+        let (src, size) = build_table();
+        let data = build_data();
 
-        // Mess with first block
-        src[28] += 1;
-
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
-                                   size,
-                                   ReadOptions::default(),
-                                   StandardComparator)
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4))
             .unwrap();
         let mut iter = table.iter();
+        let mut i = 0;
 
-        // defective blocks are skipped, i.e. we should start with the second block
+        while let Some((k, v)) = iter.next() {
+            assert_eq!((data[i].0.as_bytes(), data[i].1.as_bytes()),
+                       (k.as_ref(), v.as_ref()));
+            i += 1;
+        }
+
+        assert_eq!(i, data.len());
+        assert!(iter.next().is_none());
 
-        assert!(iter.next().is_some());
-        assert_eq!(iter.current(),
-                   Some(("bsr".as_bytes().to_vec(), "a00".as_bytes().to_vec())));
-        assert!(iter.next().is_some());
-        assert_eq!(iter.current(),
-                   Some(("xyz".as_bytes().to_vec(), "xxx".as_bytes().to_vec())));
-        assert!(iter.prev().is_some());
-        // corrupted blocks are skipped also when reading the other way round
-        assert!(iter.prev().is_none());
+        // backwards count
+        let mut j = 0;
+
+        while let Some((k, v)) = iter.prev() {
+            j += 1;
+            assert_eq!((data[data.len() - 1 - j].0.as_bytes(),
+                        data[data.len() - 1 - j].1.as_bytes()),
+                       (k.as_ref(), v.as_ref()));
+        }
+
+        // expecting 7 - 1, because the last entry that the iterator stopped on is the last entry
+        // in the table; that is, it needs to go back over 6 entries.
+        assert_eq!(j, 6);
     }
 
     #[test]
-    fn test_table_data_corruption_regardless() {
-        let mut opt = ReadOptions::default();
-        opt.skip_bad_blocks = false;
-
-        let (mut src, size) = build_table();
+    fn test_table_iterator_filter() {
+        let (src, size) = build_table();
 
-        // Mess with first block
-        src[28] += 1;
-
-        let mut table = Table::new(Cursor::new(&src as &[u8]), size, opt, StandardComparator)
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4))
             .unwrap();
+        let filter_reader = table.filters.clone().unwrap();
         let mut iter = table.iter();
 
-        // defective blocks are NOT skipped!
-
-        assert!(iter.next().is_some());
-        assert_eq!(iter.current(),
-                   Some(("abc".as_bytes().to_vec(), "def".as_bytes().to_vec())));
-        assert!(iter.next().is_some());
-        assert_eq!(iter.current(),
-                   Some(("abd".as_bytes().to_vec(), "dee".as_bytes().to_vec())));
-    }
-
-    #[test]
-    fn test_table_get() {
-        let (src, size) = build_table();
-
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
-                                   size,
-                                   ReadOptions::default(),
-                                   StandardComparator)
-            .unwrap();
-
-        assert_eq!(table.get("abc".as_bytes()), Some("def".as_bytes().to_vec()));
-        assert_eq!(table.get("zzz".as_bytes()), Some("111".as_bytes().to_vec()));
-        assert_eq!(table.get("xzz".as_bytes()), Some("yyy".as_bytes().to_vec()));
-        assert_eq!(table.get("xzy".as_bytes()), None);
+        loop {
+            if let Some((k, _)) = iter.next() {
+                assert!(filter_reader.key_may_match(iter.current_block_off, &k));
+                assert!(!filter_reader.key_may_match(iter.current_block_off,
+                                                     "somerandomkey".as_bytes()));
+            } else {
+                break;
+            }
+        }
     }
 
     #[test]
     fn test_table_iterator_state_behavior() {
         let (src, size) = build_table();
 
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
-                                   size,
-                                   ReadOptions::default(),
-                                   StandardComparator)
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4))
             .unwrap();
         let mut iter = table.iter();
 
@@ -446,8 +527,10 @@
         // See comment on valid()
         assert!(!iter.valid());
         assert!(iter.current().is_none());
+        assert!(iter.prev().is_none());
 
         assert!(iter.next().is_some());
+        let first = iter.current();
         assert!(iter.valid());
         assert!(iter.current().is_some());
 
@@ -458,6 +541,7 @@
         iter.reset();
         assert!(!iter.valid());
         assert!(iter.current().is_none());
+        assert_eq!(first, iter.next());
     }
 
     #[test]
@@ -465,10 +549,10 @@
         let (src, size) = build_table();
         let data = build_data();
 
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
-                                   size,
-                                   ReadOptions::default(),
-                                   StandardComparator)
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4))
             .unwrap();
         let mut iter = table.iter();
         let mut i = 0;
@@ -478,7 +562,7 @@
 
         // Go back to previous entry, check, go forward two entries, repeat
         // Verifies that prev/next works well.
-        while iter.valid() && i < data.len() {
+        loop {
             iter.prev();
 
             if let Some((k, v)) = iter.current() {
@@ -489,21 +573,23 @@
             }
 
             i += 1;
-            iter.next();
-            iter.next();
+            if iter.next().is_none() || iter.next().is_none() {
+                break;
+            }
         }
 
-        assert_eq!(i, 7);
+        // Skipping the last value because the second next() above will break the loop
+        assert_eq!(i, 6);
     }
 
     #[test]
     fn test_table_iterator_seek() {
         let (src, size) = build_table();
 
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
-                                   size,
-                                   ReadOptions::default(),
-                                   StandardComparator)
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4))
             .unwrap();
         let mut iter = table.iter();
 
@@ -516,4 +602,61 @@
         assert_eq!(iter.current(),
                    Some(("abc".as_bytes().to_vec(), "def".as_bytes().to_vec())));
     }
+
+    #[test]
+    fn test_table_get() {
+        let (src, size) = build_table();
+
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4))
+            .unwrap();
+
+        assert!(table.get("aaa".as_bytes()).is_none());
+        assert_eq!(table.get("abc".as_bytes()), Some("def".as_bytes().to_vec()));
+        assert!(table.get("abcd".as_bytes()).is_none());
+        assert_eq!(table.get("bcd".as_bytes()), Some("asa".as_bytes().to_vec()));
+        assert_eq!(table.get("zzz".as_bytes()), Some("111".as_bytes().to_vec()));
+        assert!(table.get("zz1".as_bytes()).is_none());
+    }
+
+    // This test verifies that the table and filters work with internal keys. This means:
+    // The table contains keys in InternalKey format and it uses a filter wrapped by
+    // InternalFilterPolicy.
+    // All the other tests use raw keys that don't have any internal structure; this is fine in
+    // general, but here we want to see that the other infrastructure works too.
+    #[test]
+    fn test_table_internal_keys() {
+        use key_types::LookupKey;
+
+        let (src, size) = build_internal_table();
+
+        let mut table = Table::new(Options::default(),
+                                   Cursor::new(&src as &[u8]),
+                                   size,
+                                   BloomPolicy::new(4))
+            .unwrap();
+        let filter_reader = table.filters.clone().unwrap();
+
+        // Check that we're actually using internal keys
+        for (ref k, _) in table.iter() {
+            assert_eq!(k.len(), 3 + 8);
+        }
+
+        let mut iter = table.iter();
+
+        loop {
+            if let Some((k, _)) = iter.next() {
+                let lk = LookupKey::new(&k, 123);
+                let userkey = lk.user_key();
+
+                assert!(filter_reader.key_may_match(iter.current_block_off, userkey));
+                assert!(!filter_reader.key_may_match(iter.current_block_off,
+                                                     "somerandomkey".as_bytes()));
+            } else {
+                break;
+            }
+        }
+    }
 }