changeset 0:c28758ff297b

Initial commit: Check-in from abandoned sister project
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 19 Nov 2016 12:55:46 +0100
parents
children f7a0e4369fe7
files .hgignore Cargo.toml src/block.rs src/blockhandle.rs src/iterator.rs src/key_types.rs src/lib.rs src/options.rs src/table_builder.rs src/table_builder.rs.bk src/table_reader.rs
diffstat 11 files changed, 1633 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/.hgignore	Sat Nov 19 12:55:46 2016 +0100
@@ -0,0 +1,2 @@
+target
+Cargo.lock
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Cargo.toml	Sat Nov 19 12:55:46 2016 +0100
@@ -0,0 +1,9 @@
+[package]
+name = "sstable"
+version = "0.1.0"
+authors = ["Lewin Bormann <lewin@lewin-bormann.info>"]
+license = "MIT"
+
+[dependencies]
+crc = "1.2"
+integer-encoding = "1.0"
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/block.rs	Sat Nov 19 12:55:46 2016 +0100
@@ -0,0 +1,536 @@
+use std::cmp::Ordering;
+
+use std::rc::Rc;
+
+use options::Options;
+use iterator::{SSIterator, Comparator};
+
+use integer_encoding::FixedInt;
+use integer_encoding::VarInt;
+
+pub type BlockContents = Vec<u8>;
+
+/// A block is a list of ENTRIES followed by a list of RESTARTS, terminated by a fixed u32
+/// N_RESTARTS.
+///
+/// An ENTRY consists of three varints, SHARED, NON_SHARED, VALSIZE, a KEY and a VALUE.
+///
+/// SHARED denotes how many bytes the entry's key shares with the previous one.
+///
+/// NON_SHARED is the size of the key minus SHARED.
+///
+/// VALSIZE is the size of the value.
+///
+/// KEY and VALUE are byte strings; the length of KEY is NON_SHARED.
+///
+/// A RESTART is a fixed u32 pointing to the beginning of an ENTRY.
+///
+/// N_RESTARTS contains the number of restarts.
+pub struct Block<C: Comparator> {
+    block: Rc<BlockContents>,
+    cmp: C,
+}
+
+impl<C: Comparator> Block<C> {
+    pub fn iter(&self) -> BlockIter<C> {
+        let restarts = u32::decode_fixed(&self.block[self.block.len() - 4..]);
+        let restart_offset = self.block.len() - 4 - 4 * restarts as usize;
+
+        BlockIter {
+            block: self.block.clone(),
+            cmp: self.cmp,
+            offset: 0,
+            restarts_off: restart_offset,
+            current_entry_offset: 0,
+            current_restart_ix: 0,
+
+            key: Vec::new(),
+            val_offset: 0,
+        }
+    }
+
+    pub fn new(contents: BlockContents, cmp: C) -> Block<C> {
+        assert!(contents.len() > 4);
+        Block {
+            block: Rc::new(contents),
+            cmp: cmp,
+        }
+    }
+}
+
+
+#[derive(Debug)]
+pub struct BlockIter<C: Comparator> {
+    block: Rc<BlockContents>,
+    cmp: C,
+    // start of next entry
+    offset: usize,
+    // offset of restarts area
+    restarts_off: usize,
+    current_entry_offset: usize,
+    // tracks the last restart we encountered
+    current_restart_ix: usize,
+
+    // We assemble the key from two parts usually, so we keep the current full key here.
+    key: Vec<u8>,
+    val_offset: usize,
+}
+
+impl<C: Comparator> BlockIter<C> {
+    fn number_restarts(&self) -> usize {
+        u32::decode_fixed(&self.block[self.block.len() - 4..]) as usize
+    }
+
+    fn get_restart_point(&self, ix: usize) -> usize {
+        let restart = self.restarts_off + 4 * ix;
+        u32::decode_fixed(&self.block[restart..restart + 4]) as usize
+    }
+}
+
+impl<C: Comparator> BlockIter<C> {
+    // Returns SHARED, NON_SHARED and VALSIZE from the current position. Advances self.offset.
+    fn parse_entry(&mut self) -> (usize, usize, usize) {
+        let mut i = 0;
+        let (shared, sharedlen) = usize::decode_var(&self.block[self.offset..]);
+        i += sharedlen;
+
+        let (non_shared, non_sharedlen) = usize::decode_var(&self.block[self.offset + i..]);
+        i += non_sharedlen;
+
+        let (valsize, valsizelen) = usize::decode_var(&self.block[self.offset + i..]);
+        i += valsizelen;
+
+        self.offset += i;
+
+        (shared, non_shared, valsize)
+    }
+
+    /// offset is assumed to be at the beginning of the non-shared key part.
+    /// offset is not advanced.
+    fn assemble_key(&mut self, shared: usize, non_shared: usize) {
+        self.key.resize(shared, 0);
+        self.key.extend_from_slice(&self.block[self.offset..self.offset + non_shared]);
+    }
+
+    pub fn seek_to_last(&mut self) {
+        if self.number_restarts() > 0 {
+            let restart = self.get_restart_point(self.number_restarts() - 1);
+            self.offset = restart;
+            self.current_entry_offset = restart;
+            self.current_restart_ix = self.number_restarts() - 1;
+        } else {
+            self.reset();
+        }
+
+        while let Some((_, _)) = self.next() {
+        }
+
+        self.prev();
+    }
+}
+
+impl<C: Comparator> Iterator for BlockIter<C> {
+    type Item = (Vec<u8>, Vec<u8>);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.current_entry_offset = self.offset;
+
+        if self.current_entry_offset >= self.restarts_off {
+            return None;
+        }
+        let (shared, non_shared, valsize) = self.parse_entry();
+        self.assemble_key(shared, non_shared);
+
+        self.val_offset = self.offset + non_shared;
+        self.offset = self.val_offset + valsize;
+
+        let num_restarts = self.number_restarts();
+        while self.current_restart_ix + 1 < num_restarts &&
+              self.get_restart_point(self.current_restart_ix + 1) < self.current_entry_offset {
+            self.current_restart_ix += 1;
+        }
+        Some((self.key.clone(), Vec::from(&self.block[self.val_offset..self.val_offset + valsize])))
+    }
+}
+
+impl<C: Comparator> SSIterator for BlockIter<C> {
+    fn reset(&mut self) {
+        self.offset = 0;
+        self.current_restart_ix = 0;
+        self.key.clear();
+        self.val_offset = 0;
+    }
+    fn prev(&mut self) -> Option<Self::Item> {
+        // as in the original implementation -- seek to last restart point, then look for key
+        let current_offset = self.current_entry_offset;
+
+        // At the beginning, can't go further back
+        if current_offset == 0 {
+            self.reset();
+            return None;
+        }
+
+        while self.get_restart_point(self.current_restart_ix) >= current_offset {
+            self.current_restart_ix -= 1;
+        }
+
+        self.offset = self.get_restart_point(self.current_restart_ix);
+        assert!(self.offset < current_offset);
+
+        let mut result;
+
+        loop {
+            result = self.next();
+
+            if self.offset >= current_offset {
+                break;
+            }
+        }
+        result
+    }
+
+    fn seek(&mut self, to: &[u8]) {
+        self.reset();
+
+        let mut left = 0;
+        let mut right = if self.number_restarts() == 0 {
+            0
+        } else {
+            self.number_restarts() - 1
+        };
+
+        // Do a binary search over the restart points.
+        while left < right {
+            let middle = (left + right + 1) / 2;
+            self.offset = self.get_restart_point(middle);
+            // advances self.offset
+            let (shared, non_shared, _) = self.parse_entry();
+
+            // At a restart, the shared part is supposed to be 0.
+            assert_eq!(shared, 0);
+
+            let cmp = self.cmp.cmp(to, &self.block[self.offset..self.offset + non_shared]);
+
+            if cmp == Ordering::Less {
+                right = middle - 1;
+            } else {
+                left = middle;
+            }
+        }
+
+        assert_eq!(left, right);
+        self.current_restart_ix = left;
+        self.offset = self.get_restart_point(left);
+
+        // Linear search from here on
+        while let Some((k, _)) = self.next() {
+            if self.cmp.cmp(k.as_slice(), to) >= Ordering::Equal {
+                return;
+            }
+        }
+    }
+
+    fn valid(&self) -> bool {
+        !self.key.is_empty() && self.val_offset > 0 && self.val_offset < self.restarts_off
+    }
+
+    fn current(&self) -> Option<Self::Item> {
+        if self.valid() {
+            Some((self.key.clone(), Vec::from(&self.block[self.val_offset..self.offset])))
+        } else {
+            None
+        }
+    }
+}
+
+pub struct BlockBuilder<C: Comparator> {
+    opt: Options,
+    cmp: C,
+    buffer: Vec<u8>,
+    restarts: Vec<u32>,
+
+    last_key: Vec<u8>,
+    counter: usize,
+}
+
+impl<C: Comparator> BlockBuilder<C> {
+    pub fn new(o: Options, cmp: C) -> BlockBuilder<C> {
+        let mut restarts = vec![0];
+        restarts.reserve(1023);
+
+        BlockBuilder {
+            buffer: Vec::with_capacity(o.block_size),
+            opt: o,
+            cmp: cmp,
+            restarts: restarts,
+            last_key: Vec::new(),
+            counter: 0,
+        }
+    }
+
+    pub fn entries(&self) -> usize {
+        self.counter
+    }
+
+    pub fn last_key<'a>(&'a self) -> &'a [u8] {
+        &self.last_key
+    }
+
+    pub fn size_estimate(&self) -> usize {
+        self.buffer.len() + self.restarts.len() * 4 + 4
+    }
+
+    #[allow(dead_code)]
+    pub fn reset(&mut self) {
+        self.buffer.clear();
+        self.restarts.clear();
+        self.last_key.clear();
+        self.counter = 0;
+    }
+
+    pub fn add(&mut self, key: &[u8], val: &[u8]) {
+        assert!(self.counter <= self.opt.block_restart_interval);
+        assert!(self.buffer.is_empty() ||
+                self.cmp.cmp(self.last_key.as_slice(), key) == Ordering::Less);
+
+        let mut shared = 0;
+
+        if self.counter < self.opt.block_restart_interval {
+            let smallest = if self.last_key.len() < key.len() {
+                self.last_key.len()
+            } else {
+                key.len()
+            };
+
+            while shared < smallest && self.last_key[shared] == key[shared] {
+                shared += 1;
+            }
+        } else {
+            self.restarts.push(self.buffer.len() as u32);
+            self.last_key.resize(0, 0);
+            self.counter = 0;
+        }
+
+        let non_shared = key.len() - shared;
+
+        let mut buf = [0 as u8; 4];
+
+        let mut sz = shared.encode_var(&mut buf[..]);
+        self.buffer.extend_from_slice(&buf[0..sz]);
+        sz = non_shared.encode_var(&mut buf[..]);
+        self.buffer.extend_from_slice(&buf[0..sz]);
+        sz = val.len().encode_var(&mut buf[..]);
+        self.buffer.extend_from_slice(&buf[0..sz]);
+
+        self.buffer.extend_from_slice(&key[shared..]);
+        self.buffer.extend_from_slice(val);
+
+        // Update key
+        self.last_key.resize(shared, 0);
+        self.last_key.extend_from_slice(&key[shared..]);
+
+        // assert_eq!(&self.last_key[..], key);
+
+        self.counter += 1;
+    }
+
+    pub fn finish(mut self) -> BlockContents {
+        // 1. Append RESTARTS
+        let mut i = self.buffer.len();
+        self.buffer.resize(i + self.restarts.len() * 4 + 4, 0);
+
+        for r in self.restarts.iter() {
+            r.encode_fixed(&mut self.buffer[i..i + 4]);
+            i += 4;
+        }
+
+        // 2. Append N_RESTARTS
+        (self.restarts.len() as u32).encode_fixed(&mut self.buffer[i..i + 4]);
+
+        // done
+        self.buffer
+    }
+}
+
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use iterator::*;
+    use options::*;
+
+    fn get_data() -> Vec<(&'static [u8], &'static [u8])> {
+        vec![("key1".as_bytes(), "value1".as_bytes()),
+             ("loooooooooooooooooooooooooooooooooongerkey1".as_bytes(), "shrtvl1".as_bytes()),
+             ("medium length key 1".as_bytes(), "some value 2".as_bytes()),
+             ("prefix_key1".as_bytes(), "value".as_bytes()),
+             ("prefix_key2".as_bytes(), "value".as_bytes()),
+             ("prefix_key3".as_bytes(), "value".as_bytes())]
+    }
+
+    #[test]
+    fn test_block_builder() {
+        let mut o = Options::default();
+        o.block_restart_interval = 3;
+
+        let mut builder = BlockBuilder::new(o, StandardComparator);
+
+        for &(k, v) in get_data().iter() {
+            builder.add(k, v);
+            assert!(builder.counter <= 3);
+            assert_eq!(builder.last_key(), k);
+        }
+
+        let block = builder.finish();
+        assert_eq!(block.len(), 149);
+
+    }
+
+    #[test]
+    fn test_block_builder_reset() {
+        let o = Options::default();
+
+        let mut builder = BlockBuilder::new(o, StandardComparator);
+
+        builder.add("abc".as_bytes(), "def".as_bytes());
+
+        assert!(builder.size_estimate() > 4);
+        builder.reset();
+        assert_eq!(builder.size_estimate(), 4);
+    }
+
+    #[test]
+    fn test_block_empty() {
+        let mut o = Options::default();
+        o.block_restart_interval = 16;
+        let builder = BlockBuilder::new(o, StandardComparator);
+
+        let blockc = builder.finish();
+        assert_eq!(blockc.len(), 8);
+        assert_eq!(blockc, vec![0, 0, 0, 0, 1, 0, 0, 0]);
+
+        let block = Block::new(blockc, StandardComparator);
+
+        for _ in block.iter() {
+            panic!("expected 0 iterations");
+        }
+    }
+
+    #[test]
+    fn test_block_build_iterate() {
+        let data = get_data();
+        let mut builder = BlockBuilder::new(Options::default(), StandardComparator);
+
+        for &(k, v) in data.iter() {
+            builder.add(k, v);
+        }
+
+        let block_contents = builder.finish();
+        let block = Block::new(block_contents, StandardComparator);
+        let block_iter = block.iter();
+        let mut i = 0;
+
+        assert!(!block_iter.valid());
+
+        for (k, v) in block_iter {
+            assert_eq!(&k[..], data[i].0);
+            assert_eq!(v, data[i].1);
+            i += 1;
+        }
+        assert_eq!(i, data.len());
+
+        let mut block_iter = block.iter();
+
+        block_iter.reset();
+        assert!(block_iter.next().is_some());
+        assert!(block_iter.valid());
+        block_iter.reset();
+        assert!(!block_iter.valid());
+    }
+
+    #[test]
+    fn test_block_iterate_reverse() {
+        let mut o = Options::default();
+        o.block_restart_interval = 3;
+        let data = get_data();
+        let mut builder = BlockBuilder::new(o, StandardComparator);
+
+        for &(k, v) in data.iter() {
+            builder.add(k, v);
+        }
+
+        let block_contents = builder.finish();
+        let mut block = Block::new(block_contents, StandardComparator).iter();
+
+        assert!(!block.valid());
+        assert_eq!(block.next(),
+                   Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())));
+        assert!(block.valid());
+        block.next();
+        assert!(block.valid());
+        block.prev();
+        assert!(block.valid());
+        assert_eq!(block.current(),
+                   Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())));
+        block.prev();
+        assert!(!block.valid());
+    }
+
+    #[test]
+    fn test_block_seek() {
+        let mut o = Options::default();
+        o.block_restart_interval = 3;
+
+        let data = get_data();
+        let mut builder = BlockBuilder::new(o, StandardComparator);
+
+        for &(k, v) in data.iter() {
+            builder.add(k, v);
+        }
+
+        let block_contents = builder.finish();
+
+        let mut block = Block::new(block_contents, StandardComparator).iter();
+
+        block.seek(&"prefix_key2".as_bytes());
+        assert!(block.valid());
+        assert_eq!(block.current(),
+                   Some(("prefix_key2".as_bytes().to_vec(), "value".as_bytes().to_vec())));
+
+        block.seek(&"prefix_key0".as_bytes());
+        assert!(block.valid());
+        assert_eq!(block.current(),
+                   Some(("prefix_key1".as_bytes().to_vec(), "value".as_bytes().to_vec())));
+
+        block.seek(&"key1".as_bytes());
+        assert!(block.valid());
+        assert_eq!(block.current(),
+                   Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())));
+    }
+
+    #[test]
+    fn test_block_seek_to_last() {
+        let mut o = Options::default();
+
+        // Test with different number of restarts
+        for block_restart_interval in vec![2, 6, 10] {
+            o.block_restart_interval = block_restart_interval;
+
+            let data = get_data();
+            let mut builder = BlockBuilder::new(o, StandardComparator);
+
+            for &(k, v) in data.iter() {
+                builder.add(k, v);
+            }
+
+            let block_contents = builder.finish();
+
+            let mut block = Block::new(block_contents, StandardComparator).iter();
+
+            block.seek_to_last();
+            assert!(block.valid());
+            assert_eq!(block.current(),
+                       Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec())));
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/blockhandle.rs	Sat Nov 19 12:55:46 2016 +0100
@@ -0,0 +1,67 @@
+use integer_encoding::VarInt;
+
+/// Contains an offset and a length (or size); can be efficiently encoded in to varints. This is
+/// used typically as file-internal pointer in table (SSTable) files. For example, the index block
+/// in an SSTable is a block of (key = largest key in block) -> (value = encoded blockhandle of
+/// block).
+#[derive(Debug)]
+pub struct BlockHandle {
+    offset: usize,
+    size: usize,
+}
+
+impl BlockHandle {
+    /// Decodes a block handle from `from` and returns a block handle
+    /// together with how many bytes were read from the slice.
+    pub fn decode(from: &[u8]) -> (BlockHandle, usize) {
+        let (off, offsize) = usize::decode_var(from);
+        let (sz, szsize) = usize::decode_var(&from[offsize..]);
+
+        (BlockHandle {
+            offset: off,
+            size: sz,
+        },
+         offsize + szsize)
+    }
+
+    pub fn new(offset: usize, size: usize) -> BlockHandle {
+        BlockHandle {
+            offset: offset,
+            size: size,
+        }
+    }
+
+    pub fn offset(&self) -> usize {
+        self.offset
+    }
+
+    pub fn size(&self) -> usize {
+        self.size
+    }
+
+    /// Returns how many bytes were written, or 0 if the write failed because `dst` is too small.
+    pub fn encode_to(&self, dst: &mut [u8]) -> usize {
+        assert!(dst.len() >= self.offset.required_space() + self.size.required_space());
+
+        let off = self.offset.encode_var(dst);
+        off + self.size.encode_var(&mut dst[off..])
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_blockhandle() {
+        let bh = BlockHandle::new(890, 777);
+        let mut dst = [0 as u8; 128];
+        let enc_sz = bh.encode_to(&mut dst[..]);
+
+        let (bh2, dec_sz) = BlockHandle::decode(&dst);
+
+        assert_eq!(enc_sz, dec_sz);
+        assert_eq!(bh.size(), bh2.size());
+        assert_eq!(bh.offset(), bh2.offset());
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/iterator.rs	Sat Nov 19 12:55:46 2016 +0100
@@ -0,0 +1,46 @@
+//! A collection of fundamental and/or simple types used by other modules
+
+use std::cmp::Ordering;
+
+/// Trait used to influence how SkipMap determines the order of elements. Use StandardComparator
+/// for the normal implementation using numerical comparison.
+pub trait Comparator: Copy {
+    fn cmp(&self, &[u8], &[u8]) -> Ordering;
+}
+
+#[derive(Clone, Copy, Default)]
+pub struct StandardComparator;
+
+impl Comparator for StandardComparator {
+    fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering {
+        a.cmp(b)
+    }
+}
+
+/// An extension of the standard `Iterator` trait that supports some methods necessary for LevelDB.
+/// This works because the iterators used are stateful and keep the last returned element.
+///
+/// Note: Implementing types are expected to hold `!valid()` before the first call to `next()`.
+pub trait SSIterator: Iterator {
+    // We're emulating LevelDB's Slice type here using actual slices with the lifetime of the
+    // iterator. The lifetime of the iterator is usually the one of the backing storage (Block,
+    // MemTable, SkipMap...)
+    // type Item = (&'a [u8], &'a [u8]);
+
+    /// Seek the iterator to `key` or the next bigger key. If the seek is invalid (past last
+    /// element), the iterator is reset() and not valid.
+    fn seek(&mut self, key: &[u8]);
+    /// Resets the iterator to be `!valid()` again (before first element)
+    fn reset(&mut self);
+    /// Returns true if `current()` would return a valid item.
+    fn valid(&self) -> bool;
+    /// Return the current item.
+    fn current(&self) -> Option<Self::Item>;
+    /// Go to the previous item.
+    fn prev(&mut self) -> Option<Self::Item>;
+
+    fn seek_to_first(&mut self) {
+        self.reset();
+        self.next();
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/key_types.rs	Sat Nov 19 12:55:46 2016 +0100
@@ -0,0 +1,11 @@
+// The following typedefs are used to distinguish between the different key formats used internally
+// by different modules.
+
+/// A UserKey is the actual key supplied by the calling application, without any internal
+/// decorations.
+pub type UserKey<'a> = &'a [u8];
+
+/// An InternalKey consists of [key, tag], so it's basically a MemtableKey without the initial
+/// length specification. This type is used as item type of MemtableIterator, and as the key
+/// type of tables.
+pub type InternalKey<'a> = &'a [u8];
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib.rs	Sat Nov 19 12:55:46 2016 +0100
@@ -0,0 +1,17 @@
+extern crate crc;
+extern crate integer_encoding;
+
+mod block;
+mod blockhandle;
+mod iterator;
+mod key_types;
+
+pub mod options;
+pub mod table_builder;
+pub mod table_reader;
+
+#[cfg(test)]
+mod tests {
+    #[test]
+    fn it_works() {}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/options.rs	Sat Nov 19 12:55:46 2016 +0100
@@ -0,0 +1,26 @@
+use std::default::Default;
+
+#[derive(Clone, Copy, PartialEq, Debug)]
+pub enum CompressionType {
+    CompressionNone = 0,
+    CompressionSnappy = 1,
+}
+
+/// [not all member types implemented yet]
+///
+#[derive(Clone, Copy)]
+pub struct Options {
+    pub block_size: usize,
+    pub block_restart_interval: usize,
+    pub compression_type: CompressionType,
+}
+
+impl Default for Options {
+    fn default() -> Options {
+        Options {
+            block_size: 4 * (1 << 10),
+            block_restart_interval: 16,
+            compression_type: CompressionType::CompressionNone,
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/table_builder.rs	Sat Nov 19 12:55:46 2016 +0100
@@ -0,0 +1,273 @@
+use block::{BlockBuilder, BlockContents};
+use blockhandle::BlockHandle;
+use key_types::InternalKey;
+use options::{CompressionType, Options};
+use iterator::Comparator;
+
+use std::io::Write;
+use std::cmp::Ordering;
+
+use crc::crc32;
+use crc::Hasher32;
+use integer_encoding::FixedInt;
+
+pub const FOOTER_LENGTH: usize = 40;
+pub const FULL_FOOTER_LENGTH: usize = FOOTER_LENGTH + 8;
+pub const MAGIC_FOOTER_NUMBER: u64 = 0xdb4775248b80fb57;
+pub const MAGIC_FOOTER_ENCODED: [u8; 8] = [0x57, 0xfb, 0x80, 0x8b, 0x24, 0x75, 0x47, 0xdb];
+
+fn find_shortest_sep<'a, C: Comparator>(c: &C,
+                                        lo: InternalKey<'a>,
+                                        hi: InternalKey<'a>)
+                                        -> Vec<u8> {
+    let min;
+
+    if lo.len() < hi.len() {
+        min = lo.len();
+    } else {
+        min = hi.len();
+    }
+
+    let mut diff_at = 0;
+
+    while diff_at < min && lo[diff_at] == hi[diff_at] {
+        diff_at += 1;
+    }
+
+    if diff_at == min {
+        return Vec::from(lo);
+    } else {
+        if lo[diff_at] < 0xff && lo[diff_at] + 1 < hi[diff_at] {
+            let mut result = Vec::from(&lo[0..diff_at + 1]);
+            result[diff_at] += 1;
+            assert_eq!(c.cmp(&result, hi), Ordering::Less);
+            return result;
+        }
+        return Vec::from(lo);
+    }
+}
+
+/// Footer is a helper for encoding/decoding a table footer.
+#[derive(Debug)]
+pub struct Footer {
+    pub index: BlockHandle,
+}
+
+impl Footer {
+    pub fn new(index: BlockHandle) -> Footer {
+        Footer { index: index }
+    }
+
+    pub fn decode(from: &[u8]) -> Footer {
+        assert!(from.len() >= FULL_FOOTER_LENGTH);
+        assert_eq!(&from[FOOTER_LENGTH..], &MAGIC_FOOTER_ENCODED);
+        let (ix, _) = BlockHandle::decode(&from[0..]);
+
+        Footer { index: ix }
+    }
+
+    pub fn encode(&self, to: &mut [u8]) {
+        assert!(to.len() >= FOOTER_LENGTH + 8);
+
+        let s1 = self.index.encode_to(&mut to[0..]);
+
+        for i in s1..FOOTER_LENGTH {
+            to[i] = 0;
+        }
+        for i in FOOTER_LENGTH..FULL_FOOTER_LENGTH {
+            to[i] = MAGIC_FOOTER_ENCODED[i - FOOTER_LENGTH];
+        }
+    }
+}
+
+/// A table consists of DATA BLOCKs, an INDEX BLOCK and a FOOTER.
+///
+/// DATA BLOCKs, INDEX BLOCK BLOCK are built using the code in
+/// the `block` module.
+///
+/// The footer is a pointer pointing to
+/// the index block, padding to fill up to 40 B and at the end the 8B magic number
+/// 0xdb4775248b80fb57.
+
+pub struct TableBuilder<C: Comparator, Dst: Write> {
+    o: Options,
+    cmp: C,
+    dst: Dst,
+
+    offset: usize,
+    num_entries: usize,
+    prev_block_last_key: Vec<u8>,
+
+    data_block: Option<BlockBuilder<C>>,
+    index_block: Option<BlockBuilder<C>>,
+}
+
+impl<C: Comparator, Dst: Write> TableBuilder<C, Dst> {
+    pub fn new(opt: Options, cmp: C, dst: Dst) -> TableBuilder<C, Dst> {
+        TableBuilder {
+            o: opt,
+            cmp: cmp,
+            dst: dst,
+            offset: 0,
+            prev_block_last_key: vec![],
+            num_entries: 0,
+            data_block: Some(BlockBuilder::new(opt, cmp)),
+            index_block: Some(BlockBuilder::new(opt, cmp)),
+        }
+    }
+
+    pub fn entries(&self) -> usize {
+        self.num_entries
+    }
+
+    pub fn add<'a>(&mut self, key: InternalKey<'a>, val: &[u8]) {
+        assert!(self.data_block.is_some());
+        assert!(self.num_entries == 0 ||
+                self.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less);
+
+        if self.data_block.as_ref().unwrap().size_estimate() > self.o.block_size {
+            self.write_data_block(key);
+        }
+
+        let dblock = &mut self.data_block.as_mut().unwrap();
+
+        self.num_entries += 1;
+        dblock.add(key, val);
+    }
+
+    /// Writes an index entry for the current data_block where `next_key` is the first key of the
+    /// next block.
+    fn write_data_block<'b>(&mut self, next_key: InternalKey<'b>) {
+        assert!(self.data_block.is_some());
+
+        let block = self.data_block.take().unwrap();
+        let sep = find_shortest_sep(&self.cmp, block.last_key(), next_key);
+        self.prev_block_last_key = Vec::from(block.last_key());
+        let contents = block.finish();
+
+        let handle = BlockHandle::new(self.offset, contents.len());
+        let mut handle_enc = [0 as u8; 16];
+        let enc_len = handle.encode_to(&mut handle_enc);
+
+        self.index_block.as_mut().unwrap().add(&sep, &handle_enc[0..enc_len]);
+        self.data_block = Some(BlockBuilder::new(self.o, self.cmp));
+
+        let ctype = self.o.compression_type;
+        self.write_block(contents, ctype);
+    }
+
+    fn write_block(&mut self, c: BlockContents, t: CompressionType) -> BlockHandle {
+        // compression is still unimplemented
+        assert_eq!(t, CompressionType::CompressionNone);
+
+        let mut buf = [0 as u8; 4];
+        let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
+
+        digest.write(&c);
+        digest.write(&[self.o.compression_type as u8; 1]);
+        digest.sum32().encode_fixed(&mut buf);
+
+        // TODO: Handle errors here.
+        let _ = self.dst.write(&c);
+        let _ = self.dst.write(&[t as u8; 1]);
+        let _ = self.dst.write(&buf);
+
+        let handle = BlockHandle::new(self.offset, c.len());
+
+        self.offset += c.len() + 1 + buf.len();
+
+        handle
+    }
+
+    pub fn finish(mut self) {
+        assert!(self.data_block.is_some());
+        let ctype = self.o.compression_type;
+
+        // If there's a pending data block, write that one
+        let flush_last_block = self.data_block.as_ref().unwrap().entries() > 0;
+        if flush_last_block {
+            self.write_data_block(&[0xff as u8; 1]);
+        }
+
+        // write index block
+        let index_cont = self.index_block.take().unwrap().finish();
+        let ix_handle = self.write_block(index_cont, ctype);
+
+        // write footer.
+        let footer = Footer::new(ix_handle);
+        let mut buf = [0; FULL_FOOTER_LENGTH];
+        footer.encode(&mut buf);
+
+        self.offset += self.dst.write(&buf[..]).unwrap();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{find_shortest_sep, Footer, TableBuilder};
+    use iterator::StandardComparator;
+    use blockhandle::BlockHandle;
+    use options::Options;
+
+    #[test]
+    fn test_shortest_sep() {
+        assert_eq!(find_shortest_sep(&StandardComparator, "abcd".as_bytes(), "abcf".as_bytes()),
+                   "abce".as_bytes());
+        assert_eq!(find_shortest_sep(&StandardComparator,
+                                     "abcdefghi".as_bytes(),
+                                     "abcffghi".as_bytes()),
+                   "abce".as_bytes());
+        assert_eq!(find_shortest_sep(&StandardComparator, "a".as_bytes(), "a".as_bytes()),
+                   "a".as_bytes());
+        assert_eq!(find_shortest_sep(&StandardComparator, "a".as_bytes(), "b".as_bytes()),
+                   "a".as_bytes());
+        assert_eq!(find_shortest_sep(&StandardComparator, "abc".as_bytes(), "zzz".as_bytes()),
+                   "b".as_bytes());
+        assert_eq!(find_shortest_sep(&StandardComparator, "".as_bytes(), "".as_bytes()),
+                   "".as_bytes());
+    }
+
+    #[test]
+    fn test_footer() {
+        let f = Footer::new(BlockHandle::new(55, 5));
+        let mut buf = [0; 48];
+        f.encode(&mut buf[..]);
+
+        let f2 = Footer::decode(&buf);
+        assert_eq!(f2.index.offset(), 55);
+        assert_eq!(f2.index.size(), 5);
+
+    }
+
+    #[test]
+    fn test_table_builder() {
+        let mut d = Vec::with_capacity(512);
+        let mut opt = Options::default();
+        opt.block_restart_interval = 3;
+        let mut b = TableBuilder::new(opt, StandardComparator, &mut d);
+
+        let data = vec![("abc", "def"), ("abd", "dee"), ("bcd", "asa"), ("bsr", "a00")];
+
+        for &(k, v) in data.iter() {
+            b.add(k.as_bytes(), v.as_bytes());
+        }
+
+        b.finish();
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_bad_input() {
+        let mut d = Vec::with_capacity(512);
+        let mut opt = Options::default();
+        opt.block_restart_interval = 3;
+        let mut b = TableBuilder::new(opt, StandardComparator, &mut d);
+
+        // Test two equal consecutive keys
+        let data = vec![("abc", "def"), ("abc", "dee"), ("bcd", "asa"), ("bsr", "a00")];
+
+        for &(k, v) in data.iter() {
+            b.add(k.as_bytes(), v.as_bytes());
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/table_builder.rs.bk	Sat Nov 19 12:55:46 2016 +0100
@@ -0,0 +1,280 @@
+use block::{BlockBuilder, BlockContents};
+use blockhandle::BlockHandle;
+use key_types::InternalKey;
+use options::{CompressionType, Options};
+use iterator::Comparator;
+
+use std::io::Write;
+use std::cmp::Ordering;
+
+use crc::crc32;
+use crc::Hasher32;
+use integer_encoding::FixedInt;
+
+pub const FOOTER_LENGTH: usize = 40;
+pub const FULL_FOOTER_LENGTH: usize = FOOTER_LENGTH + 8;
+pub const MAGIC_FOOTER_NUMBER: u64 = 0xdb4775248b80fb57;
+pub const MAGIC_FOOTER_ENCODED: [u8; 8] = [0x57, 0xfb, 0x80, 0x8b, 0x24, 0x75, 0x47, 0xdb];
+
+fn find_shortest_sep<'a, C: Comparator>(c: &C,
+                                        lo: InternalKey<'a>,
+                                        hi: InternalKey<'a>)
+                                        -> Vec<u8> {
+    let min;
+
+    if lo.len() < hi.len() {
+        min = lo.len();
+    } else {
+        min = hi.len();
+    }
+
+    let mut diff_at = 0;
+
+    while diff_at < min && lo[diff_at] == hi[diff_at] {
+        diff_at += 1;
+    }
+
+    if diff_at == min {
+        return Vec::from(lo);
+    } else {
+        if lo[diff_at] < 0xff && lo[diff_at] + 1 < hi[diff_at] {
+            let mut result = Vec::from(&lo[0..diff_at + 1]);
+            result[diff_at] += 1;
+            assert_eq!(c.cmp(&result, hi), Ordering::Less);
+            return result;
+        }
+        return Vec::from(lo);
+    }
+}
+
+/// Footer is a helper for encoding/decoding a table footer.
+#[derive(Debug)]
+pub struct Footer {
+    pub index: BlockHandle,
+}
+
+impl Footer {
+    pub fn new(index: BlockHandle) -> Footer {
+        Footer {
+            index: index,
+        }
+    }
+
+    pub fn decode(from: &[u8]) -> Footer {
+        assert!(from.len() >= FULL_FOOTER_LENGTH);
+        assert_eq!(&from[FOOTER_LENGTH..], &MAGIC_FOOTER_ENCODED);
+        let (ix, _) = BlockHandle::decode(&from[0..]);
+
+        Footer {
+            index: ix,
+        }
+    }
+
+    pub fn encode(&self, to: &mut [u8]) {
+        assert!(to.len() >= FOOTER_LENGTH + 8);
+
+        let s1 = self.index.encode_to(&mut to[0..]);
+
+        for i in s1..FOOTER_LENGTH {
+            to[i] = 0;
+        }
+        for i in FOOTER_LENGTH..FULL_FOOTER_LENGTH {
+            to[i] = MAGIC_FOOTER_ENCODED[i - FOOTER_LENGTH];
+        }
+    }
+}
+
+/// A table consists of DATA BLOCKs, an INDEX BLOCK and a FOOTER.
+///
+/// DATA BLOCKs, INDEX BLOCK BLOCK are built using the code in
+/// the `block` module.
+///
+/// The footer is a pointer pointing to
+/// the index block, padding to fill up to 40 B and at the end the 8B magic number
+/// 0xdb4775248b80fb57.
+
+pub struct TableBuilder<C: Comparator, Dst: Write> {
+    o: Options,
+    cmp: C,
+    dst: Dst,
+
+    offset: usize,
+    num_entries: usize,
+    prev_block_last_key: Vec<u8>,
+
+    data_block: Option<BlockBuilder<C>>,
+    index_block: Option<BlockBuilder<C>>,
+}
+
+impl<C: Comparator, Dst: Write> TableBuilder<C, Dst> {
+    pub fn new(opt: Options,
+                         cmp: C,
+                         dst: Dst)
+                         -> TableBuilder<C, Dst> {
+        TableBuilder {
+            o: opt,
+            cmp: cmp,
+            dst: dst,
+            offset: 0,
+            prev_block_last_key: vec![],
+            num_entries: 0,
+            data_block: Some(BlockBuilder::new(opt, cmp)),
+            index_block: Some(BlockBuilder::new(opt, cmp)),
+        }
+    }
+
+    pub fn entries(&self) -> usize {
+        self.num_entries
+    }
+
+    pub fn add<'a>(&mut self, key: InternalKey<'a>, val: &[u8]) {
+        assert!(self.data_block.is_some());
+        assert!(self.num_entries == 0 ||
+                self.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less);
+
+        if self.data_block.as_ref().unwrap().size_estimate() > self.o.block_size {
+            self.write_data_block(key);
+        }
+
+        let dblock = &mut self.data_block.as_mut().unwrap();
+
+        self.num_entries += 1;
+        dblock.add(key, val);
+    }
+
+    /// Writes an index entry for the current data_block where `next_key` is the first key of the
+    /// next block.
+    fn write_data_block<'b>(&mut self, next_key: InternalKey<'b>) {
+        assert!(self.data_block.is_some());
+
+        let block = self.data_block.take().unwrap();
+        let sep = find_shortest_sep(&self.cmp, block.last_key(), next_key);
+        self.prev_block_last_key = Vec::from(block.last_key());
+        let contents = block.finish();
+
+        let handle = BlockHandle::new(self.offset, contents.len());
+        let mut handle_enc = [0 as u8; 16];
+        let enc_len = handle.encode_to(&mut handle_enc);
+
+        self.index_block.as_mut().unwrap().add(&sep, &handle_enc[0..enc_len]);
+        self.data_block = Some(BlockBuilder::new(self.o, self.cmp));
+
+        let ctype = self.o.compression_type;
+        self.write_block(contents, ctype);
+    }
+
+    fn write_block(&mut self, c: BlockContents, t: CompressionType) -> BlockHandle {
+        // compression is still unimplemented
+        assert_eq!(t, CompressionType::CompressionNone);
+
+        let mut buf = [0 as u8; 4];
+        let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
+
+        digest.write(&c);
+        digest.write(&[self.o.compression_type as u8; 1]);
+        digest.sum32().encode_fixed(&mut buf);
+
+        // TODO: Handle errors here.
+        let _ = self.dst.write(&c);
+        let _ = self.dst.write(&[t as u8; 1]);
+        let _ = self.dst.write(&buf);
+
+        let handle = BlockHandle::new(self.offset, c.len());
+
+        self.offset += c.len() + 1 + buf.len();
+
+        handle
+    }
+
+    pub fn finish(mut self) {
+        assert!(self.data_block.is_some());
+        let ctype = self.o.compression_type;
+
+        // If there's a pending data block, write that one
+        let flush_last_block = self.data_block.as_ref().unwrap().entries() > 0;
+        if flush_last_block {
+            self.write_data_block(&[0xff as u8; 1]);
+        }
+
+        // write index block
+        let index_cont = self.index_block.take().unwrap().finish();
+        let ix_handle = self.write_block(index_cont, ctype);
+
+        // write footer.
+        let footer = Footer::new(ix_handle);
+        let mut buf = [0; FULL_FOOTER_LENGTH];
+        footer.encode(&mut buf);
+
+        self.offset += self.dst.write(&buf[..]).unwrap();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{find_shortest_sep, Footer, TableBuilder};
+    use iterator::StandardComparator;
+    use blockhandle::BlockHandle;
+    use options::Options;
+
+    #[test]
+    fn test_shortest_sep() {
+        assert_eq!(find_shortest_sep(&StandardComparator, "abcd".as_bytes(), "abcf".as_bytes()),
+                   "abce".as_bytes());
+        assert_eq!(find_shortest_sep(&StandardComparator,
+                                     "abcdefghi".as_bytes(),
+                                     "abcffghi".as_bytes()),
+                   "abce".as_bytes());
+        assert_eq!(find_shortest_sep(&StandardComparator, "a".as_bytes(), "a".as_bytes()),
+                   "a".as_bytes());
+        assert_eq!(find_shortest_sep(&StandardComparator, "a".as_bytes(), "b".as_bytes()),
+                   "a".as_bytes());
+        assert_eq!(find_shortest_sep(&StandardComparator, "abc".as_bytes(), "zzz".as_bytes()),
+                   "b".as_bytes());
+        assert_eq!(find_shortest_sep(&StandardComparator, "".as_bytes(), "".as_bytes()),
+                   "".as_bytes());
+    }
+
+    #[test]
+    fn test_footer() {
+        let f = Footer::new(BlockHandle::new(55, 5));
+        let mut buf = [0; 48];
+        f.encode(&mut buf[..]);
+
+        let f2 = Footer::decode(&buf);
+        assert_eq!(f2.index.offset(), 55);
+        assert_eq!(f2.index.size(), 5);
+
+    }
+
+    #[test]
+    fn test_table_builder() {
+        let mut d = Vec::with_capacity(512);
+        let mut opt = Options::default();
+        opt.block_restart_interval = 3;
+        let mut b = TableBuilder::new(opt, StandardComparator, &mut d);
+
+        let data = vec![("abc", "def"), ("abd", "dee"), ("bcd", "asa"), ("bsr", "a00")];
+
+        for &(k, v) in data.iter() {
+            b.add(k.as_bytes(), v.as_bytes());
+        }
+
+        b.finish();
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_bad_input() {
+        let mut d = Vec::with_capacity(512);
+        let mut opt = Options::default();
+        opt.block_restart_interval = 3;
+        let mut b = TableBuilder::new(opt, StandardComparator, &mut d);
+
+        // Test two equal consecutive keys
+        let data = vec![("abc", "def"), ("abc", "dee"), ("bcd", "asa"), ("bsr", "a00")];
+
+        for &(k, v) in data.iter() {
+            b.add(k.as_bytes(), v.as_bytes());
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/table_reader.rs	Sat Nov 19 12:55:46 2016 +0100
@@ -0,0 +1,366 @@
+use block::{Block, BlockIter};
+use blockhandle::BlockHandle;
+use table_builder::{self, Footer};
+use iterator::{Comparator, SSIterator};
+
+use std::io::{Read, Seek, SeekFrom, Result};
+use std::cmp::Ordering;
+
+/// Reads the table footer.
+fn read_footer<R: Read + Seek>(f: &mut R, size: usize) -> Result<Footer> {
+    try!(f.seek(SeekFrom::Start((size - table_builder::FULL_FOOTER_LENGTH) as u64)));
+    let mut buf = [0; table_builder::FULL_FOOTER_LENGTH];
+    try!(f.read_exact(&mut buf));
+    Ok(Footer::decode(&buf))
+}
+
+fn read_bytes<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<Vec<u8>> {
+    try!(f.seek(SeekFrom::Start(location.offset() as u64)));
+
+    let mut buf = Vec::new();
+    buf.resize(location.size(), 0);
+
+    try!(f.read_exact(&mut buf[0..location.size()]));
+
+    Ok(buf)
+}
+
+/// Reads a block at location.
+fn read_block<R: Read + Seek, C: Comparator>(cmp: &C,
+                                             f: &mut R,
+                                             location: &BlockHandle)
+                                             -> Result<Block<C>> {
+    let buf = try!(read_bytes(f, location));
+    Ok(Block::new(buf, *cmp))
+}
+
+pub struct Table<R: Read + Seek, C: Comparator> {
+    file: R,
+    file_size: usize,
+
+    cmp: C,
+
+    indexblock: Block<C>,
+}
+
+impl<R: Read + Seek, C: Comparator> Table<R, C> {
+    pub fn new(mut file: R, size: usize, cmp: C) -> Result<Table<R, C>> {
+        let footer = try!(read_footer(&mut file, size));
+
+        let indexblock = try!(read_block(&cmp, &mut file, &footer.index));
+
+        Ok(Table {
+            file: file,
+            file_size: size,
+            cmp: cmp,
+            indexblock: indexblock,
+        })
+    }
+
+    fn read_block_(&mut self, location: &BlockHandle) -> Result<Block<C>> {
+        read_block(&self.cmp, &mut self.file, location)
+    }
+
+    /// Returns the offset of the block that contains `key`.
+    pub fn approx_offset_of(&self, key: &[u8]) -> usize {
+        let mut iter = self.indexblock.iter();
+
+        iter.seek(key);
+
+        if let Some((_, val)) = iter.current() {
+            let location = BlockHandle::decode(&val).0;
+            return location.offset();
+        }
+
+        return self.file_size;
+    }
+
+    // Iterators read from the file; thus only one iterator can be borrowed (mutably) per scope
+    fn iter<'a>(&'a mut self) -> TableIterator<'a, R, C> {
+        let mut iter = TableIterator {
+            current_block: self.indexblock.iter(), // just for filling in here
+            index_block: self.indexblock.iter(),
+            table: self,
+            init: false,
+        };
+        iter.skip_to_next_entry();
+        iter
+    }
+
+    pub fn get(&mut self, key: &[u8]) -> Option<Vec<u8>> {
+        let mut iter = self.iter();
+
+        iter.seek(key);
+
+        if let Some((k, v)) = iter.current() {
+            if k == key {
+                return Some(v);
+            }
+        }
+        return None;
+    }
+}
+
+/// This iterator is a "TwoLevelIterator"; it uses an index block in order to get an offset hint
+/// into the data blocks.
+pub struct TableIterator<'a, R: 'a + Read + Seek, C: 'a + Comparator> {
+    table: &'a mut Table<R, C>,
+    current_block: BlockIter<C>,
+    index_block: BlockIter<C>,
+
+    init: bool,
+}
+
+impl<'a, C: Comparator, R: Read + Seek> TableIterator<'a, R, C> {
+    // Skips to the entry referenced by the next entry in the index block.
+    // This is called once a block has run out of entries.
+    fn skip_to_next_entry(&mut self) -> bool {
+        if let Some((_key, val)) = self.index_block.next() {
+            self.load_block(&val).is_ok()
+        } else {
+            false
+        }
+    }
+
+    // Load the block at `handle` into `self.current_block`
+    fn load_block(&mut self, handle: &[u8]) -> Result<()> {
+        let (new_block_handle, _) = BlockHandle::decode(handle);
+
+        let block = try!(self.table.read_block_(&new_block_handle));
+        self.current_block = block.iter();
+
+        Ok(())
+    }
+}
+
+impl<'a, C: Comparator, R: Read + Seek> Iterator for TableIterator<'a, R, C> {
+    type Item = (Vec<u8>, Vec<u8>);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.init = true;
+        if let Some((key, val)) = self.current_block.next() {
+            Some((key, val))
+        } else {
+            if self.skip_to_next_entry() {
+                self.next()
+            } else {
+                None
+            }
+        }
+    }
+}
+
+impl<'a, C: Comparator, R: Read + Seek> SSIterator for TableIterator<'a, R, C> {
+    // A call to valid() after seeking is necessary to ensure that the seek worked (e.g., no error
+    // while reading from disk)
+    fn seek(&mut self, to: &[u8]) {
+        // first seek in index block, then set current_block and seek there
+
+        self.index_block.seek(to);
+
+        if let Some((k, _)) = self.index_block.current() {
+            if self.table.cmp.cmp(to, &k) <= Ordering::Equal {
+                // ok, found right block: continue below
+            } else {
+                self.reset();
+            }
+        } else {
+            panic!("Unexpected None from current() (bug)");
+        }
+
+        // Read block and seek to entry in that block
+        if let Some((k, handle)) = self.index_block.current() {
+            assert!(self.table.cmp.cmp(to, &k) <= Ordering::Equal);
+
+            if let Ok(()) = self.load_block(&handle) {
+                self.current_block.seek(to);
+                self.init = true;
+            } else {
+                self.reset();
+            }
+        }
+    }
+
+    fn prev(&mut self) -> Option<Self::Item> {
+        // happy path: current block contains previous entry
+        if let Some(result) = self.current_block.prev() {
+            Some(result)
+        } else {
+            // Go back one block and look for the last entry in the previous block
+            if let Some((_, handle)) = self.index_block.prev() {
+                if self.load_block(&handle).is_ok() {
+                    self.current_block.seek_to_last();
+                    self.current_block.current()
+                } else {
+                    self.reset();
+                    None
+                }
+            } else {
+                None
+            }
+        }
+    }
+
+    fn reset(&mut self) {
+        self.index_block.reset();
+        self.init = false;
+        self.skip_to_next_entry();
+    }
+
+    // This iterator is special in that it's valid even before the first call to next(). It behaves
+    // correctly, though.
+    fn valid(&self) -> bool {
+        self.init && (self.current_block.valid() || self.index_block.valid())
+    }
+
+    fn current(&self) -> Option<Self::Item> {
+        self.current_block.current()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use options::Options;
+    use table_builder::TableBuilder;
+    use iterator::{StandardComparator, SSIterator};
+
+    use std::io::Cursor;
+
+    use super::*;
+
+    fn build_data() -> Vec<(&'static str, &'static str)> {
+        vec![("abc", "def"),
+             ("abd", "dee"),
+             ("bcd", "asa"),
+             ("bsr", "a00"),
+             ("xyz", "xxx"),
+             ("xzz", "yyy"),
+             ("zzz", "111")]
+    }
+
+
+    fn build_table() -> (Vec<u8>, usize) {
+        let mut d = Vec::with_capacity(512);
+        let mut opt = Options::default();
+        opt.block_restart_interval = 2;
+        opt.block_size = 64;
+
+        {
+            let mut b = TableBuilder::new(opt, StandardComparator, &mut d);
+            let data = build_data();
+
+            for &(k, v) in data.iter() {
+                b.add(k.as_bytes(), v.as_bytes());
+            }
+
+            b.finish();
+        }
+
+        let size = d.len();
+
+        (d, size)
+    }
+
+    #[test]
+    fn test_table_iterator_fwd() {
+        let (src, size) = build_table();
+        let data = build_data();
+
+        let mut table = Table::new(Cursor::new(&src as &[u8]), size, StandardComparator).unwrap();
+        let iter = table.iter();
+        let mut i = 0;
+
+        for (k, v) in iter {
+            assert_eq!((data[i].0.as_bytes(), data[i].1.as_bytes()),
+                       (k.as_ref(), v.as_ref()));
+            i += 1;
+        }
+    }
+
+    #[test]
+    fn test_table_get() {
+        let (src, size) = build_table();
+
+        let mut table = Table::new(Cursor::new(&src as &[u8]), size, StandardComparator).unwrap();
+
+        assert_eq!(table.get("abc".as_bytes()), Some("def".as_bytes().to_vec()));
+        assert_eq!(table.get("zzz".as_bytes()), Some("111".as_bytes().to_vec()));
+        assert_eq!(table.get("xzz".as_bytes()), Some("yyy".as_bytes().to_vec()));
+        assert_eq!(table.get("xzy".as_bytes()), None);
+    }
+
+    #[test]
+    fn test_table_iterator_state_behavior() {
+        let (src, size) = build_table();
+
+        let mut table = Table::new(Cursor::new(&src as &[u8]), size, StandardComparator).unwrap();
+        let mut iter = table.iter();
+
+        // behavior test
+
+        // See comment on valid()
+        assert!(!iter.valid());
+        assert!(iter.current().is_none());
+
+        assert!(iter.next().is_some());
+        assert!(iter.valid());
+        assert!(iter.current().is_some());
+
+        assert!(iter.next().is_some());
+        assert!(iter.prev().is_some());
+        assert!(iter.current().is_some());
+
+        iter.reset();
+        assert!(!iter.valid());
+        assert!(iter.current().is_none());
+    }
+
+    #[test]
+    fn test_table_iterator_values() {
+        let (src, size) = build_table();
+        let data = build_data();
+
+        let mut table = Table::new(Cursor::new(&src as &[u8]), size, StandardComparator).unwrap();
+        let mut iter = table.iter();
+        let mut i = 0;
+
+        iter.next();
+        iter.next();
+
+        // Go back to previous entry, check, go forward two entries, repeat
+        // Verifies that prev/next works well.
+        while iter.valid() && i < data.len() {
+            iter.prev();
+
+            if let Some((k, v)) = iter.current() {
+                assert_eq!((data[i].0.as_bytes(), data[i].1.as_bytes()),
+                           (k.as_ref(), v.as_ref()));
+            } else {
+                break;
+            }
+
+            i += 1;
+            iter.next();
+            iter.next();
+        }
+
+        assert_eq!(i, 7);
+    }
+
+    #[test]
+    fn test_table_iterator_seek() {
+        let (src, size) = build_table();
+
+        let mut table = Table::new(Cursor::new(&src as &[u8]), size, StandardComparator).unwrap();
+        let mut iter = table.iter();
+
+        iter.seek("bcd".as_bytes());
+        assert!(iter.valid());
+        assert_eq!(iter.current(),
+                   Some(("bcd".as_bytes().to_vec(), "asa".as_bytes().to_vec())));
+        iter.seek("abc".as_bytes());
+        assert!(iter.valid());
+        assert_eq!(iter.current(),
+                   Some(("abc".as_bytes().to_vec(), "def".as_bytes().to_vec())));
+    }
+}