changeset 47:e2e8dbf80106

Track newly added files.
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 11 Mar 2018 16:58:32 +0100
parents 3dac643ee26a
children bbb2e76ee581
files src/block_builder.rs src/cache.rs src/error.rs src/table_block.rs src/test_util.rs
diffstat 5 files changed, 935 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/block_builder.rs	Sun Mar 11 16:58:32 2018 +0100
@@ -0,0 +1,166 @@
+use std::cmp::Ordering;
+
+use block::BlockContents;
+use options::Options;
+
+use integer_encoding::{FixedIntWriter, VarIntWriter};
+
+/// BlockBuilder contains functionality for building a block consisting of consecutive key-value
+/// entries.
+pub struct BlockBuilder {
+    opt: Options,
+    // Serialized entries; the block under construction.
+    buffer: Vec<u8>,
+    // Offsets into `buffer` where restart points begin (a restart stores a full,
+    // non-prefix-compressed key). Offset 0 is always a restart point.
+    restarts: Vec<u32>,
+
+    // The most recently added key, used for prefix compression of the next key.
+    last_key: Vec<u8>,
+    // Number of entries added since the last restart point.
+    restart_counter: usize,
+    // Total number of entries added.
+    counter: usize,
+}
+
+impl BlockBuilder {
+    /// Creates a new builder. The first restart point is always at offset 0.
+    pub fn new(o: Options) -> BlockBuilder {
+        // Reserve room for ~1024 restart points up front.
+        let mut restarts = Vec::with_capacity(1024);
+        restarts.push(0);
+
+        BlockBuilder {
+            buffer: Vec::with_capacity(o.block_size),
+            opt: o,
+            restarts: restarts,
+            last_key: Vec::new(),
+            restart_counter: 0,
+            counter: 0,
+        }
+    }
+
+    /// Returns the number of entries added so far.
+    pub fn entries(&self) -> usize {
+        self.counter
+    }
+
+    /// Returns the last key that was added (empty before the first add()).
+    pub fn last_key(&self) -> &[u8] {
+        &self.last_key
+    }
+
+    /// Estimates the size of the finished block: current entry data plus the
+    /// restart array (4 bytes per restart) and its 4-byte length footer.
+    pub fn size_estimate(&self) -> usize {
+        self.buffer.len() + 4 * self.restarts.len() + 4
+    }
+
+    /// Appends a key-value entry. Keys must be added in strictly ascending order
+    /// (per the comparator in the options); this is asserted.
+    pub fn add(&mut self, key: &[u8], val: &[u8]) {
+        assert!(self.restart_counter <= self.opt.block_restart_interval);
+        assert!(
+            self.buffer.is_empty()
+                || self.opt.cmp.cmp(self.last_key.as_slice(), key) == Ordering::Less
+        );
+
+        let mut shared = 0;
+
+        if self.restart_counter < self.opt.block_restart_interval {
+            // Determine how long a prefix this key shares with the previous one.
+            let smallest = std::cmp::min(self.last_key.len(), key.len());
+            while shared < smallest && self.last_key[shared] == key[shared] {
+                shared += 1;
+            }
+        } else {
+            // Start a new restart point: store the full key, no prefix sharing.
+            self.restarts.push(self.buffer.len() as u32);
+            self.last_key.clear();
+            self.restart_counter = 0;
+        }
+
+        let non_shared = key.len() - shared;
+
+        // Entry layout: shared-len, non-shared-len, value-len (varints), then the
+        // non-shared key suffix, then the value.
+        self.buffer
+            .write_varint(shared)
+            .expect("write to buffer failed");
+        self.buffer
+            .write_varint(non_shared)
+            .expect("write to buffer failed");
+        self.buffer
+            .write_varint(val.len())
+            .expect("write to buffer failed");
+        self.buffer.extend_from_slice(&key[shared..]);
+        self.buffer.extend_from_slice(val);
+
+        // Remember the full key for prefix-compressing the next add().
+        self.last_key.resize(shared, 0);
+        self.last_key.extend_from_slice(&key[shared..]);
+
+        self.restart_counter += 1;
+        self.counter += 1;
+    }
+
+    /// Consumes the builder and returns the finished block:
+    /// [entries..., restart offsets (4B each), number of restarts (4B)].
+    pub fn finish(mut self) -> BlockContents {
+        self.buffer.reserve(self.restarts.len() * 4 + 4);
+
+        // 1. Append RESTARTS
+        for r in self.restarts.iter() {
+            self.buffer
+                .write_fixedint(*r as u32)
+                .expect("write to buffer failed");
+        }
+
+        // 2. Append N_RESTARTS
+        self.buffer
+            .write_fixedint(self.restarts.len() as u32)
+            .expect("write to buffer failed");
+
+        // done
+        self.buffer
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Test data; keys are in strictly ascending order, as add() requires.
+    fn get_data() -> Vec<(&'static [u8], &'static [u8])> {
+        vec![
+            ("key1".as_bytes(), "value1".as_bytes()),
+            (
+                "loooooooooooooooooooooooooooooooooongerkey1".as_bytes(),
+                "shrtvl1".as_bytes(),
+            ),
+            ("medium length key 1".as_bytes(), "some value 2".as_bytes()),
+            ("prefix_key1".as_bytes(), "value".as_bytes()),
+            ("prefix_key2".as_bytes(), "value".as_bytes()),
+            ("prefix_key3".as_bytes(), "value".as_bytes()),
+        ]
+    }
+
+    #[test]
+    fn test_block_builder_sanity() {
+        let mut o = Options::default();
+        o.block_restart_interval = 3;
+        let mut builder = BlockBuilder::new(o);
+        let d = get_data();
+
+        for &(k, v) in d.iter() {
+            builder.add(k, v);
+            // restart_counter must never exceed the configured interval.
+            assert!(builder.restart_counter <= 3);
+            assert_eq!(builder.last_key(), k);
+        }
+
+        // The estimate must match the final serialized size exactly.
+        assert_eq!(149, builder.size_estimate());
+        assert_eq!(d.len(), builder.entries());
+
+        let block = builder.finish();
+        assert_eq!(block.len(), 149);
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_block_builder_panics() {
+        let mut d = get_data();
+        // Identical key as d[3]; add() asserts strictly ascending keys and must panic.
+        d[4].0 = b"prefix_key1";
+
+        let mut builder = BlockBuilder::new(Options::default());
+        for &(k, v) in d.iter() {
+            builder.add(k, v);
+            assert_eq!(k, builder.last_key());
+        }
+    }
+    // Additional test coverage is provided by tests in block.rs.
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/cache.rs	Sun Mar 11 16:58:32 2018 +0100
@@ -0,0 +1,407 @@
+use std::collections::HashMap;
+use std::mem::{replace, swap};
+
+// No clone, no copy! That asserts that an LRUHandle exists only once.
+type LRUHandle<T> = *mut LRUNode<T>;
+
+// Doubly-linked list node. Forward links (`next`) are owning Boxes; back links
+// (`prev`) are raw pointers into nodes owned elsewhere in the chain.
+struct LRUNode<T> {
+    next: Option<Box<LRUNode<T>>>, // None in the list's last node
+    prev: Option<*mut LRUNode<T>>,
+    data: Option<T>, // if None, then we have reached the head node
+}
+
+// Intrusive list with a sentinel head node: head.next is the front
+// (most recently used), head.prev points at the tail (least recently used).
+struct LRUList<T> {
+    head: LRUNode<T>,
+    count: usize,
+}
+
+/// This is likely unstable; more investigation is needed into correct behavior!
+impl<T> LRUList<T> {
+    fn new() -> LRUList<T> {
+        LRUList {
+            head: LRUNode {
+                data: None,
+                next: None,
+                prev: None,
+            },
+            count: 0,
+        }
+    }
+
+    /// Inserts a new element at the front of the list (the most-recently-used end;
+    /// remove_last() evicts from the opposite end) and returns a raw handle to it.
+    fn insert(&mut self, elem: T) -> LRUHandle<T> {
+        self.count += 1;
+        // Not first element
+        if self.head.next.is_some() {
+            let mut new = Box::new(LRUNode {
+                data: Some(elem),
+                next: None,
+                prev: Some(&mut self.head as *mut LRUNode<T>),
+            });
+            let newp = new.as_mut() as *mut LRUNode<T>;
+
+            // Set up the node after the new one
+            self.head.next.as_mut().unwrap().prev = Some(newp);
+            // Replace head.next with None and set the new node's next to that
+            new.next = replace(&mut self.head.next, None);
+            self.head.next = Some(new);
+
+            newp
+        } else {
+            // First node; the only node right now is an empty head node
+            let mut new = Box::new(LRUNode {
+                data: Some(elem),
+                next: None,
+                prev: Some(&mut self.head as *mut LRUNode<T>),
+            });
+            let newp = new.as_mut() as *mut LRUNode<T>;
+
+            // Set tail
+            self.head.prev = Some(newp);
+            // Set first node
+            self.head.next = Some(new);
+
+            newp
+        }
+    }
+
+    /// Removes and returns the least recently used element (the tail).
+    /// Returns None if the list is empty.
+    fn remove_last(&mut self) -> Option<T> {
+        if self.head.prev.is_some() {
+            // Detach the tail by taking the owning `next` link out of the tail's
+            // predecessor. SAFETY: relies on head.prev and the tail's prev pointer
+            // still being valid, i.e. no handle to a removed node is ever reused.
+            let mut lasto = unsafe {
+                replace(
+                    &mut (*((*self.head.prev.unwrap()).prev.unwrap())).next,
+                    None,
+                )
+            };
+
+            if let Some(ref mut last) = lasto {
+                // New tail is the removed node's predecessor.
+                self.head.prev = last.prev;
+                self.count -= 1;
+                return replace(&mut (*last).data, None);
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }
+
+    /// Unlinks the node behind `node_handle` and returns its payload.
+    /// SAFETY: the handle must come from insert() on this list and must not have
+    /// been removed before; a stale handle is a dangling pointer.
+    fn remove(&mut self, node_handle: LRUHandle<T>) -> T {
+        unsafe {
+            // If has next
+            if let Some(ref mut nextp) = (*node_handle).next {
+                swap(&mut (**nextp).prev, &mut (*node_handle).prev);
+            }
+            // If has prev
+            if let Some(ref mut prevp) = (*node_handle).prev {
+                // swap prev.next
+                // (node_handle will own itself now)
+                swap(&mut (**prevp).next, &mut (*node_handle).next);
+            }
+
+            self.count -= 1;
+            // node_handle now only has references/objects that point to itself,
+            // so it's safe to drop
+            replace(&mut (*node_handle).data, None).unwrap()
+        }
+    }
+
+    /// Reinserts the referenced node at the front.
+    /// SAFETY: same handle-validity requirement as remove().
+    fn reinsert_front(&mut self, node_handle: LRUHandle<T>) {
+        unsafe {
+            let prevp = (*node_handle).prev.unwrap();
+
+            // If not last node, update following node's prev
+            if let Some(next) = (*node_handle).next.as_mut() {
+                next.prev = Some(prevp);
+            } else {
+                // If last node, update head
+                self.head.prev = Some(prevp);
+            }
+
+            // Swap this.next with prev.next. After that, this.next refers to this (!)
+            swap(&mut (*prevp).next, &mut (*node_handle).next);
+            // To reinsert at head, swap head's next with this.next
+            swap(&mut (*node_handle).next, &mut self.head.next);
+            // Update this' prev reference to point to head.
+
+            // Update the second node's prev reference.
+            if let Some(ref mut newnext) = (*node_handle).next {
+                (*node_handle).prev = newnext.prev;
+                newnext.prev = Some(node_handle);
+            } else {
+                // Only one node, being the last one; avoid head.prev pointing to head
+                self.head.prev = Some(node_handle);
+            }
+
+            assert!(self.head.next.is_some());
+            assert!(self.head.prev.is_some());
+        }
+    }
+
+    /// Number of elements currently in the list.
+    fn count(&self) -> usize {
+        self.count
+    }
+
+    // Test-only peek at the front element's payload.
+    fn _testing_head_ref(&self) -> Option<&T> {
+        if let Some(ref first) = self.head.next {
+            first.data.as_ref()
+        } else {
+            None
+        }
+    }
+}
+
+/// Cache keys are fixed-size 16-byte identifiers.
+pub type CacheKey = [u8; 16];
+pub type CacheID = u64;
+// A cached value paired with the raw handle of its node in the LRU list.
+type CacheEntry<T> = (T, LRUHandle<CacheKey>);
+
+/// Implementation of `ShardedLRUCache`.
+/// Based on a HashMap; the elements are linked in order to support the LRU ordering.
+pub struct Cache<T> {
+    // note: CacheKeys ([u8; 16]) are duplicated between list and map. If this turns out to be a
+    // performance bottleneck, another layer of indirection™ can solve this by mapping the key
+    // to a numeric handle that keys both list and map.
+    list: LRUList<CacheKey>,
+    map: HashMap<CacheKey, CacheEntry<T>>,
+    cap: usize,
+    // Monotonically increasing counter backing new_cache_id().
+    id: u64,
+}
+
+impl<T> Cache<T> {
+    /// Creates a new cache holding at most `capacity` elements.
+    /// Panics if `capacity` is 0.
+    pub fn new(capacity: usize) -> Cache<T> {
+        assert!(capacity > 0);
+        Cache {
+            list: LRUList::new(),
+            map: HashMap::with_capacity(1024),
+            cap: capacity,
+            id: 0,
+        }
+    }
+
+    /// Returns an ID that is unique for this cache and that can be used to partition the cache
+    /// among several users.
+    pub fn new_cache_id(&mut self) -> CacheID {
+        self.id += 1;
+        self.id
+    }
+
+    /// How many elements the cache currently contains.
+    pub fn count(&self) -> usize {
+        self.list.count()
+    }
+
+    /// The capacity of this cache.
+    pub fn cap(&self) -> usize {
+        self.cap
+    }
+
+    /// Inserts a new element into the cache, replacing any element previously stored
+    /// under the same key.
+    /// If the capacity has been reached, the least recently used element is removed from the
+    /// cache.
+    pub fn insert(&mut self, key: &CacheKey, elem: T) {
+        // If the key is already present, drop its stale entry first; otherwise the
+        // old list node would linger, desynchronizing list count and map size.
+        if let Some((_, old_handle)) = self.map.remove(key) {
+            self.list.remove(old_handle);
+        } else if self.list.count() >= self.cap {
+            if let Some(removed_key) = self.list.remove_last() {
+                assert!(self.map.remove(&removed_key).is_some());
+            } else {
+                panic!("could not remove_last(); bug!");
+            }
+        }
+
+        // CacheKey is Copy ([u8; 16]), so dereferencing copies it cheaply.
+        let lru_handle = self.list.insert(*key);
+        self.map.insert(*key, (elem, lru_handle));
+    }
+
+    /// Retrieves an element from the cache and marks it as most recently used.
+    /// If the element has been preempted from the cache in the meantime, this returns None.
+    pub fn get<'a>(&'a mut self, key: &CacheKey) -> Option<&'a T> {
+        match self.map.get(key) {
+            None => None,
+            Some(&(ref elem, ref lru_handle)) => {
+                self.list.reinsert_front(*lru_handle);
+                Some(elem)
+            }
+        }
+    }
+
+    /// Removes an element from the cache (for invalidation).
+    pub fn remove(&mut self, key: &CacheKey) -> Option<T> {
+        match self.map.remove(key) {
+            None => None,
+            Some((elem, lru_handle)) => {
+                self.list.remove(lru_handle);
+                Some(elem)
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use super::LRUList;
+
+    // Builds a 16-byte key from three leading bytes; the rest is zero-padded.
+    fn make_key(a: u8, b: u8, c: u8) -> CacheKey {
+        [a, b, c, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+    }
+
+    #[test]
+    fn test_blockcache_cache_add_rm() {
+        let mut cache = Cache::new(128);
+
+        // Variable names are arbitrary labels; they don't encode the key bytes.
+        let h_123 = make_key(1, 2, 3);
+        let h_521 = make_key(1, 2, 4);
+        let h_372 = make_key(3, 4, 5);
+        let h_332 = make_key(6, 3, 1);
+        let h_899 = make_key(8, 2, 1);
+
+        cache.insert(&h_123, 123);
+        cache.insert(&h_332, 332);
+        cache.insert(&h_521, 521);
+        cache.insert(&h_372, 372);
+        cache.insert(&h_899, 899);
+
+        assert_eq!(cache.count(), 5);
+
+        assert_eq!(cache.get(&h_123), Some(&123));
+        assert_eq!(cache.get(&h_372), Some(&372));
+
+        // Removing returns the value once; a second remove yields None.
+        assert_eq!(cache.remove(&h_521), Some(521));
+        assert_eq!(cache.get(&h_521), None);
+        assert_eq!(cache.remove(&h_521), None);
+
+        assert_eq!(cache.count(), 4);
+    }
+
+    #[test]
+    fn test_blockcache_cache_capacity() {
+        let mut cache = Cache::new(3);
+
+        let h_123 = make_key(1, 2, 3);
+        let h_521 = make_key(1, 2, 4);
+        let h_372 = make_key(3, 4, 5);
+        let h_332 = make_key(6, 3, 1);
+        let h_899 = make_key(8, 2, 1);
+
+        cache.insert(&h_123, 123);
+        cache.insert(&h_332, 332);
+        cache.insert(&h_521, 521);
+        cache.insert(&h_372, 372);
+        cache.insert(&h_899, 899);
+
+        assert_eq!(cache.count(), 3);
+
+        // The two oldest entries were evicted to stay within capacity.
+        assert_eq!(cache.get(&h_123), None);
+        assert_eq!(cache.get(&h_332), None);
+        assert_eq!(cache.get(&h_521), Some(&521));
+        assert_eq!(cache.get(&h_372), Some(&372));
+        assert_eq!(cache.get(&h_899), Some(&899));
+    }
+
+    #[test]
+    fn test_blockcache_lru_remove() {
+        let mut lru = LRUList::<usize>::new();
+
+        let h_56 = lru.insert(56);
+        lru.insert(22);
+        lru.insert(223);
+        let h_244 = lru.insert(244);
+        lru.insert(1111);
+        let h_12 = lru.insert(12);
+
+        assert_eq!(lru.count(), 6);
+        // remove() by handle works at any list position.
+        assert_eq!(244, lru.remove(h_244));
+        assert_eq!(lru.count(), 5);
+        assert_eq!(12, lru.remove(h_12));
+        assert_eq!(lru.count(), 4);
+        assert_eq!(56, lru.remove(h_56));
+        assert_eq!(lru.count(), 3);
+    }
+
+    #[test]
+    fn test_blockcache_lru_1() {
+        let mut lru = LRUList::<usize>::new();
+
+        lru.insert(56);
+        lru.insert(22);
+        lru.insert(244);
+        lru.insert(12);
+
+        assert_eq!(lru.count(), 4);
+
+        // remove_last() yields elements in insertion order (oldest first).
+        assert_eq!(Some(56), lru.remove_last());
+        assert_eq!(Some(22), lru.remove_last());
+        assert_eq!(Some(244), lru.remove_last());
+
+        assert_eq!(lru.count(), 1);
+
+        assert_eq!(Some(12), lru.remove_last());
+
+        assert_eq!(lru.count(), 0);
+
+        assert_eq!(None, lru.remove_last());
+    }
+
+    #[test]
+    fn test_blockcache_lru_reinsert() {
+        let mut lru = LRUList::<usize>::new();
+
+        let handle1 = lru.insert(56);
+        let handle2 = lru.insert(22);
+        let handle3 = lru.insert(244);
+
+        assert_eq!(lru._testing_head_ref().map(|r| (*r)).unwrap(), 244);
+
+        // Each reinsert_front must move the referenced node to the head.
+        lru.reinsert_front(handle1);
+
+        assert_eq!(lru._testing_head_ref().map(|r| (*r)).unwrap(), 56);
+
+        lru.reinsert_front(handle3);
+
+        assert_eq!(lru._testing_head_ref().map(|r| (*r)).unwrap(), 244);
+
+        lru.reinsert_front(handle2);
+
+        assert_eq!(lru._testing_head_ref().map(|r| (*r)).unwrap(), 22);
+
+        assert_eq!(lru.remove_last(), Some(56));
+        assert_eq!(lru.remove_last(), Some(244));
+        assert_eq!(lru.remove_last(), Some(22));
+    }
+
+    #[test]
+    fn test_blockcache_lru_reinsert_2() {
+        let mut lru = LRUList::<usize>::new();
+
+        let handles = vec![
+            lru.insert(0),
+            lru.insert(1),
+            lru.insert(2),
+            lru.insert(3),
+            lru.insert(4),
+            lru.insert(5),
+            lru.insert(6),
+            lru.insert(7),
+            lru.insert(8),
+        ];
+
+        for i in 0..9 {
+            lru.reinsert_front(handles[i]);
+            assert_eq!(lru._testing_head_ref().map(|x| *x), Some(i));
+        }
+    }
+
+    #[test]
+    fn test_blockcache_lru_edge_cases() {
+        let mut lru = LRUList::<usize>::new();
+
+        let handle = lru.insert(3);
+
+        // Reinserting the single element must be a no-op; draining an empty list
+        // must keep returning None without panicking.
+        lru.reinsert_front(handle);
+        assert_eq!(lru._testing_head_ref().map(|x| *x), Some(3));
+        assert_eq!(lru.remove_last(), Some(3));
+        assert_eq!(lru.remove_last(), None);
+        assert_eq!(lru.remove_last(), None);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/error.rs	Sun Mar 11 16:58:32 2018 +0100
@@ -0,0 +1,108 @@
+use std::convert::From;
+use std::error::Error;
+use std::fmt::{self, Display, Formatter};
+use std::io;
+use std::result;
+use std::sync;
+
+use snap;
+
+/// StatusCode describes various failure modes of database operations.
+// The variant's Debug name is used verbatim when building error strings
+// (see Status::new), so renaming a variant changes user-visible messages.
+#[derive(Clone, Debug, PartialEq)]
+#[allow(dead_code)]
+pub enum StatusCode {
+    OK,
+
+    AlreadyExists,
+    Corruption,
+    CompressionError,
+    IOError,
+    InvalidArgument,
+    InvalidData,
+    LockError,
+    NotFound,
+    NotSupported,
+    PermissionDenied,
+    Unknown,
+}
+
+/// Status encapsulates a `StatusCode` and an error message. It can be displayed, and also
+/// implements `Error`.
+#[derive(Clone, Debug, PartialEq)]
+pub struct Status {
+    pub code: StatusCode,
+    // Pre-formatted, human-readable message; what Display and description() show.
+    pub err: String,
+}
+
+// The default Status is OK with an empty message.
+impl Default for Status {
+    fn default() -> Status {
+        Status {
+            code: StatusCode::OK,
+            err: String::new(),
+        }
+    }
+}
+
+impl Display for Status {
+    fn fmt(&self, fmt: &mut Formatter) -> result::Result<(), fmt::Error> {
+        // Display shows the same pre-formatted string as Error::description().
+        fmt.write_str(self.description())
+    }
+}
+
+impl Error for Status {
+    fn description(&self) -> &str {
+        &self.err
+    }
+}
+
+impl Status {
+    /// Creates a new Status. The stored message is "<code>" when `msg` is empty,
+    /// and "<code>: <msg>" otherwise.
+    pub fn new(code: StatusCode, msg: &str) -> Status {
+        // Build the message as an expression instead of mutating a binding.
+        let err = if msg.is_empty() {
+            format!("{:?}", code)
+        } else {
+            format!("{:?}: {}", code, msg)
+        };
+        Status {
+            code: code,
+            err: err,
+        }
+    }
+}
+
+/// LevelDB's result type
+pub type Result<T> = result::Result<T, Status>;
+
+/// err returns a new Status wrapped in a Result.
+/// Convenience helper for the common `return Err(Status::new(...))` pattern.
+pub fn err<T>(code: StatusCode, msg: &str) -> Result<T> {
+    Err(Status::new(code, msg))
+}
+
+// Maps well-known I/O error kinds onto the closest StatusCode; anything else
+// becomes the generic IOError.
+impl From<io::Error> for Status {
+    fn from(e: io::Error) -> Status {
+        let c = match e.kind() {
+            io::ErrorKind::NotFound => StatusCode::NotFound,
+            io::ErrorKind::InvalidData => StatusCode::Corruption,
+            io::ErrorKind::InvalidInput => StatusCode::InvalidArgument,
+            io::ErrorKind::PermissionDenied => StatusCode::PermissionDenied,
+            _ => StatusCode::IOError,
+        };
+
+        Status::new(c, e.description())
+    }
+}
+
+// A poisoned lock becomes LockError, regardless of the guarded type.
+impl<T> From<sync::PoisonError<T>> for Status {
+    fn from(_: sync::PoisonError<T>) -> Status {
+        Status::new(StatusCode::LockError, "lock poisoned")
+    }
+}
+
+// Snappy (de)compression failures become CompressionError.
+impl From<snap::Error> for Status {
+    fn from(e: snap::Error) -> Status {
+        Status {
+            code: StatusCode::CompressionError,
+            err: e.description().to_string(),
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/table_block.rs	Sun Mar 11 16:58:32 2018 +0100
@@ -0,0 +1,88 @@
+use block::Block;
+use blockhandle::BlockHandle;
+use error::{err, Result, StatusCode};
+use filter;
+use filter_block::FilterBlockReader;
+use options::{self, CompressionType, Options};
+use table_builder;
+use types::{unmask_crc, RandomAccess};
+
+use crc::crc32::{self, Hasher32};
+use integer_encoding::FixedInt;
+use snap::Decoder;
+
+/// Reads the data for the specified block handle from a file.
+// NOTE(review): the Ok value of read_at (bytes actually read) is discarded;
+// this assumes read_at fills the whole buffer or returns Err on a short read —
+// confirm against the RandomAccess contract in types.rs.
+fn read_bytes(f: &RandomAccess, location: &BlockHandle) -> Result<Vec<u8>> {
+    let mut buf = vec![0; location.size()];
+    f.read_at(location.offset(), &mut buf).map(|_| buf)
+}
+
+/// Reads a serialized filter block from a file and returns a FilterBlockReader.
+/// A zero-length location is rejected with InvalidArgument, since an empty read
+/// cannot contain a filter.
+pub fn read_filter_block(
+    src: &RandomAccess,
+    location: &BlockHandle,
+    policy: filter::BoxedFilterPolicy,
+) -> Result<FilterBlockReader> {
+    if location.size() == 0 {
+        return err(
+            StatusCode::InvalidArgument,
+            "no filter block in empty location",
+        );
+    }
+    let buf = read_bytes(src, location)?;
+    Ok(FilterBlockReader::new_owned(policy, buf))
+}
+
+/// Reads a table block from a random-access source.
+/// A table block consists of [bytes..., compress (1B), checksum (4B)]; the handle only refers to
+/// the location and length of [bytes...].
+/// The checksum (unmasked before comparison) covers the compressed contents plus the
+/// compression-type byte; a mismatch yields a Corruption status.
+pub fn read_table_block(opt: Options, f: &RandomAccess, location: &BlockHandle) -> Result<Block> {
+    // The block is denoted by offset and length in BlockHandle. A block in an encoded
+    // table is followed by 1B compression type and 4B checksum.
+    // Errors propagate via `?`, consistent with the rest of this file.
+    let buf = read_bytes(f, location)?;
+    let compress = read_bytes(
+        f,
+        &BlockHandle::new(
+            location.offset() + location.size(),
+            table_builder::TABLE_BLOCK_COMPRESS_LEN,
+        ),
+    )?;
+    let cksum = read_bytes(
+        f,
+        &BlockHandle::new(
+            location.offset() + location.size() + table_builder::TABLE_BLOCK_COMPRESS_LEN,
+            table_builder::TABLE_BLOCK_CKSUM_LEN,
+        ),
+    )?;
+
+    if !verify_table_block(&buf, compress[0], unmask_crc(u32::decode_fixed(&cksum))) {
+        return err(
+            StatusCode::Corruption,
+            &format!(
+                "checksum verification failed for block at {}",
+                location.offset()
+            ),
+        );
+    }
+
+    // Decompress if necessary; an unrecognized compression byte is invalid data.
+    if let Some(ctype) = options::int_to_compressiontype(compress[0] as u32) {
+        match ctype {
+            CompressionType::CompressionNone => Ok(Block::new(opt, buf)),
+            CompressionType::CompressionSnappy => {
+                let decoded = Decoder::new().decompress_vec(&buf)?;
+                Ok(Block::new(opt, decoded))
+            }
+        }
+    } else {
+        err(StatusCode::InvalidData, "invalid compression type")
+    }
+}
+
+/// Verify checksum of block: CRC32C is computed over the (possibly compressed)
+/// block data followed by the single compression-type byte, then compared with
+/// the expected (already unmasked) value.
+fn verify_table_block(data: &[u8], compression: u8, want: u32) -> bool {
+    let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
+    digest.write(data);
+    // `[compression; 1]` is a one-element array, i.e. just the compression byte.
+    digest.write(&[compression; 1]);
+    digest.sum32() == want
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/test_util.rs	Sun Mar 11 16:58:32 2018 +0100
@@ -0,0 +1,166 @@
+use types::{current_key_val, SSIterator};
+use cmp::{Cmp, DefaultCmp};
+
+use std::cmp::Ordering;
+
+/// TestSSIter is an SSIterator over a vector, to be used for testing purposes.
+pub struct TestSSIter<'a> {
+    v: Vec<(&'a [u8], &'a [u8])>,
+    // Current position; may reach v.len() once exhausted.
+    ix: usize,
+    // False until the first advance()/seek(); a fresh iterator is not valid.
+    init: bool,
+}
+
+impl<'a> TestSSIter<'a> {
+    /// Wraps the given key/value pairs. The iterator starts out invalid; call
+    /// advance() or seek() to position it.
+    pub fn new(c: Vec<(&'a [u8], &'a [u8])>) -> TestSSIter<'a> {
+        return TestSSIter {
+            v: c,
+            ix: 0,
+            init: false,
+        };
+    }
+}
+
+impl<'a> SSIterator for TestSSIter<'a> {
+    /// Advances to the next element; the first call positions the iterator on
+    /// element 0. Returns false when moving past the last element — including
+    /// immediately for an empty vector.
+    fn advance(&mut self) -> bool {
+        // Guard with `ix + 1 >= len` instead of `ix == len - 1`: this avoids a
+        // usize-underflow panic on an empty vector and keeps returning false on
+        // repeated calls past the end (the old `else` branch wrongly returned true).
+        if self.ix + 1 >= self.v.len() {
+            self.ix = self.v.len();
+            false
+        } else if !self.init {
+            self.init = true;
+            true
+        } else {
+            self.ix += 1;
+            true
+        }
+    }
+    /// Returns to the initial (invalid) state.
+    fn reset(&mut self) {
+        self.ix = 0;
+        self.init = false;
+    }
+    /// Copies the current key/value into the supplied buffers; false if invalid.
+    fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool {
+        if self.init && self.ix < self.v.len() {
+            key.clear();
+            val.clear();
+            key.extend_from_slice(self.v[self.ix].0);
+            val.extend_from_slice(self.v[self.ix].1);
+            true
+        } else {
+            false
+        }
+    }
+    fn valid(&self) -> bool {
+        self.init && self.ix < self.v.len()
+    }
+    /// Positions the iterator at the first element not less than `k`
+    /// (invalid if all keys are smaller).
+    fn seek(&mut self, k: &[u8]) {
+        self.ix = 0;
+        self.init = true;
+        while self.ix < self.v.len() && DefaultCmp.cmp(self.v[self.ix].0, k) == Ordering::Less {
+            self.ix += 1;
+        }
+    }
+    /// Steps back one element; stepping before the front invalidates the iterator.
+    fn prev(&mut self) -> bool {
+        if !self.init || self.ix == 0 {
+            self.init = false;
+            false
+        } else {
+            self.ix -= 1;
+            true
+        }
+    }
+}
+
+/// SSIteratorIter implements std::iter::Iterator for an SSIterator.
+pub struct SSIteratorIter<'a, It: 'a> {
+    inner: &'a mut It,
+}
+
+impl<'a, It: SSIterator> SSIteratorIter<'a, It> {
+    /// Borrows the given SSIterator for the lifetime of the adapter.
+    pub fn wrap(it: &'a mut It) -> SSIteratorIter<'a, It> {
+        SSIteratorIter { inner: it }
+    }
+}
+
+impl<'a, It: SSIterator> Iterator for SSIteratorIter<'a, It> {
+    type Item = (Vec<u8>, Vec<u8>);
+    // Delegates to SSIterator::next; items are owned (key, value) pairs.
+    fn next(&mut self) -> Option<Self::Item> {
+        SSIterator::next(self.inner)
+    }
+}
+
+/// This shared test takes an iterator over a set of exactly four elements and tests that it
+/// fulfills the generic iterator properties. Every iterator defined in this code base should pass
+/// this test.
+pub fn test_iterator_properties<It: SSIterator>(mut it: It) {
+    // A fresh iterator is invalid until first advanced.
+    assert!(!it.valid());
+    assert!(it.advance());
+    assert!(it.valid());
+    let first = current_key_val(&it);
+    assert!(it.advance());
+    let second = current_key_val(&it);
+    assert!(it.advance());
+    let third = current_key_val(&it);
+    // fourth (last) element
+    assert!(it.advance());
+    assert!(it.valid());
+    let fourth = current_key_val(&it);
+    // past end is invalid
+    assert!(!it.advance());
+    assert!(!it.valid());
+
+    // Seeking must land on an existing key, and prev() must step backwards.
+    it.reset();
+    it.seek(&fourth.as_ref().unwrap().0);
+    assert!(it.valid());
+    it.seek(&second.as_ref().unwrap().0);
+    assert!(it.valid());
+    it.prev();
+    assert_eq!(first, current_key_val(&it));
+
+    // After a reset, a full forward/backward walk must reproduce the elements.
+    it.reset();
+    assert!(!it.valid());
+    assert!(it.advance());
+    assert_eq!(first, current_key_val(&it));
+    assert!(it.advance());
+    assert_eq!(second, current_key_val(&it));
+    assert!(it.advance());
+    assert_eq!(third, current_key_val(&it));
+    assert!(it.prev());
+    assert_eq!(second, current_key_val(&it));
+    assert!(it.prev());
+    assert_eq!(first, current_key_val(&it));
+    // Stepping before the first element invalidates the iterator.
+    assert!(!it.prev());
+    assert!(!it.valid());
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_test_util_basic() {
+        let v = vec![
+            ("abc".as_bytes(), "def".as_bytes()),
+            ("abd".as_bytes(), "deg".as_bytes()),
+        ];
+        let mut iter = TestSSIter::new(v);
+        // next() is presumably provided by the SSIterator trait — see types.rs.
+        assert_eq!(
+            iter.next(),
+            Some((Vec::from("abc".as_bytes()), Vec::from("def".as_bytes())))
+        );
+    }
+
+    #[test]
+    fn test_test_util_ssiter_properties() {
+        // time_test! measures elapsed time of the scope (project macro).
+        time_test!();
+        let v;
+        {
+            time_test!("init");
+            // Exactly four elements, as test_iterator_properties requires.
+            v = vec![
+                ("abc".as_bytes(), "def".as_bytes()),
+                ("abd".as_bytes(), "deg".as_bytes()),
+                ("abe".as_bytes(), "deg".as_bytes()),
+                ("abf".as_bytes(), "deg".as_bytes()),
+            ];
+        }
+        test_iterator_properties(TestSSIter::new(v));
+    }
+}