changeset 123:bab23aa835e0

Finally integrate comparators properly everywhere
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 31 Dec 2016 15:33:20 +0100
parents 14dbd87dd144
children 778b1fa2063e
files src/block.rs src/key_types.rs src/memtable.rs src/merging_iter.rs src/options.rs src/skipmap.rs src/table_builder.rs src/table_reader.rs src/test_util.rs src/types.rs
diffstat 10 files changed, 247 insertions(+), 150 deletions(-)
--- a/src/block.rs	Mon Dec 26 11:22:17 2016 +0000
+++ b/src/block.rs	Sat Dec 31 15:33:20 2016 +0100
@@ -3,7 +3,7 @@
 use std::rc::Rc;
 
 use options::Options;
-use types::{LdbIterator, cmp};
+use types::LdbIterator;
 
 use integer_encoding::FixedInt;
 use integer_encoding::VarInt;
@@ -28,6 +28,7 @@
 /// N_RESTARTS contains the number of restarts.
 pub struct Block {
     block: Rc<BlockContents>,
+    opt: Options,
 }
 
 impl Block {
@@ -37,6 +38,8 @@
 
         BlockIter {
             block: self.block.clone(),
+            opt: self.opt.clone(),
+
             offset: 0,
             restarts_off: restart_offset,
             current_entry_offset: 0,
@@ -51,14 +54,18 @@
         self.block.clone()
     }
 
-    pub fn new(contents: BlockContents) -> Block {
+    pub fn new(opt: Options, contents: BlockContents) -> Block {
         assert!(contents.len() > 4);
-        Block { block: Rc::new(contents) }
+        Block {
+            block: Rc::new(contents),
+            opt: opt,
+        }
     }
 }
 
 pub struct BlockIter {
     block: Rc<BlockContents>,
+    opt: Options,
     // start of next entry
     offset: usize,
     // offset of restarts area
@@ -205,7 +212,7 @@
             // At a restart, the shared part is supposed to be 0.
             assert_eq!(shared, 0);
 
-            let c = cmp(to, &self.block[self.offset..self.offset + non_shared]);
+            let c = self.opt.cmp.cmp(to, &self.block[self.offset..self.offset + non_shared]);
 
             if c == Ordering::Less {
                 right = middle - 1;
@@ -220,7 +227,7 @@
 
         // Linear search from here on
         while let Some((k, _)) = self.next() {
-            if cmp(k.as_slice(), to) >= Ordering::Equal {
+            if self.opt.cmp.cmp(k.as_slice(), to) >= Ordering::Equal {
                 return;
             }
         }
@@ -283,7 +290,8 @@
 
     pub fn add(&mut self, key: &[u8], val: &[u8]) {
         assert!(self.counter <= self.opt.block_restart_interval);
-        assert!(self.buffer.is_empty() || cmp(self.last_key.as_slice(), key) == Ordering::Less);
+        assert!(self.buffer.is_empty() ||
+                self.opt.cmp.cmp(self.last_key.as_slice(), key) == Ordering::Less);
 
         let mut shared = 0;
 
@@ -387,7 +395,7 @@
         assert_eq!(blockc.len(), 8);
         assert_eq!(blockc, vec![0, 0, 0, 0, 1, 0, 0, 0]);
 
-        let block = Block::new(blockc);
+        let block = Block::new(Options::default(), blockc);
 
         for _ in block.iter() {
             panic!("expected 0 iterations");
@@ -404,7 +412,7 @@
         }
 
         let block_contents = builder.finish();
-        let block = Block::new(block_contents).iter();
+        let block = Block::new(Options::default(), block_contents).iter();
         let mut i = 0;
 
         assert!(!block.valid());
@@ -422,14 +430,14 @@
         let mut o = Options::default();
         o.block_restart_interval = 3;
         let data = get_data();
-        let mut builder = BlockBuilder::new(o);
+        let mut builder = BlockBuilder::new(o.clone());
 
         for &(k, v) in data.iter() {
             builder.add(k, v);
         }
 
         let block_contents = builder.finish();
-        let mut block = Block::new(block_contents).iter();
+        let mut block = Block::new(o.clone(), block_contents).iter();
 
         assert!(!block.valid());
         assert_eq!(block.next(),
@@ -451,7 +459,7 @@
         o.block_restart_interval = 3;
 
         let data = get_data();
-        let mut builder = BlockBuilder::new(o);
+        let mut builder = BlockBuilder::new(o.clone());
 
         for &(k, v) in data.iter() {
             builder.add(k, v);
@@ -459,7 +467,7 @@
 
         let block_contents = builder.finish();
 
-        let mut block = Block::new(block_contents).iter();
+        let mut block = Block::new(o.clone(), block_contents).iter();
 
         block.seek(&"prefix_key2".as_bytes());
         assert!(block.valid());
@@ -494,7 +502,7 @@
 
             let block_contents = builder.finish();
 
-            let mut block = Block::new(block_contents).iter();
+            let mut block = Block::new(o.clone(), block_contents).iter();
 
             block.seek_to_last();
             assert!(block.valid());
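
A quick usage sketch (not part of this changeset) of the new signatures above; the keys are made up, but every call mirrors the in-file tests: BlockBuilder::new(opt), Block::new(opt, contents), and seek() going through opt.cmp.

    use block::{Block, BlockBuilder};
    use options::Options;
    use types::LdbIterator;

    fn example_block_with_options() {
        let mut opt = Options::default();
        opt.block_restart_interval = 3;

        let mut builder = BlockBuilder::new(opt.clone());
        builder.add("key1".as_bytes(), "val1".as_bytes());
        builder.add("key2".as_bytes(), "val2".as_bytes());
        let contents = builder.finish();

        // Block::new() now takes the Options so that BlockIter can use opt.cmp in seek().
        let mut iter = Block::new(opt, contents).iter();
        iter.seek("key2".as_bytes());
        assert!(iter.valid());
    }
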
--- a/src/key_types.rs	Mon Dec 26 11:22:17 2016 +0000
+++ b/src/key_types.rs	Sat Dec 31 15:33:20 2016 +0100
@@ -1,7 +1,8 @@
 use options::{CompressionType, int_to_compressiontype};
-use types::{ValueType, SequenceNumber, cmp};
+use types::{ValueType, SequenceNumber, Cmp};
 
 use std::cmp::Ordering;
+use std::sync::Arc;
 
 use integer_encoding::{FixedInt, VarInt};
 
@@ -28,6 +29,7 @@
 /// A LookupKey is the first part of a memtable key, consisting of [keylen: varint32, key: *u8,
 /// tag: u64]
 /// keylen is the length of key plus 8 (for the tag; this for LevelDB compatibility)
+#[derive(Clone, Debug)]
 pub struct LookupKey {
     key: Vec<u8>,
     key_offset: usize,
@@ -146,6 +148,7 @@
     }
 }
 
+/// Parse a key in InternalKey format.
 pub fn parse_internal_key<'a>(ikey: InternalKey<'a>) -> (CompressionType, u64, UserKey<'a>) {
     assert!(ikey.len() >= 8);
 
@@ -155,40 +158,51 @@
     return (ctype, seq, &ikey[0..ikey.len() - 8]);
 }
 
+/// Same as MemtableKeyCmp, but for InternalKeys.
+#[derive(Clone)]
+pub struct InternalKeyCmp(pub Arc<Box<Cmp>>);
+
+impl Cmp for InternalKeyCmp {
+    fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering {
+        let (_, seqa, keya) = parse_internal_key(a);
+        let (_, seqb, keyb) = parse_internal_key(b);
+
+        match self.0.cmp(keya, keyb) {
+            Ordering::Less => Ordering::Less,
+            Ordering::Greater => Ordering::Greater,
+            // reverse comparison!
+            Ordering::Equal => seqb.cmp(&seqa),
+        }
+    }
+}
+
 /// An internal comparator wrapping a user-supplied comparator. This comparator is used to compare
 /// memtable keys, which contain length prefixes and a sequence number.
 /// The ordering is determined by asking the wrapped comparator; ties are broken by *reverse*
 /// ordering the sequence numbers. (This means that when having an entry abx/4 and searching for
 /// abx/5, then abx/4 is counted as "greater-or-equal", making snapshot functionality work at all)
-pub fn memtable_key_cmp(a: &[u8], b: &[u8]) -> Ordering {
-    let (akeylen, akeyoff, atag, _, _) = parse_memtable_key(a);
-    let (bkeylen, bkeyoff, btag, _, _) = parse_memtable_key(b);
-
-    let userkey_a = &a[akeyoff..akeyoff + akeylen];
-    let userkey_b = &b[bkeyoff..bkeyoff + bkeylen];
+#[derive(Clone)]
+pub struct MemtableKeyCmp(pub Arc<Box<Cmp>>);
 
-    match cmp(userkey_a, userkey_b) {
-        Ordering::Less => Ordering::Less,
-        Ordering::Greater => Ordering::Greater,
-        Ordering::Equal => {
-            let (_, aseq) = parse_tag(atag);
-            let (_, bseq) = parse_tag(btag);
+impl Cmp for MemtableKeyCmp {
+    fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering {
+        let (akeylen, akeyoff, atag, _, _) = parse_memtable_key(a);
+        let (bkeylen, bkeyoff, btag, _, _) = parse_memtable_key(b);
+
+        let userkey_a = &a[akeyoff..akeyoff + akeylen];
+        let userkey_b = &b[bkeyoff..bkeyoff + bkeylen];
 
-            // reverse!
-            bseq.cmp(&aseq)
-        }
-    }
-}
+        match self.0.cmp(userkey_a, userkey_b) {
+            Ordering::Less => Ordering::Less,
+            Ordering::Greater => Ordering::Greater,
+            Ordering::Equal => {
+                let (_, aseq) = parse_tag(atag);
+                let (_, bseq) = parse_tag(btag);
 
-/// Same as memtable_key_cmp, but for InternalKeys.
-pub fn internal_key_cmp(a: &[u8], b: &[u8]) -> Ordering {
-    let (_, seqa, keya) = parse_internal_key(a);
-    let (_, seqb, keyb) = parse_internal_key(b);
-
-    match cmp(keya, keyb) {
-        Ordering::Less => Ordering::Less,
-        Ordering::Greater => Ordering::Greater,
-        Ordering::Equal => seqb.cmp(&seqa),
+                // reverse!
+                bseq.cmp(&aseq)
+            }
+        }
     }
 }
 
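
To make the tie-breaking rule above concrete, a sketch that compares two hand-built InternalKeys; the tag layout ((seq << 8) | type) and FixedInt::encode_fixed_vec() are assumptions inferred from parse_tag() and the integer_encoding crate, not something this changeset shows directly.

    use std::cmp::Ordering;
    use std::sync::Arc;

    use integer_encoding::FixedInt;
    use key_types::InternalKeyCmp;
    use types::{Cmp, DefaultCmp, ValueType};

    fn example_reverse_seq_ordering() {
        let icmp = InternalKeyCmp(Arc::new(Box::new(DefaultCmp)));

        // Same user key "abx", sequence numbers 4 and 5; the tag goes in the last 8 bytes.
        let tag = |seq: u64| ((seq << 8) | ValueType::TypeValue as u64).encode_fixed_vec();
        let mut abx4 = "abx".as_bytes().to_vec();
        abx4.extend_from_slice(&tag(4));
        let mut abx5 = "abx".as_bytes().to_vec();
        abx5.extend_from_slice(&tag(5));

        // Equal user keys compare in *reverse* sequence order: the newer entry sorts first.
        assert_eq!(icmp.cmp(&abx5, &abx4), Ordering::Less);
    }
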
--- a/src/memtable.rs	Mon Dec 26 11:22:17 2016 +0000
+++ b/src/memtable.rs	Sat Dec 31 15:33:20 2016 +0100
@@ -1,19 +1,34 @@
-use std::cmp::Ordering;
+use key_types::{LookupKey, UserKey, InternalKey, MemtableKey, MemtableKeyCmp, parse_memtable_key,
+                build_memtable_key};
+use types::{ValueType, SequenceNumber, Status, LdbIterator};
+use skipmap::{SkipMap, SkipMapIter};
+use options::Options;
 
-use key_types::{LookupKey, UserKey, InternalKey, MemtableKey, parse_memtable_key,
-                build_memtable_key};
-use types::{ValueType, SequenceNumber, Status, LdbIterator, cmp};
-use skipmap::{SkipMap, SkipMapIter};
+use std::sync::Arc;
 
 /// Provides Insert/Get/Iterate, based on the SkipMap implementation.
 /// MemTable uses MemtableKeys internally, that is, it stores key and value in the [Skipmap] key.
 pub struct MemTable {
     map: SkipMap,
+    opt: Options,
 }
 
 impl MemTable {
-    pub fn new() -> MemTable {
-        MemTable { map: SkipMap::new_memtable_map() }
+    /// Returns a new MemTable.
+    /// This wraps opt.cmp inside a MemtableKey-specific comparator.
+    pub fn new(mut opt: Options) -> MemTable {
+        opt.cmp = Arc::new(Box::new(MemtableKeyCmp(opt.cmp.clone())));
+        MemTable::new_raw(opt)
+    }
+
+    /// Doesn't wrap the comparator in a MemtableKeyCmp.
+    fn new_raw(opt: Options) -> MemTable {
+        // Not using SkipMap::new_memtable_map(), as opt.cmp will already be wrapped by
+        // MemTable::new()
+        MemTable {
+            map: SkipMap::new(opt.clone()),
+            opt: opt,
+        }
     }
     pub fn approx_mem_usage(&self) -> usize {
         self.map.approx_memory()
@@ -30,12 +45,13 @@
 
         if let Some(e) = iter.current() {
             let foundkey: MemtableKey = e.0;
-            let (lkeylen, lkeyoff, _, _, _) = parse_memtable_key(key.memtable_key());
+            // let (lkeylen, lkeyoff, _, _, _) = parse_memtable_key(key.memtable_key());
             let (fkeylen, fkeyoff, tag, vallen, valoff) = parse_memtable_key(foundkey);
 
             // Compare user key -- if equal, proceed
-            if cmp(&key.memtable_key()[lkeyoff..lkeyoff + lkeylen],
-                   &foundkey[fkeyoff..fkeyoff + fkeylen]) == Ordering::Equal {
+            println!("{:?}", (key, foundkey));
+            // equality doesn't need custom comparator
+            if key.user_key() == &foundkey[fkeyoff..fkeyoff + fkeylen] {
                 if tag & 0xff == ValueType::TypeValue as u64 {
                     return Result::Ok(foundkey[valoff..valoff + vallen].to_vec());
                 } else {
@@ -132,9 +148,10 @@
     use super::*;
     use key_types::*;
     use types::*;
+    use options::Options;
 
     fn get_memtable() -> MemTable {
-        let mut mt = MemTable::new();
+        let mut mt = MemTable::new(Options::default());
         let entries = vec![(115, "abc", "122"),
                            (120, "abc", "123"),
                            (121, "abd", "124"),
@@ -155,7 +172,7 @@
 
     #[test]
     fn test_memtable_add() {
-        let mut mt = MemTable::new();
+        let mut mt = MemTable::new_raw(Options::default());
         mt.add(123,
                ValueType::TypeValue,
                "abc".as_bytes(),
--- a/src/merging_iter.rs	Mon Dec 26 11:22:17 2016 +0000
+++ b/src/merging_iter.rs	Sat Dec 31 15:33:20 2016 +0100
@@ -1,6 +1,8 @@
-use types::{cmp, LdbIterator};
+use options::Options;
+use types::{Cmp, LdbIterator};
 
 use std::cmp::Ordering;
+use std::sync::Arc;
 
 // Warning: This module is kinda messy. The original implementation is
 // not that much better though :-)
@@ -24,16 +26,19 @@
     iters: Vec<&'a mut LdbIterator<Item = (&'b [u8], &'b [u8])>>,
     current: Option<usize>,
     direction: Direction,
+    cmp: Arc<Box<Cmp>>,
 }
 
 impl<'a, 'b: 'a> MergingIter<'a, 'b> {
     /// Construct a new merging iterator.
-    pub fn new(iters: Vec<&'a mut LdbIterator<Item = (&'b [u8], &'b [u8])>>)
+    pub fn new(opt: Options,
+               iters: Vec<&'a mut LdbIterator<Item = (&'b [u8], &'b [u8])>>)
                -> MergingIter<'a, 'b> {
         let mi = MergingIter {
             iters: iters,
             current: None,
             direction: Direction::Fwd,
+            cmp: opt.cmp,
         };
         mi
     }
@@ -60,7 +65,7 @@
                             if i != current {
                                 self.iters[i].seek(key);
                                 if let Some((current_key, _)) = self.iters[i].current() {
-                                    if cmp(current_key, key) == Ordering::Equal {
+                                    if self.cmp.cmp(current_key, key) == Ordering::Equal {
                                         self.iters[i].next();
                                     }
                                 }
@@ -105,7 +110,7 @@
         for i in 1..self.iters.len() {
             if let Some(current) = self.iters[i].current() {
                 if let Some(smallest) = self.iters[next_ix].current() {
-                    if cmp(current.0, smallest.0) == ord {
+                    if self.cmp.cmp(current.0, smallest.0) == ord {
                         next_ix = i;
                     }
                 } else {
@@ -184,6 +189,7 @@
 mod tests {
     use super::*;
 
+    use options::Options;
     use test_util::TestLdbIter;
     use types::LdbIterator;
     use skipmap::tests;
@@ -194,7 +200,7 @@
         let mut iter = skm.iter();
         let mut iter2 = skm.iter();
 
-        let mut miter = MergingIter::new(vec![&mut iter]);
+        let mut miter = MergingIter::new(Options::default(), vec![&mut iter]);
 
         loop {
             if let Some((k, v)) = miter.next() {
@@ -216,7 +222,7 @@
         let mut iter = skm.iter();
         let mut iter2 = skm.iter();
 
-        let mut miter = MergingIter::new(vec![&mut iter, &mut iter2]);
+        let mut miter = MergingIter::new(Options::default(), vec![&mut iter, &mut iter2]);
 
         loop {
             if let Some((k, v)) = miter.next() {
@@ -238,7 +244,7 @@
         let mut iter = skm.iter();
         let mut iter2 = skm.iter();
 
-        let mut miter = MergingIter::new(vec![&mut iter, &mut iter2]);
+        let mut miter = MergingIter::new(Options::default(), vec![&mut iter, &mut iter2]);
 
         let first = miter.next();
         miter.next();
@@ -261,7 +267,7 @@
         let mut it2 = TestLdbIter::new(vec![(b("abb"), val), (b("abd"), val)]);
         let expected = vec![b("aba"), b("abb"), b("abc"), b("abd"), b("abe")];
 
-        let iter = MergingIter::new(vec![&mut it1, &mut it2]);
+        let iter = MergingIter::new(Options::default(), vec![&mut it1, &mut it2]);
 
         let mut i = 0;
         for (k, _) in iter {
@@ -278,7 +284,7 @@
         let mut it1 = TestLdbIter::new(vec![(b("aba"), val), (b("abc"), val), (b("abe"), val)]);
         let mut it2 = TestLdbIter::new(vec![(b("abb"), val), (b("abd"), val)]);
 
-        let mut iter = MergingIter::new(vec![&mut it1, &mut it2]);
+        let mut iter = MergingIter::new(Options::default(), vec![&mut it1, &mut it2]);
 
         assert!(!iter.valid());
         iter.next();
@@ -305,7 +311,7 @@
         let mut it1 = TestLdbIter::new(vec![(b("aba"), val), (b("abc"), val), (b("abe"), val)]);
         let mut it2 = TestLdbIter::new(vec![(b("abb"), val), (b("abd"), val)]);
 
-        let mut iter = MergingIter::new(vec![&mut it1, &mut it2]);
+        let mut iter = MergingIter::new(Options::default(), vec![&mut it1, &mut it2]);
 
         iter.next();
         iter.next();
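
For reference, a sketch of the new MergingIter::new(opt, iters) call merging two TestLdbIter fixtures, as in the tests above; opt.cmp decides which source currently holds the smallest key.

    use merging_iter::MergingIter;
    use options::Options;
    use test_util::TestLdbIter;

    fn example_merging_iter() {
        let val = "def".as_bytes();
        let mut it1 = TestLdbIter::new(vec![("aba".as_bytes(), val), ("abc".as_bytes(), val)]);
        let mut it2 = TestLdbIter::new(vec![("abb".as_bytes(), val), ("abd".as_bytes(), val)]);

        // The comparator is taken from opt.cmp (DefaultCmp by default).
        let miter = MergingIter::new(Options::default(), vec![&mut it1, &mut it2]);

        // Yields aba, abb, abc, abd -- the merged order under opt.cmp.
        let mut count = 0;
        for (_k, _v) in miter {
            count += 1;
        }
        assert_eq!(count, 4);
    }
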
--- a/src/options.rs	Mon Dec 26 11:22:17 2016 +0000
+++ b/src/options.rs	Sat Dec 31 15:33:20 2016 +0100
@@ -1,10 +1,9 @@
 use block::Block;
 use cache::Cache;
-use types::SequenceNumber;
+use types::{Cmp, DefaultCmp, SequenceNumber};
 
 use std::default::Default;
-use std::sync::Mutex;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 const KB: usize = 1 << 10;
 const MB: usize = KB * KB;
@@ -31,6 +30,10 @@
 ///
 #[derive(Clone)]
 pub struct Options {
+    // NOTE: do NOT set this to something different than DefaultCmp, otherwise some things will
+    // break (at the moment). Comparators would need extra functionality to fix this (e.g., string
+    // separator finding)
+    pub cmp: Arc<Box<Cmp>>,
     pub create_if_missing: bool,
     pub error_if_exists: bool,
     pub paranoid_checks: bool,
@@ -47,6 +50,7 @@
 impl Default for Options {
     fn default() -> Options {
         Options {
+            cmp: Arc::new(Box::new(DefaultCmp)),
             create_if_missing: true,
             error_if_exists: false,
             paranoid_checks: false,
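
A small sketch of the new cmp field: Options::default() installs DefaultCmp, and because the comparator sits behind an Arc, the opt.clone() calls added throughout this changeset only bump a reference count.

    use options::Options;
    use types::Cmp;
    use std::cmp::Ordering;

    fn example_default_cmp() {
        let opt = Options::default();
        // The default comparator is plain lexical byte ordering (DefaultCmp).
        assert_eq!(opt.cmp.cmp("abc".as_bytes(), "abd".as_bytes()), Ordering::Less);

        // Options is Clone; the Arc<Box<Cmp>> inside is shared rather than copied.
        let _opt2 = opt.clone();
    }
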
--- a/src/skipmap.rs	Mon Dec 26 11:22:17 2016 +0000
+++ b/src/skipmap.rs	Sat Dec 31 15:33:20 2016 +0100
@@ -1,9 +1,12 @@
-use types::{cmp, CmpFn, LdbIterator};
+
+use key_types::MemtableKeyCmp;
+use options::Options;
+use types::LdbIterator;
 use rand::{Rng, SeedableRng, StdRng};
-use key_types::memtable_key_cmp;
 
 use std::cmp::Ordering;
 use std::mem::{replace, size_of};
+use std::sync::Arc;
 
 const MAX_HEIGHT: usize = 12;
 const BRANCHING_FACTOR: u32 = 4;
@@ -27,19 +30,19 @@
     len: usize,
     // approximation of memory used.
     approx_mem: usize,
-    cmp: Box<CmpFn>,
+    opt: Options,
 }
 
 impl SkipMap {
-    /// Used for testing: Uses the standard comparator.
-    pub fn new_memtable_map() -> SkipMap {
-        let mut skm = SkipMap::new();
-        skm.cmp = Box::new(memtable_key_cmp);
+    /// Returns a SkipMap that wraps the comparator from opt inside a MemtableKeyCmp
+    pub fn new_memtable_map(mut opt: Options) -> SkipMap {
+        opt.cmp = Arc::new(Box::new(MemtableKeyCmp(opt.cmp.clone())));
+        let skm = SkipMap::new(opt);
         skm
     }
 
-    /// Returns a SkipMap that uses the memtable comparator (see above).
-    fn new() -> SkipMap {
+    /// Returns a SkipMap that uses the comparator from opt
+    pub fn new(opt: Options) -> SkipMap {
         let mut s = Vec::new();
         s.resize(MAX_HEIGHT, None);
 
@@ -53,7 +56,7 @@
             rand: StdRng::from_seed(&[0xde, 0xad, 0xbe, 0xef]),
             len: 0,
             approx_mem: size_of::<Self>() + MAX_HEIGHT * size_of::<Option<*mut Node>>(),
-            cmp: Box::new(cmp),
+            opt: opt,
         }
     }
 
@@ -92,7 +95,7 @@
         loop {
             unsafe {
                 if let Some(next) = (*current).skips[level] {
-                    let ord = (self.cmp)((*next).key.as_slice(), key);
+                    let ord = self.opt.cmp.cmp((*next).key.as_slice(), key);
 
                     match ord {
                         Ordering::Less => {
@@ -117,7 +120,7 @@
         unsafe {
             if current.is_null() {
                 return None;
-            } else if (self.cmp)(&(*current).key, key) == Ordering::Less {
+            } else if self.opt.cmp.cmp(&(*current).key, key) == Ordering::Less {
                 return None;
             } else {
                 return Some(&(*current));
@@ -135,7 +138,7 @@
         loop {
             unsafe {
                 if let Some(next) = (*current).skips[level] {
-                    let ord = (self.cmp)((*next).key.as_slice(), key);
+                    let ord = self.opt.cmp.cmp((*next).key.as_slice(), key);
 
                     match ord {
                         Ordering::Less => {
@@ -156,7 +159,7 @@
             if current.is_null() || (*current).key.is_empty() {
                 // If we're past the end for some reason or at the head
                 return None;
-            } else if (self.cmp)(&(*current).key, key) != Ordering::Less {
+            } else if self.opt.cmp.cmp(&(*current).key, key) != Ordering::Less {
                 return None;
             } else {
                 return Some(&(*current));
@@ -182,7 +185,7 @@
             unsafe {
                 if let Some(next) = (*current).skips[level] {
                     // If the wanted position is after the current node
-                    let ord = (self.cmp)(&(*next).key, &key);
+                    let ord = self.opt.cmp.cmp(&(*next).key, &key);
 
                     assert!(ord != Ordering::Equal, "No duplicates allowed");
 
@@ -323,10 +326,11 @@
 #[cfg(test)]
 pub mod tests {
     use super::*;
+    use options::Options;
     use types::*;
 
     pub fn make_skipmap() -> SkipMap {
-        let mut skm = SkipMap::new();
+        let mut skm = SkipMap::new(Options::default());
         let keys = vec!["aba", "abb", "abc", "abd", "abe", "abf", "abg", "abh", "abi", "abj",
                         "abk", "abl", "abm", "abn", "abo", "abp", "abq", "abr", "abs", "abt",
                         "abu", "abv", "abw", "abx", "aby", "abz"];
@@ -386,7 +390,7 @@
 
     #[test]
     fn test_iterator_0() {
-        let skm = SkipMap::new();
+        let skm = SkipMap::new(Options::default());
         let mut i = 0;
 
         for (_, _) in skm.iter() {
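
A sketch mirroring make_skipmap() above: SkipMap::new(opt) orders entries with opt.cmp directly, while new_memtable_map(opt) first wraps it in a MemtableKeyCmp (which is what MemTable relies on). insert(Vec<u8>, Vec<u8>) is assumed from the existing skipmap tests.

    use options::Options;
    use skipmap::SkipMap;

    fn example_skipmap() {
        let mut skm = SkipMap::new(Options::default());
        skm.insert("abc".as_bytes().to_vec(), "def".as_bytes().to_vec());
        skm.insert("abd".as_bytes().to_vec(), "dee".as_bytes().to_vec());

        // Iteration follows the order imposed by opt.cmp (DefaultCmp here).
        for (_k, _v) in skm.iter() {
        }
    }
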
--- a/src/table_builder.rs	Mon Dec 26 11:22:17 2016 +0000
+++ b/src/table_builder.rs	Sat Dec 31 15:33:20 2016 +0100
@@ -2,12 +2,13 @@
 use blockhandle::BlockHandle;
 use filter::{FilterPolicy, NoFilterPolicy};
 use filter_block::FilterBlockBuilder;
-use key_types::InternalKey;
 use options::{CompressionType, Options};
-use types::cmp;
+use key_types::{InternalKey, InternalKeyCmp};
+use types::Cmp;
 
 use std::io::Write;
 use std::cmp::Ordering;
+use std::sync::Arc;
 
 use crc::crc32;
 use crc::Hasher32;
@@ -21,7 +22,7 @@
 pub const TABLE_BLOCK_COMPRESS_LEN: usize = 1;
 pub const TABLE_BLOCK_CKSUM_LEN: usize = 4;
 
-fn find_shortest_sep<'a>(lo: InternalKey<'a>, hi: InternalKey<'a>) -> Vec<u8> {
+fn find_shortest_sep<'a>(cmp: &Arc<Box<Cmp>>, lo: InternalKey<'a>, hi: InternalKey<'a>) -> Vec<u8> {
     let min;
 
     if lo.len() < hi.len() {
@@ -40,9 +41,10 @@
         return Vec::from(lo);
     } else {
         if lo[diff_at] < 0xff && lo[diff_at] + 1 < hi[diff_at] {
-            let mut result = Vec::from(&lo[0..diff_at + 1]);
+            let mut result = lo.to_vec();
             result[diff_at] += 1;
-            assert_eq!(cmp(&result, hi), Ordering::Less);
+            println!("{:?}", (&result, hi));
+            assert_eq!(cmp.cmp(&result, hi), Ordering::Less);
             return result;
         }
         return Vec::from(lo);
@@ -124,7 +126,15 @@
 /// It's recommended that you use InternalFilterPolicy as FilterPol, as that policy extracts the
 /// underlying user keys from the InternalKeys used as keys in the table.
 impl<'a, Dst: Write, FilterPol: FilterPolicy> TableBuilder<'a, Dst, FilterPol> {
-    pub fn new(opt: Options, dst: Dst, fpol: FilterPol) -> TableBuilder<'a, Dst, FilterPol> {
+    /// Create a new table builder.
+    /// The comparator in opt will be wrapped in an InternalKeyCmp.
+    pub fn new(mut opt: Options, dst: Dst, fpol: FilterPol) -> TableBuilder<'a, Dst, FilterPol> {
+        opt.cmp = Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
+        TableBuilder::new_raw(opt, dst, fpol)
+    }
+
+    /// Like new(), but doesn't wrap the comparator in an InternalKeyCmp (for testing)
+    pub fn new_raw(opt: Options, dst: Dst, fpol: FilterPol) -> TableBuilder<'a, Dst, FilterPol> {
         TableBuilder {
             opt: opt.clone(),
             dst: dst,
@@ -141,9 +151,13 @@
         self.num_entries
     }
 
+    /// Add a key to the table. The key has to be lexically greater than or equal to the last one added.
     pub fn add(&mut self, key: InternalKey<'a>, val: &[u8]) {
         assert!(self.data_block.is_some());
-        assert!(self.num_entries == 0 || cmp(&self.prev_block_last_key, key) == Ordering::Less);
+
+        if !self.prev_block_last_key.is_empty() {
+            assert!(self.opt.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less);
+        }
 
         if self.data_block.as_ref().unwrap().size_estimate() > self.opt.block_size {
             self.write_data_block(key);
@@ -166,7 +180,7 @@
         assert!(self.data_block.is_some());
 
         let block = self.data_block.take().unwrap();
-        let sep = find_shortest_sep(&block.last_key(), next_key);
+        let sep = find_shortest_sep(&self.opt.cmp, &block.last_key(), next_key);
         self.prev_block_last_key = Vec::from(block.last_key());
         let contents = block.finish();
 
@@ -216,7 +230,16 @@
 
         // If there's a pending data block, write it
         if self.data_block.as_ref().unwrap().entries() > 0 {
-            self.write_data_block(&[0xff as u8; 1]);
+            // Find a key reliably past the last key
+            // NOTE: This only works if the basic comparator is DefaultCmp. (not a problem as long
+            // as we don't accept comparators from users)
+            let mut past_block =
+                Vec::with_capacity(self.data_block.as_ref().unwrap().last_key().len() + 1);
+            // Push 255 to the beginning
+            past_block.extend_from_slice(&[0xff; 1]);
+            past_block.extend_from_slice(self.data_block.as_ref().unwrap().last_key());
+
+            self.write_data_block(&past_block);
         }
 
         // Create metaindex block
@@ -258,20 +281,24 @@
     use blockhandle::BlockHandle;
     use filter::BloomPolicy;
     use options::Options;
+    use types::{DefaultCmp, Cmp};
+
+    use std::sync::Arc;
 
     #[test]
     fn test_shortest_sep() {
-        assert_eq!(find_shortest_sep("abcd".as_bytes(), "abcf".as_bytes()),
+        let cmp = Arc::new(Box::new(DefaultCmp) as Box<Cmp>);
+        assert_eq!(find_shortest_sep(&cmp, "abcd".as_bytes(), "abcf".as_bytes()),
                    "abce".as_bytes());
-        assert_eq!(find_shortest_sep("abcdefghi".as_bytes(), "abcffghi".as_bytes()),
-                   "abce".as_bytes());
-        assert_eq!(find_shortest_sep("a".as_bytes(), "a".as_bytes()),
+        assert_eq!(find_shortest_sep(&cmp, "abcdefghi".as_bytes(), "abcffghi".as_bytes()),
+                   "abceefghi".as_bytes());
+        assert_eq!(find_shortest_sep(&cmp, "a".as_bytes(), "a".as_bytes()),
                    "a".as_bytes());
-        assert_eq!(find_shortest_sep("a".as_bytes(), "b".as_bytes()),
+        assert_eq!(find_shortest_sep(&cmp, "a".as_bytes(), "b".as_bytes()),
                    "a".as_bytes());
-        assert_eq!(find_shortest_sep("abc".as_bytes(), "zzz".as_bytes()),
-                   "b".as_bytes());
-        assert_eq!(find_shortest_sep("".as_bytes(), "".as_bytes()),
+        assert_eq!(find_shortest_sep(&cmp, "abc".as_bytes(), "zzz".as_bytes()),
+                   "bbc".as_bytes());
+        assert_eq!(find_shortest_sep(&cmp, "".as_bytes(), "".as_bytes()),
                    "".as_bytes());
     }
 
@@ -294,7 +321,7 @@
         let mut d = Vec::with_capacity(512);
         let mut opt = Options::default();
         opt.block_restart_interval = 3;
-        let mut b = TableBuilder::new(opt, &mut d, BloomPolicy::new(4));
+        let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4));
 
         let data = vec![("abc", "def"), ("abd", "dee"), ("bcd", "asa"), ("bsr", "a00")];
 
@@ -312,7 +339,7 @@
         let mut d = Vec::with_capacity(512);
         let mut opt = Options::default();
         opt.block_restart_interval = 3;
-        let mut b = TableBuilder::new(opt, &mut d, BloomPolicy::new(4));
+        let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4));
 
         // Test two equal consecutive keys
         let data = vec![("abc", "def"), ("abc", "dee"), ("bcd", "asa"), ("bsr", "a00")];
--- a/src/table_reader.rs	Mon Dec 26 11:22:17 2016 +0000
+++ b/src/table_reader.rs	Sat Dec 31 15:33:20 2016 +0100
@@ -3,13 +3,14 @@
 use cache::CacheID;
 use filter::{InternalFilterPolicy, FilterPolicy};
 use filter_block::FilterBlockReader;
+use key_types::{InternalKeyCmp, InternalKey};
 use options::{self, CompressionType, Options};
 use table_builder::{self, Footer};
-use types::{cmp, CmpFn, LdbIterator};
-use key_types::{internal_key_cmp, InternalKey};
+use types::LdbIterator;
 
+use std::cmp::Ordering;
 use std::io::{self, Read, Seek, SeekFrom, Result};
-use std::cmp::Ordering;
+use std::sync::Arc;
 
 use integer_encoding::FixedInt;
 use crc::crc32::{self, Hasher32};
@@ -41,7 +42,10 @@
 
 impl TableBlock {
     /// Reads a block at location.
-    fn read_block<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<TableBlock> {
+    fn read_block<R: Read + Seek>(opt: Options,
+                                  f: &mut R,
+                                  location: &BlockHandle)
+                                  -> Result<TableBlock> {
         // The block is denoted by offset and length in BlockHandle. A block in an encoded
         // table is followed by 1B compression type and 4B checksum.
         let buf = try!(read_bytes(f, location));
@@ -53,7 +57,7 @@
                                                       table_builder::TABLE_BLOCK_COMPRESS_LEN,
                                                       table_builder::TABLE_BLOCK_CKSUM_LEN)));
         Ok(TableBlock {
-            block: Block::new(buf),
+            block: Block::new(opt, buf),
             checksum: u32::decode_fixed(&cksum),
             compression: options::int_to_compressiontype(compress[0] as u32)
                 .unwrap_or(CompressionType::CompressionNone),
@@ -76,7 +80,6 @@
     cache_id: CacheID,
 
     opt: Options,
-    cmp: Box<CmpFn>,
 
     footer: Footer,
     indexblock: Block,
@@ -85,11 +88,12 @@
 
 impl<R: Read + Seek, FP: FilterPolicy> Table<R, FP> {
     /// Creates a new table reader operating on unformatted keys (i.e., UserKey).
-    fn new_raw(mut file: R, size: usize, fp: FP, opt: Options) -> Result<Table<R, FP>> {
+    fn new_raw(opt: Options, mut file: R, size: usize, fp: FP) -> Result<Table<R, FP>> {
         let footer = try!(read_footer(&mut file, size));
 
-        let indexblock = try!(TableBlock::read_block(&mut file, &footer.index));
-        let metaindexblock = try!(TableBlock::read_block(&mut file, &footer.meta_index));
+        let indexblock = try!(TableBlock::read_block(opt.clone(), &mut file, &footer.index));
+        let metaindexblock =
+            try!(TableBlock::read_block(opt.clone(), &mut file, &footer.meta_index));
 
         if !indexblock.verify() || !metaindexblock.verify() {
             return Err(io::Error::new(io::ErrorKind::InvalidData,
@@ -122,7 +126,6 @@
             file_size: size,
             cache_id: cache_id,
             opt: opt,
-            cmp: Box::new(cmp),
             footer: footer,
             filters: filter_block_reader,
             indexblock: indexblock.block,
@@ -132,18 +135,18 @@
     /// Creates a new table reader operating on internal keys (i.e., InternalKey). This means that
     /// a different comparator (internal_key_cmp) and a different filter policy
     /// (InternalFilterPolicy) are used.
-    pub fn new(file: R,
+    pub fn new(mut opt: Options,
+               file: R,
                size: usize,
-               fp: FP,
-               opt: Options)
+               fp: FP)
                -> Result<Table<R, InternalFilterPolicy<FP>>> {
-        let mut t = try!(Table::new_raw(file, size, InternalFilterPolicy::new(fp), opt));
-        t.cmp = Box::new(internal_key_cmp);
+        opt.cmp = Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
+        let t = try!(Table::new_raw(opt, file, size, InternalFilterPolicy::new(fp)));
         Ok(t)
     }
 
     fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock> {
-        let b = try!(TableBlock::read_block(&mut self.file, location));
+        let b = try!(TableBlock::read_block(self.opt.clone(), &mut self.file, location));
 
         if !b.verify() {
             Err(io::Error::new(io::ErrorKind::InvalidData, "Data block failed verification"))
@@ -172,6 +175,7 @@
             current_block: self.indexblock.iter(), // just for filling in here
             current_block_off: 0,
             index_block: self.indexblock.iter(),
+            opt: self.opt.clone(),
             table: self,
             init: false,
         };
@@ -221,6 +225,7 @@
 /// into the data blocks.
 pub struct TableIterator<'a, R: 'a + Read + Seek, FP: 'a + FilterPolicy> {
     table: &'a mut Table<R, FP>,
+    opt: Options,
     current_block: BlockIter,
     current_block_off: usize,
     index_block: BlockIter,
@@ -284,7 +289,7 @@
         self.index_block.seek(to);
 
         if let Some((past_block, handle)) = self.index_block.current() {
-            if (self.table.cmp)(to, &past_block) == Ordering::Less {
+            if self.opt.cmp.cmp(to, &past_block) == Ordering::Less {
                 // ok, found right block: continue
                 if let Ok(()) = self.load_block(&handle) {
                     self.current_block.seek(to);
@@ -365,7 +370,7 @@
              ("zzz", "111")]
     }
 
-
+    // Build a table containing raw keys (no format)
     fn build_table() -> (Vec<u8>, usize) {
         let mut d = Vec::with_capacity(512);
         let mut opt = Options::default();
@@ -373,7 +378,8 @@
         opt.block_size = 32;
 
         {
-            let mut b = TableBuilder::new(opt, &mut d, BloomPolicy::new(4));
+            // Uses the standard comparator in opt.
+            let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4));
             let data = build_data();
 
             for &(k, v) in data.iter() {
@@ -389,8 +395,8 @@
         (d, size)
     }
 
+    // Build a table containing keys in InternalKey format.
     fn build_internal_table() -> (Vec<u8>, usize) {
-
         let mut d = Vec::with_capacity(512);
         let mut opt = Options::default();
         opt.block_restart_interval = 2;
@@ -406,6 +412,7 @@
             .collect();
 
         {
+            // Uses InternalKeyCmp
             let mut b =
                 TableBuilder::new(opt, &mut d, InternalFilterPolicy::new(BloomPolicy::new(4)));
 
@@ -429,10 +436,10 @@
 
         src[45] = 0;
 
-        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
                                        size,
-                                       BloomPolicy::new(4),
-                                       Options::default())
+                                       BloomPolicy::new(4))
             .unwrap();
 
         assert!(table.filters.is_some());
@@ -463,10 +470,10 @@
         let (src, size) = build_table();
         let data = build_data();
 
-        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
                                        size,
-                                       BloomPolicy::new(4),
-                                       Options::default())
+                                       BloomPolicy::new(4))
             .unwrap();
         let iter = table.iter();
         let mut i = 0;
@@ -484,10 +491,10 @@
     fn test_table_iterator_filter() {
         let (src, size) = build_table();
 
-        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
                                        size,
-                                       BloomPolicy::new(4),
-                                       Options::default())
+                                       BloomPolicy::new(4))
             .unwrap();
         let filter_reader = table.filters.clone().unwrap();
         let mut iter = table.iter();
@@ -507,10 +514,10 @@
     fn test_table_iterator_state_behavior() {
         let (src, size) = build_table();
 
-        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
                                        size,
-                                       BloomPolicy::new(4),
-                                       Options::default())
+                                       BloomPolicy::new(4))
             .unwrap();
         let mut iter = table.iter();
 
@@ -540,10 +547,10 @@
         let (src, size) = build_table();
         let data = build_data();
 
-        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
                                        size,
-                                       BloomPolicy::new(4),
-                                       Options::default())
+                                       BloomPolicy::new(4))
             .unwrap();
         let mut iter = table.iter();
         let mut i = 0;
@@ -575,10 +582,10 @@
     fn test_table_iterator_seek() {
         let (src, size) = build_table();
 
-        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
                                        size,
-                                       BloomPolicy::new(4),
-                                       Options::default())
+                                       BloomPolicy::new(4))
             .unwrap();
         let mut iter = table.iter();
 
@@ -596,10 +603,10 @@
     fn test_table_get() {
         let (src, size) = build_table();
 
-        let mut table = Table::new_raw(Cursor::new(&src as &[u8]),
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
                                        size,
-                                       BloomPolicy::new(4),
-                                       Options::default())
+                                       BloomPolicy::new(4))
             .unwrap();
 
         assert!(table.get("aaa".as_bytes()).is_none());
@@ -621,10 +628,10 @@
 
         let (src, size) = build_internal_table();
 
-        let mut table = Table::new(Cursor::new(&src as &[u8]),
+        let mut table = Table::new(Options::default(),
+                                   Cursor::new(&src as &[u8]),
                                    size,
-                                   BloomPolicy::new(4),
-                                   Options::default())
+                                   BloomPolicy::new(4))
             .unwrap();
         let filter_reader = table.filters.clone().unwrap();
 
--- a/src/test_util.rs	Mon Dec 26 11:22:17 2016 +0000
+++ b/src/test_util.rs	Sat Dec 31 15:33:20 2016 +0100
@@ -1,4 +1,4 @@
-use types::{cmp, LdbIterator};
+use types::{Cmp, DefaultCmp, LdbIterator};
 use std::cmp::Ordering;
 
 pub struct TestLdbIter<'a> {
@@ -50,7 +50,7 @@
     }
     fn seek(&mut self, k: &[u8]) {
         self.ix = 0;
-        while self.ix < self.v.len() && cmp(self.v[self.ix].0, k) == Ordering::Less {
+        while self.ix < self.v.len() && DefaultCmp.cmp(self.v[self.ix].0, k) == Ordering::Less {
             self.ix += 1;
         }
     }
--- a/src/types.rs	Mon Dec 26 11:22:17 2016 +0000
+++ b/src/types.rs	Sat Dec 31 15:33:20 2016 +0100
@@ -21,10 +21,20 @@
     IOError(String),
 }
 
-pub type CmpFn = Fn(&[u8], &[u8]) -> Ordering;
+/// Comparator trait, supporting types that can be nested (i.e., that add additional functionality
+/// on top of an inner comparator).
+pub trait Cmp {
+    fn cmp(&self, &[u8], &[u8]) -> Ordering;
+}
 
-pub fn cmp(a: &[u8], b: &[u8]) -> Ordering {
-    a.cmp(b)
+/// Lexical comparator.
+#[derive(Clone)]
+pub struct DefaultCmp;
+
+impl Cmp for DefaultCmp {
+    fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering {
+        a.cmp(b)
+    }
 }
 
 pub struct Range<'a> {
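
Finally, a toy sketch of the nesting the Cmp trait is meant for, following the same pattern as InternalKeyCmp and MemtableKeyCmp above; SkipPrefixCmp is made up purely for illustration.

    use std::cmp::Ordering;
    use std::sync::Arc;

    use types::{Cmp, DefaultCmp};

    /// Hypothetical wrapper: skip a one-byte prefix, delegate the rest to the inner comparator.
    struct SkipPrefixCmp(Arc<Box<Cmp>>);

    impl Cmp for SkipPrefixCmp {
        fn cmp(&self, a: &[u8], b: &[u8]) -> Ordering {
            self.0.cmp(&a[1..], &b[1..])
        }
    }

    fn example_nested_cmp() {
        let inner: Arc<Box<Cmp>> = Arc::new(Box::new(DefaultCmp));
        let wrapped = SkipPrefixCmp(inner);
        // "abc" < "abd" once the (ignored) prefixes are stripped.
        assert_eq!(wrapped.cmp("xabc".as_bytes(), "yabd".as_bytes()), Ordering::Less);
    }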