Mercurial > lbo > hg > sstable
changeset 41:d2559e2729e4
Completely rebase sstable code on rusty_leveldb 0.2.2
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 11 Mar 2018 16:00:32 +0100 |
parents | 2452c9af0021 |
children | 9cf43f11d5a6 |
files | Cargo.toml src/block.rs src/blockhandle.rs src/filter.rs src/filter_block.rs src/lib.rs src/options.rs src/table_builder.rs src/table_reader.rs src/types.rs |
diffstat | 10 files changed, 1044 insertions(+), 757 deletions(-) [+] |
line wrap: on
line diff
--- a/Cargo.toml Sun Mar 11 11:29:39 2018 +0100 +++ b/Cargo.toml Sun Mar 11 16:00:32 2018 +0100 @@ -12,3 +12,7 @@ [dependencies] crc = "1.2" integer-encoding = "1.0" +snap = "0.2" + +[dev-dependencies] +time-test = "0.2"
--- a/src/block.rs Sun Mar 11 11:29:39 2018 +0100 +++ b/src/block.rs Sun Mar 11 16:00:32 2018 +0100 @@ -3,13 +3,17 @@ use std::rc::Rc; use options::Options; -use iterator::SSIterator; +use types::SSIterator; use integer_encoding::FixedInt; use integer_encoding::VarInt; pub type BlockContents = Vec<u8>; +/// A Block is an immutable ordered set of key/value entries. +/// +/// The structure internally looks like follows: +/// /// A block is a list of ENTRIES followed by a list of RESTARTS, terminated by a fixed u32 /// N_RESTARTS. /// @@ -26,6 +30,7 @@ /// A RESTART is a fixed u32 pointing to the beginning of an ENTRY. /// /// N_RESTARTS contains the number of restarts. +#[derive(Clone)] pub struct Block { block: Rc<BlockContents>, opt: Options, @@ -34,7 +39,7 @@ impl Block { /// Return an iterator over this block. /// Note that the iterator isn't bound to the block's lifetime; the iterator uses the same - /// refcounted block contents as this block. (meaning also that if the iterator isn't released, + /// refcounted block contents as this block, meaning that if the iterator isn't released, /// the memory occupied by the block isn't, either) pub fn iter(&self) -> BlockIter { let restarts = u32::decode_fixed(&self.block[self.block.len() - 4..]); @@ -67,6 +72,8 @@ } } +/// BlockIter is an iterator over the entries in a block. It doesn't depend on the Block's +/// lifetime, as it uses a refcounted block underneath. pub struct BlockIter { /// The underlying block contents. /// TODO: Maybe (probably...) this needs an Arc. @@ -106,8 +113,8 @@ let (shared, non_shared, _, head_len) = self.parse_entry_and_advance(); assert_eq!(shared, 0); - self.assemble_key(off + head_len, shared, non_shared); + assert!(self.valid()); } /// Return the offset that restart `ix` points to. @@ -123,7 +130,7 @@ /// Returns SHARED, NON_SHARED, VALSIZE and [length of length spec] from the current position, /// where 'length spec' is the length of the three values in the entry header, as described /// above. - /// Advances self.offset to point to the beginning of the next entry. + /// Advances self.offset to the beginning of the next entry. fn parse_entry_and_advance(&mut self) -> (usize, usize, usize, usize) { let mut i = 0; let (shared, sharedlen) = usize::decode_var(&self.block[self.offset..]); @@ -136,7 +143,7 @@ i += valsizelen; self.val_offset = self.offset + i + non_shared; - self.offset = self.offset + i + non_shared + valsize; + self.offset = self.val_offset + valsize; (shared, non_shared, valsize, i) } @@ -149,8 +156,9 @@ /// respectively non-shared parts of the key. /// Only self.key is mutated. fn assemble_key(&mut self, off: usize, shared: usize, non_shared: usize) { - self.key.resize(shared, 0); - self.key.extend_from_slice(&self.block[off..off + non_shared]); + self.key.truncate(shared); + self.key + .extend_from_slice(&self.block[off..off + non_shared]); } pub fn seek_to_last(&mut self) { @@ -161,40 +169,41 @@ self.reset(); } - while let Some((_, _)) = self.next() { + // Stop at last entry, before the iterator becomes invalid. + // + // We're checking the position before calling advance; if a restart point points to the + // last entry, calling advance() will directly reset the iterator. + while self.offset < self.restarts_off { + self.advance(); } + assert!(self.valid()); } } -impl Iterator for BlockIter { - type Item = (Vec<u8>, Vec<u8>); - - fn next(&mut self) -> Option<Self::Item> { +impl SSIterator for BlockIter { + fn advance(&mut self) -> bool { if self.offset >= self.restarts_off { - self.offset = self.restarts_off; - // current_entry_offset is left at the offset of the last entry - return None; + self.reset(); + return false; } else { self.current_entry_offset = self.offset; } let current_off = self.current_entry_offset; - let (shared, non_shared, valsize, entry_head_len) = self.parse_entry_and_advance(); + let (shared, non_shared, _valsize, entry_head_len) = self.parse_entry_and_advance(); self.assemble_key(current_off + entry_head_len, shared, non_shared); // Adjust current_restart_ix let num_restarts = self.number_restarts(); - while self.current_restart_ix + 1 < num_restarts && - self.get_restart_point(self.current_restart_ix + 1) < self.current_entry_offset { + while self.current_restart_ix + 1 < num_restarts + && self.get_restart_point(self.current_restart_ix + 1) < self.current_entry_offset + { self.current_restart_ix += 1; } - - Some((self.key.clone(), Vec::from(&self.block[self.val_offset..self.val_offset + valsize]))) + true } -} -impl SSIterator for BlockIter { fn reset(&mut self) { self.offset = 0; self.val_offset = 0; @@ -202,14 +211,14 @@ self.key.clear(); } - fn prev(&mut self) -> Option<Self::Item> { + fn prev(&mut self) -> bool { // as in the original implementation -- seek to last restart point, then look for key let orig_offset = self.current_entry_offset; // At the beginning, can't go further back if orig_offset == 0 { self.reset(); - return None; + return false; } while self.get_restart_point(self.current_restart_ix) >= orig_offset { @@ -230,8 +239,7 @@ // Stop if the next entry would be the original one (self.offset always points to the start // of the next entry) loop { - result = self.next(); - + result = self.advance(); if self.offset >= orig_offset { break; } @@ -276,159 +284,62 @@ } fn valid(&self) -> bool { - !self.key.is_empty() && self.val_offset > 0 && self.val_offset < self.restarts_off + !self.key.is_empty() && self.val_offset > 0 && self.val_offset <= self.restarts_off } - fn current(&self) -> Option<Self::Item> { + fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool { if self.valid() { - Some((self.key.clone(), Vec::from(&self.block[self.val_offset..self.offset]))) + key.clear(); + val.clear(); + key.extend_from_slice(&self.key); + val.extend_from_slice(&self.block[self.val_offset..self.offset]); + true } else { - None + false } } } -pub struct BlockBuilder { - opt: Options, - buffer: Vec<u8>, - restarts: Vec<u32>, - - last_key: Vec<u8>, - counter: usize, -} - -impl BlockBuilder { - pub fn new(o: Options) -> BlockBuilder { - let mut restarts = vec![0]; - restarts.reserve(1023); - - BlockBuilder { - buffer: Vec::with_capacity(o.block_size), - opt: o, - restarts: restarts, - last_key: Vec::new(), - counter: 0, - } - } - - pub fn entries(&self) -> usize { - self.counter - } - - pub fn last_key<'a>(&'a self) -> &'a [u8] { - &self.last_key - } - - pub fn size_estimate(&self) -> usize { - self.buffer.len() + self.restarts.len() * 4 + 4 - } - - #[allow(dead_code)] - pub fn reset(&mut self) { - self.buffer.clear(); - self.restarts.clear(); - self.last_key.clear(); - self.counter = 0; - } - - pub fn add(&mut self, key: &[u8], val: &[u8]) { - assert!(self.counter <= self.opt.block_restart_interval); - assert!(self.buffer.is_empty() || - self.opt.cmp.cmp(self.last_key.as_slice(), key) == Ordering::Less); - - let mut shared = 0; - - if self.counter < self.opt.block_restart_interval { - let smallest = if self.last_key.len() < key.len() { - self.last_key.len() - } else { - key.len() - }; - - while shared < smallest && self.last_key[shared] == key[shared] { - shared += 1; - } - } else { - self.restarts.push(self.buffer.len() as u32); - self.last_key.resize(0, 0); - self.counter = 0; - } - - let non_shared = key.len() - shared; - - let mut buf = [0 as u8; 4]; - - let mut sz = shared.encode_var(&mut buf[..]); - self.buffer.extend_from_slice(&buf[0..sz]); - sz = non_shared.encode_var(&mut buf[..]); - self.buffer.extend_from_slice(&buf[0..sz]); - sz = val.len().encode_var(&mut buf[..]); - self.buffer.extend_from_slice(&buf[0..sz]); - - self.buffer.extend_from_slice(&key[shared..]); - self.buffer.extend_from_slice(val); - - // Update key - self.last_key.resize(shared, 0); - self.last_key.extend_from_slice(&key[shared..]); - - self.counter += 1; - } - - pub fn finish(mut self) -> BlockContents { - // 1. Append RESTARTS - let mut i = self.buffer.len(); - self.buffer.resize(i + self.restarts.len() * 4 + 4, 0); - - for r in self.restarts.iter() { - r.encode_fixed(&mut self.buffer[i..i + 4]); - i += 4; - } - - // 2. Append N_RESTARTS - (self.restarts.len() as u32).encode_fixed(&mut self.buffer[i..i + 4]); - - // done - self.buffer - } -} - - #[cfg(test)] mod tests { use super::*; - use iterator::SSIterator; - use options::*; + use block_builder::BlockBuilder; + use options; + use test_util::{test_iterator_properties, SSIteratorIter}; + use types::{current_key_val, SSIterator}; fn get_data() -> Vec<(&'static [u8], &'static [u8])> { - vec![("key1".as_bytes(), "value1".as_bytes()), - ("loooooooooooooooooooooooooooooooooongerkey1".as_bytes(), "shrtvl1".as_bytes()), - ("medium length key 1".as_bytes(), "some value 2".as_bytes()), - ("prefix_key1".as_bytes(), "value".as_bytes()), - ("prefix_key2".as_bytes(), "value".as_bytes()), - ("prefix_key3".as_bytes(), "value".as_bytes())] + vec![ + ("key1".as_bytes(), "value1".as_bytes()), + ( + "loooooooooooooooooooooooooooooooooongerkey1".as_bytes(), + "shrtvl1".as_bytes(), + ), + ("medium length key 1".as_bytes(), "some value 2".as_bytes()), + ("prefix_key1".as_bytes(), "value".as_bytes()), + ("prefix_key2".as_bytes(), "value".as_bytes()), + ("prefix_key3".as_bytes(), "value".as_bytes()), + ] } #[test] - fn test_block_builder() { - let mut o = Options::default(); - o.block_restart_interval = 3; - - let mut builder = BlockBuilder::new(o); + fn test_block_iterator_properties() { + let o = options::for_test(); + let mut builder = BlockBuilder::new(o.clone()); + let mut data = get_data(); + data.truncate(4); + for &(k, v) in data.iter() { + builder.add(k, v); + } + let block_contents = builder.finish(); - for &(k, v) in get_data().iter() { - builder.add(k, v); - assert!(builder.counter <= 3); - assert_eq!(builder.last_key(), k); - } - - let block = builder.finish(); - assert_eq!(block.len(), 149); + let block = Block::new(o.clone(), block_contents).iter(); + test_iterator_properties(block); } #[test] fn test_block_empty() { - let mut o = Options::default(); + let mut o = options::for_test(); o.block_restart_interval = 16; let builder = BlockBuilder::new(o); @@ -436,9 +347,9 @@ assert_eq!(blockc.len(), 8); assert_eq!(blockc, vec![0, 0, 0, 0, 1, 0, 0, 0]); - let block = Block::new(Options::default(), blockc); + let block = Block::new(options::for_test(), blockc); - for _ in block.iter() { + for _ in SSIteratorIter::wrap(&mut block.iter()) { panic!("expected 0 iterations"); } } @@ -446,19 +357,19 @@ #[test] fn test_block_build_iterate() { let data = get_data(); - let mut builder = BlockBuilder::new(Options::default()); + let mut builder = BlockBuilder::new(options::for_test()); for &(k, v) in data.iter() { builder.add(k, v); } let block_contents = builder.finish(); - let block = Block::new(Options::default(), block_contents).iter(); + let mut block = Block::new(options::for_test(), block_contents).iter(); let mut i = 0; assert!(!block.valid()); - for (k, v) in block { + for (k, v) in SSIteratorIter::wrap(&mut block) { assert_eq!(&k[..], data[i].0); assert_eq!(v, data[i].1); i += 1; @@ -468,7 +379,7 @@ #[test] fn test_block_iterate_reverse() { - let mut o = Options::default(); + let mut o = options::for_test(); o.block_restart_interval = 3; let data = get_data(); let mut builder = BlockBuilder::new(o.clone()); @@ -481,33 +392,40 @@ let mut block = Block::new(o.clone(), block_contents).iter(); assert!(!block.valid()); - assert_eq!(block.next(), - Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec()))); + assert_eq!( + block.next(), + Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())) + ); assert!(block.valid()); block.next(); assert!(block.valid()); block.prev(); assert!(block.valid()); - assert_eq!(block.current(), - Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec()))); + assert_eq!( + current_key_val(&block), + Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())) + ); block.prev(); assert!(!block.valid()); // Verify that prev() from the last entry goes to the prev-to-last entry // (essentially, that next() returning None doesn't advance anything) - - while let Some(_) = block.next() { - } + while let Some(_) = block.next() {} block.prev(); assert!(block.valid()); - assert_eq!(block.current(), - Some(("prefix_key2".as_bytes().to_vec(), "value".as_bytes().to_vec()))); + assert_eq!( + current_key_val(&block), + Some(( + "prefix_key2".as_bytes().to_vec(), + "value".as_bytes().to_vec() + )) + ); } #[test] fn test_block_seek() { - let mut o = Options::default(); + let mut o = options::for_test(); o.block_restart_interval = 3; let data = get_data(); @@ -523,33 +441,49 @@ block.seek(&"prefix_key2".as_bytes()); assert!(block.valid()); - assert_eq!(block.current(), - Some(("prefix_key2".as_bytes().to_vec(), "value".as_bytes().to_vec()))); + assert_eq!( + current_key_val(&block), + Some(( + "prefix_key2".as_bytes().to_vec(), + "value".as_bytes().to_vec() + )) + ); block.seek(&"prefix_key0".as_bytes()); assert!(block.valid()); - assert_eq!(block.current(), - Some(("prefix_key1".as_bytes().to_vec(), "value".as_bytes().to_vec()))); + assert_eq!( + current_key_val(&block), + Some(( + "prefix_key1".as_bytes().to_vec(), + "value".as_bytes().to_vec() + )) + ); block.seek(&"key1".as_bytes()); assert!(block.valid()); - assert_eq!(block.current(), - Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec()))); + assert_eq!( + current_key_val(&block), + Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())) + ); block.seek(&"prefix_key3".as_bytes()); assert!(block.valid()); - assert_eq!(block.current(), - Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec()))); + assert_eq!( + current_key_val(&block), + Some(( + "prefix_key3".as_bytes().to_vec(), + "value".as_bytes().to_vec() + )) + ); block.seek(&"prefix_key8".as_bytes()); - assert!(block.valid()); - assert_eq!(block.current(), - Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec()))); + assert!(!block.valid()); + assert_eq!(current_key_val(&block), None); } #[test] fn test_block_seek_to_last() { - let mut o = Options::default(); + let mut o = options::for_test(); // Test with different number of restarts for block_restart_interval in vec![2, 6, 10] { @@ -568,8 +502,13 @@ block.seek_to_last(); assert!(block.valid()); - assert_eq!(block.current(), - Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec()))); + assert_eq!( + current_key_val(&block), + Some(( + "prefix_key3".as_bytes().to_vec(), + "value".as_bytes().to_vec() + )) + ); } } }
--- a/src/blockhandle.rs Sun Mar 11 11:29:39 2018 +0100 +++ b/src/blockhandle.rs Sun Mar 11 16:00:32 2018 +0100 @@ -4,7 +4,7 @@ /// used typically as file-internal pointer in table (SSTable) files. For example, the index block /// in an SSTable is a block of (key = largest key in block) -> (value = encoded blockhandle of /// block). -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BlockHandle { offset: usize, size: usize, @@ -17,11 +17,13 @@ let (off, offsize) = usize::decode_var(from); let (sz, szsize) = usize::decode_var(&from[offsize..]); - (BlockHandle { - offset: off, - size: sz, - }, - offsize + szsize) + ( + BlockHandle { + offset: off, + size: sz, + }, + offsize + szsize, + ) } pub fn new(offset: usize, size: usize) -> BlockHandle {
--- a/src/filter.rs Sun Mar 11 11:29:39 2018 +0100 +++ b/src/filter.rs Sun Mar 11 16:00:32 2018 +0100 @@ -1,9 +1,4 @@ -#![allow(dead_code)] - -//! This module is yet to be used, but will speed up table lookups. -//! - -use std::sync::Arc; +use std::rc::Rc; use integer_encoding::FixedInt; @@ -13,23 +8,36 @@ pub trait FilterPolicy { /// Returns a string identifying this policy. fn name(&self) -> &'static str; - /// Create a filter matching the given keys. - fn create_filter(&self, keys: &Vec<&[u8]>) -> Vec<u8>; + /// Create a filter matching the given keys. Keys are given as a long byte array that is + /// indexed by the offsets contained in key_offsets. + fn create_filter(&self, keys: &[u8], key_offsets: &[usize]) -> Vec<u8>; /// Check whether the given key may match the filter. fn key_may_match(&self, key: &[u8], filter: &[u8]) -> bool; } /// A boxed and refcounted filter policy (reference-counted because a Box with unsized content /// couldn't be cloned otherwise) -pub type BoxedFilterPolicy = Arc<Box<FilterPolicy>>; +pub type BoxedFilterPolicy = Rc<Box<FilterPolicy>>; + +impl FilterPolicy for BoxedFilterPolicy { + fn name(&self) -> &'static str { + (**self).name() + } + fn create_filter(&self, keys: &[u8], key_offsets: &[usize]) -> Vec<u8> { + (**self).create_filter(keys, key_offsets) + } + fn key_may_match(&self, key: &[u8], filter: &[u8]) -> bool { + (**self).key_may_match(key, filter) + } +} /// Used for tables that don't have filter blocks but need a type parameter. #[derive(Clone)] pub struct NoFilterPolicy; impl NoFilterPolicy { - pub fn new() -> BoxedFilterPolicy { - Arc::new(Box::new(NoFilterPolicy)) + pub fn new() -> NoFilterPolicy { + NoFilterPolicy } } @@ -37,7 +45,7 @@ fn name(&self) -> &'static str { "_" } - fn create_filter(&self, _: &Vec<&[u8]>) -> Vec<u8> { + fn create_filter(&self, _: &[u8], _: &[usize]) -> Vec<u8> { vec![] } fn key_may_match(&self, _: &[u8], _: &[u8]) -> bool { @@ -57,8 +65,8 @@ /// Beware the magic numbers... impl BloomPolicy { /// Returns a new boxed BloomPolicy. - pub fn new(bits_per_key: u32) -> BoxedFilterPolicy { - Arc::new(Box::new(BloomPolicy::new_unwrapped(bits_per_key))) + pub fn new(bits_per_key: u32) -> BloomPolicy { + BloomPolicy::new_unwrapped(bits_per_key) } /// Returns a new BloomPolicy with the given parameter. @@ -102,7 +110,7 @@ let mut i = 0; for b in data[ix..].iter() { - h += (*b as u32) << (8 * i); + h = h.overflowing_add((*b as u32) << (8 * i)).0; i += 1; } @@ -117,17 +125,15 @@ fn name(&self) -> &'static str { "leveldb.BuiltinBloomFilter2" } - fn create_filter(&self, keys: &Vec<&[u8]>) -> Vec<u8> { - let filter_bits = keys.len() * self.bits_per_key as usize; - let mut filter = Vec::new(); + fn create_filter(&self, keys: &[u8], key_offsets: &[usize]) -> Vec<u8> { + let filter_bits = key_offsets.len() * self.bits_per_key as usize; + let mut filter: Vec<u8>; if filter_bits < 64 { - // Preallocate, then resize - filter.reserve(8 + 1); - filter.resize(8, 0 as u8); + filter = Vec::with_capacity(8 + 1); + filter.resize(8, 0); } else { - // Preallocate, then resize - filter.reserve(1 + ((filter_bits + 7) / 8)); + filter = Vec::with_capacity(1 + ((filter_bits + 7) / 8)); filter.resize((filter_bits + 7) / 8, 0); } @@ -136,16 +142,16 @@ // Encode k at the end of the filter. filter.push(self.k as u8); - for key in keys { + // Add all keys to the filter. + offset_data_iterate(keys, key_offsets, |key| { let mut h = self.bloom_hash(key); let delta = (h >> 17) | (h << 15); - for _ in 0..self.k { let bitpos = (h % adj_filter_bits) as usize; filter[bitpos / 8] |= 1 << (bitpos % 8); h = (h as u64 + delta as u64) as u32; } - } + }); filter } @@ -180,30 +186,30 @@ /// A User Key is u8*. /// An Internal Key is u8* u8{8} (where the second part encodes a tag and a sequence number). #[derive(Clone)] -pub struct InternalFilterPolicy { - internal: BoxedFilterPolicy, +pub struct InternalFilterPolicy<FP: FilterPolicy> { + internal: FP, } -impl InternalFilterPolicy { - pub fn new(inner: BoxedFilterPolicy) -> BoxedFilterPolicy { - Arc::new(Box::new(InternalFilterPolicy { internal: inner })) +impl<FP: FilterPolicy> InternalFilterPolicy<FP> { + pub fn new(inner: FP) -> InternalFilterPolicy<FP> { + InternalFilterPolicy { internal: inner } } } -impl FilterPolicy for InternalFilterPolicy { +impl<FP: FilterPolicy> FilterPolicy for InternalFilterPolicy<FP> { fn name(&self) -> &'static str { self.internal.name() } - fn create_filter(&self, keys: &Vec<&[u8]>) -> Vec<u8> { - let mut mod_keys = keys.clone(); - let mut i = 0; + fn create_filter(&self, keys: &[u8], key_offsets: &[usize]) -> Vec<u8> { + let mut mod_keys = Vec::with_capacity(keys.len() - key_offsets.len() * 8); + let mut mod_key_offsets = Vec::with_capacity(key_offsets.len()); - for key in keys { - mod_keys[i] = &key[0..key.len() - 8]; - i += 1; - } - self.internal.create_filter(&mod_keys) + offset_data_iterate(keys, key_offsets, |key| { + mod_key_offsets.push(mod_keys.len()); + mod_keys.extend_from_slice(&key[0..key.len() - 8]); + }); + self.internal.create_filter(&mod_keys, &mod_key_offsets) } fn key_may_match(&self, key: &[u8], filter: &[u8]) -> bool { @@ -211,6 +217,19 @@ } } +/// offset_data_iterate iterates over the entries in data that are indexed by the offsets given in +/// offsets. This is e.g. the internal format of a FilterBlock. +fn offset_data_iterate<F: FnMut(&[u8])>(data: &[u8], offsets: &[usize], mut f: F) { + for offix in 0..offsets.len() { + let upper = if offix == offsets.len() - 1 { + data.len() + } else { + offsets[offix + 1] + }; + let piece = &data[offsets[offix]..upper]; + f(piece); + } +} #[cfg(test)] mod tests { @@ -218,18 +237,28 @@ const _BITS_PER_KEY: u32 = 12; - fn input_data() -> Vec<&'static [u8]> { - let data = vec!["abc123def456".as_bytes(), - "xxx111xxx222".as_bytes(), - "ab00cd00ab".as_bytes(), - "908070605040302010".as_bytes()]; - data + fn input_data() -> (Vec<u8>, Vec<usize>) { + let mut concat = vec![]; + let mut offs = vec![]; + + for d in [ + "abc123def456".as_bytes(), + "xxx111xxx222".as_bytes(), + "ab00cd00ab".as_bytes(), + "908070605040302010".as_bytes(), + ].iter() + { + offs.push(concat.len()); + concat.extend_from_slice(d); + } + (concat, offs) } /// Creates a filter using the keys from input_data(). fn create_filter() -> Vec<u8> { let fpol = BloomPolicy::new(_BITS_PER_KEY); - let filter = fpol.create_filter(&input_data()); + let (data, offs) = input_data(); + let filter = fpol.create_filter(&data, &offs); assert_eq!(filter, vec![194, 148, 129, 140, 192, 196, 132, 164, 8]); filter @@ -239,10 +268,11 @@ fn test_filter_bloom() { let f = create_filter(); let fp = BloomPolicy::new(_BITS_PER_KEY); + let (data, offs) = input_data(); - for k in input_data().iter() { - assert!(fp.key_may_match(k, &f)); - } + offset_data_iterate(&data, &offs, |key| { + assert!(fp.key_may_match(key, &f)); + }); } #[test]
--- a/src/filter_block.rs Sun Mar 11 11:29:39 2018 +0100 +++ b/src/filter_block.rs Sun Mar 11 16:00:32 2018 +0100 @@ -1,7 +1,3 @@ -#![allow(dead_code)] - -//! parts of this module are not yet used (the reading part). - use block::BlockContents; use filter::BoxedFilterPolicy; @@ -10,8 +6,6 @@ use integer_encoding::FixedInt; const FILTER_BASE_LOG2: u32 = 11; - -#[allow(dead_code)] const FILTER_BASE: u32 = 1 << FILTER_BASE_LOG2; // 2 KiB /// For a given byte offset, returns the index of the filter that includes the key at that offset. @@ -28,33 +22,42 @@ /// /// where offsets are 4 bytes, offset of offsets is 4 bytes, and log2 of FILTER_BASE is 1 byte. /// Two consecutive filter offsets may be the same. -pub struct FilterBlockBuilder<'a> { +/// +/// TODO: See if we can remove the lifetime parameter. +pub struct FilterBlockBuilder { policy: BoxedFilterPolicy, // filters, concatenated filters: Vec<u8>, filter_offsets: Vec<usize>, // Reset on every start_block() - keys: Vec<&'a [u8]>, + key_offsets: Vec<usize>, + keys: Vec<u8>, } -impl<'a> FilterBlockBuilder<'a> { - pub fn new(fp: BoxedFilterPolicy) -> FilterBlockBuilder<'a> { +impl FilterBlockBuilder { + pub fn new(fp: BoxedFilterPolicy) -> FilterBlockBuilder { FilterBlockBuilder { policy: fp, // some pre-allocation - filters: Vec::with_capacity(1024 * 40), + filters: Vec::with_capacity(1024), filter_offsets: Vec::with_capacity(1024), + key_offsets: Vec::with_capacity(1024), keys: Vec::with_capacity(1024), } } + pub fn size_estimate(&self) -> usize { + self.filters.len() + 4 * self.filter_offsets.len() + 4 + 1 + } + pub fn filter_name(&self) -> &'static str { self.policy.name() } - pub fn add_key(&mut self, key: &'a [u8]) { - self.keys.push(key); + pub fn add_key(&mut self, key: &[u8]) { + self.key_offsets.push(self.keys.len()); + self.keys.extend_from_slice(key); } pub fn start_block(&mut self, offset: usize) { @@ -68,15 +71,15 @@ fn generate_filter(&mut self) { self.filter_offsets.push(self.filters.len()); - if self.keys.is_empty() { return; } - let filter = self.policy.create_filter(&self.keys); + let filter = self.policy.create_filter(&self.keys, &self.key_offsets); self.filters.extend_from_slice(&filter); self.keys.clear(); + self.key_offsets.clear(); } pub fn finish(mut self) -> Vec<u8> { @@ -87,7 +90,6 @@ let mut result = self.filters; let offsets_offset = result.len(); let mut ix = result.len(); - result.resize(ix + 4 * self.filter_offsets.len() + 5, 0); // Put filter offsets at the end @@ -98,7 +100,6 @@ (offsets_offset as u32).encode_fixed(&mut result[ix..ix + 4]); ix += 4; - result[ix] = FILTER_BASE_LOG2 as u8; result @@ -157,7 +158,8 @@ assert!(filter_begin < filter_end); assert!(filter_end <= self.offsets_offset); - self.policy.key_may_match(key, &self.block[filter_begin..filter_end]) + self.policy + .key_may_match(key, &self.block[filter_begin..filter_end]) } } @@ -175,12 +177,17 @@ } fn get_keys() -> Vec<&'static [u8]> { - vec!["abcd".as_bytes(), "efgh".as_bytes(), "ijkl".as_bytes(), "mnopqrstuvwxyz".as_bytes()] + vec![ + "abcd".as_bytes(), + "efgh".as_bytes(), + "ijkl".as_bytes(), + "mnopqrstuvwxyz".as_bytes(), + ] } fn produce_filter_block() -> Vec<u8> { let keys = get_keys(); - let mut bld = FilterBlockBuilder::new(BloomPolicy::new(32)); + let mut bld = FilterBlockBuilder::new(Rc::new(Box::new(BloomPolicy::new(32)))); bld.start_block(0); @@ -204,26 +211,38 @@ // 2 blocks of 4 filters of 4 bytes plus 1B for `k`; plus three filter offsets (because of // the block offsets of 0 and 5000); plus footer assert_eq!(result.len(), 2 * (get_keys().len() * 4 + 1) + (3 * 4) + 5); - assert_eq!(result, - vec![234, 195, 25, 155, 61, 141, 173, 140, 221, 28, 222, 92, 220, 112, 234, - 227, 22, 234, 195, 25, 155, 61, 141, 173, 140, 221, 28, 222, 92, 220, - 112, 234, 227, 22, 0, 0, 0, 0, 17, 0, 0, 0, 17, 0, 0, 0, 34, 0, 0, 0, 11]); + assert_eq!( + result, + vec![ + 234, 195, 25, 155, 61, 141, 173, 140, 221, 28, 222, 92, 220, 112, 234, 227, 22, + 234, 195, 25, 155, 61, 141, 173, 140, 221, 28, 222, 92, 220, 112, 234, 227, 22, 0, + 0, 0, 0, 17, 0, 0, 0, 17, 0, 0, 0, 34, 0, 0, 0, 11, + ] + ); } #[test] fn test_filter_block_build_read() { let result = produce_filter_block(); - let reader = FilterBlockReader::new_owned(BloomPolicy::new(32), result); + let reader = FilterBlockReader::new_owned(Rc::new(Box::new(BloomPolicy::new(32))), result); - assert_eq!(reader.offset_of(get_filter_index(5121, FILTER_BASE_LOG2)), - 17); // third block in third filter + assert_eq!( + reader.offset_of(get_filter_index(5121, FILTER_BASE_LOG2)), + 17 + ); // third block in third filter - let unknown_keys = vec!["xsb".as_bytes(), "9sad".as_bytes(), "assssaaaass".as_bytes()]; + let unknown_keys = vec![ + "xsb".as_bytes(), + "9sad".as_bytes(), + "assssaaaass".as_bytes(), + ]; for block_offset in vec![0, 1024, 5000, 6025].into_iter() { for key in get_keys().iter() { - assert!(reader.key_may_match(block_offset, key), - format!("{} {:?} ", block_offset, key)); + assert!( + reader.key_may_match(block_offset, key), + format!("{} {:?} ", block_offset, key) + ); } for key in unknown_keys.iter() { assert!(!reader.key_may_match(block_offset, key));
--- a/src/lib.rs Sun Mar 11 11:29:39 2018 +0100 +++ b/src/lib.rs Sun Mar 11 16:00:32 2018 +0100 @@ -1,10 +1,19 @@ extern crate crc; extern crate integer_encoding; +extern crate snap; +#[cfg(test)] +#[macro_use] +extern crate time_test; mod block; +mod block_builder; mod blockhandle; +mod cache; +pub mod error; pub mod filter; mod filter_block; +mod table_block; +mod test_util; mod types; pub mod cmp; @@ -16,7 +25,7 @@ pub use cmp::Cmp; pub use iterator::StandardComparator; pub use iterator::SSIterator; -pub use options::{Options, ReadOptions}; +pub use options::Options; pub use table_builder::TableBuilder; pub use table_reader::{Table, TableIterator};
--- a/src/options.rs Sun Mar 11 11:29:39 2018 +0100 +++ b/src/options.rs Sun Mar 11 16:00:32 2018 +0100 @@ -1,14 +1,19 @@ +use block::Block; +use cache::Cache; use cmp::{Cmp, DefaultCmp}; -use types::SequenceNumber; +use filter; +use types::{share, Shared}; use std::default::Default; -use std::sync::Arc; +use std::rc::Rc; const KB: usize = 1 << 10; const MB: usize = KB * KB; const BLOCK_MAX_SIZE: usize = 4 * KB; +const BLOCK_CACHE_CAPACITY: usize = 8 * MB; const WRITE_BUFFER_SIZE: usize = 4 * MB; +const DEFAULT_BITS_PER_KEY: u32 = 10; // NOTE: This may need to be optimized. #[derive(Clone, Copy, PartialEq, Debug)] pub enum CompressionType { @@ -24,62 +29,36 @@ } } -/// [not all member types implemented yet] +/// Options contains general parameters for a LevelDB instance. Most of the names are +/// self-explanatory; the defaults are defined in the `Default` implementation. /// +/// Note: Compression is not yet implemented. #[derive(Clone)] pub struct Options { - pub cmp: Arc<Box<Cmp>>, - pub create_if_missing: bool, - pub error_if_exists: bool, - pub paranoid_checks: bool, - // pub logger: Logger, + pub cmp: Rc<Box<Cmp>>, pub write_buffer_size: usize, - pub max_open_files: usize, + pub block_cache: Shared<Cache<Block>>, pub block_size: usize, pub block_restart_interval: usize, pub compression_type: CompressionType, - pub reuse_logs: bool, + pub filter_policy: filter::BoxedFilterPolicy, } impl Default for Options { fn default() -> Options { Options { - cmp: Arc::new(Box::new(DefaultCmp)), - create_if_missing: true, - error_if_exists: false, - paranoid_checks: false, + cmp: Rc::new(Box::new(DefaultCmp)), write_buffer_size: WRITE_BUFFER_SIZE, - max_open_files: 1 << 10, + // 2000 elements by default + block_cache: share(Cache::new(BLOCK_CACHE_CAPACITY / BLOCK_MAX_SIZE)), block_size: BLOCK_MAX_SIZE, block_restart_interval: 16, - reuse_logs: false, compression_type: CompressionType::CompressionNone, + filter_policy: Rc::new(Box::new(filter::BloomPolicy::new(DEFAULT_BITS_PER_KEY))), } } } -/// Supplied to DB read operations. -pub struct ReadOptions { - pub verify_checksums: bool, - pub snapshot: Option<SequenceNumber>, +pub fn for_test() -> Options { + Options::default() } - -impl Default for ReadOptions { - fn default() -> Self { - ReadOptions { - verify_checksums: false, - snapshot: None, - } - } -} - -/// Supplied to write operations -pub struct WriteOptions { - pub sync: bool, -} - -impl Default for WriteOptions { - fn default() -> Self { - WriteOptions { sync: false } - } -}
--- a/src/table_builder.rs Sun Mar 11 11:29:39 2018 +0100 +++ b/src/table_builder.rs Sun Mar 11 16:00:32 2018 +0100 @@ -1,15 +1,20 @@ -use block::{BlockBuilder, BlockContents}; +use block::BlockContents; +use block_builder::BlockBuilder; use blockhandle::BlockHandle; -use filter::{BoxedFilterPolicy, NoFilterPolicy}; +use error::Result; +use filter::NoFilterPolicy; use filter_block::FilterBlockBuilder; use options::{CompressionType, Options}; +use types::{mask_crc, unmask_crc}; +use std::cmp::Ordering; use std::io::Write; -use std::cmp::Ordering; +use std::rc::Rc; use crc::crc32; use crc::Hasher32; -use integer_encoding::FixedInt; +use integer_encoding::FixedIntWriter; +use snap::Encoder; pub const FOOTER_LENGTH: usize = 40; pub const FULL_FOOTER_LENGTH: usize = FOOTER_LENGTH + 8; @@ -20,7 +25,7 @@ pub const TABLE_BLOCK_CKSUM_LEN: usize = 4; /// Footer is a helper for encoding/decoding a table footer. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Footer { pub meta_index: BlockHandle, pub index: BlockHandle, @@ -70,11 +75,11 @@ /// DATA BLOCKs, META BLOCKs, INDEX BLOCK and METAINDEX BLOCK are built using the code in /// the `block` module. /// -/// The FOOTER consists of a BlockHandle wthat points to the metaindex block, another pointing to +/// The FOOTER consists of a BlockHandle that points to the metaindex block, another pointing to /// the index block, padding to fill up to 40 B and at the end the 8B magic number /// 0xdb4775248b80fb57. -pub struct TableBuilder<'a, Dst: Write> { +pub struct TableBuilder<Dst: Write> { opt: Options, dst: Dst, @@ -84,21 +89,21 @@ data_block: Option<BlockBuilder>, index_block: Option<BlockBuilder>, - filter_block: Option<FilterBlockBuilder<'a>>, + filter_block: Option<FilterBlockBuilder>, } -impl<'a, Dst: Write> TableBuilder<'a, Dst> { - pub fn new_no_filter(opt: Options, dst: Dst) -> TableBuilder<'a, Dst> { - TableBuilder::new(opt, dst, NoFilterPolicy::new()) +impl<Dst: Write> TableBuilder<Dst> { + pub fn new_no_filter(mut opt: Options, dst: Dst) -> TableBuilder<Dst> { + opt.filter_policy = Rc::new(Box::new(NoFilterPolicy::new())); + TableBuilder::new(opt, dst) } } /// TableBuilder is used for building a new SSTable. It groups entries into blocks, /// calculating checksums and bloom filters. -impl<'a, Dst: Write> TableBuilder<'a, Dst> { - /// Create a new TableBuilder. Currently the best choice for `fpol` is `NoFilterPolicy` (mod - /// filter; or use new_no_filter()) - pub fn new(opt: Options, dst: Dst, fpol: BoxedFilterPolicy) -> TableBuilder<'a, Dst> { +impl<Dst: Write> TableBuilder<Dst> { + /// Create a new table builder. + pub fn new(opt: Options, dst: Dst) -> TableBuilder<Dst> { TableBuilder { opt: opt.clone(), dst: dst, @@ -106,8 +111,8 @@ prev_block_last_key: vec![], num_entries: 0, data_block: Some(BlockBuilder::new(opt.clone())), + filter_block: Some(FilterBlockBuilder::new(opt.filter_policy.clone())), index_block: Some(BlockBuilder::new(opt)), - filter_block: Some(FilterBlockBuilder::new(fpol)), } } @@ -115,8 +120,22 @@ self.num_entries } + pub fn size_estimate(&self) -> usize { + let mut size = 0; + if let Some(ref b) = self.data_block { + size += b.size_estimate(); + } + if let Some(ref b) = self.index_block { + size += b.size_estimate(); + } + if let Some(ref b) = self.filter_block { + size += b.size_estimate(); + } + size + self.offset + FULL_FOOTER_LENGTH + } + /// Add a key to the table. The key as to be lexically greater or equal to the last one added. - pub fn add(&mut self, key: &'a [u8], val: &[u8]) { + pub fn add(&mut self, key: &[u8], val: &[u8]) -> Result<()> { assert!(self.data_block.is_some()); if !self.prev_block_last_key.is_empty() { @@ -124,7 +143,7 @@ } if self.data_block.as_ref().unwrap().size_estimate() > self.opt.block_size { - self.write_data_block(key); + self.write_data_block(key)?; } let dblock = &mut self.data_block.as_mut().unwrap(); @@ -135,12 +154,13 @@ self.num_entries += 1; dblock.add(key, val); + Ok(()) } /// Writes an index entry for the current data_block where `next_key` is the first key of the /// next block. /// Calls write_block() for writing the block to disk. - fn write_data_block(&mut self, next_key: &[u8]) { + fn write_data_block(&mut self, next_key: &[u8]) -> Result<()> { assert!(self.data_block.is_some()); let block = self.data_block.take().unwrap(); @@ -148,56 +168,59 @@ self.prev_block_last_key = Vec::from(block.last_key()); let contents = block.finish(); - let handle = BlockHandle::new(self.offset, contents.len()); + let ctype = self.opt.compression_type; + let handle = self.write_block(contents, ctype)?; + let mut handle_enc = [0 as u8; 16]; let enc_len = handle.encode_to(&mut handle_enc); - self.index_block.as_mut().unwrap().add(&sep, &handle_enc[0..enc_len]); + self.index_block + .as_mut() + .unwrap() + .add(&sep, &handle_enc[0..enc_len]); self.data_block = Some(BlockBuilder::new(self.opt.clone())); - let ctype = self.opt.compression_type; - - self.write_block(contents, ctype); - if let Some(ref mut fblock) = self.filter_block { fblock.start_block(self.offset); } + + Ok(()) } /// Calculates the checksum, writes the block to disk and updates the offset. - fn write_block(&mut self, block: BlockContents, t: CompressionType) -> BlockHandle { - // compression is still unimplemented - assert_eq!(t, CompressionType::CompressionNone); + fn write_block(&mut self, block: BlockContents, ctype: CompressionType) -> Result<BlockHandle> { + let mut data = block; + if ctype == CompressionType::CompressionSnappy { + let mut encoder = Encoder::new(); + data = encoder.compress_vec(&data)?; + } - let mut buf = [0 as u8; TABLE_BLOCK_CKSUM_LEN]; let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); - digest.write(&block); - digest.write(&[self.opt.compression_type as u8; TABLE_BLOCK_COMPRESS_LEN]); - digest.sum32().encode_fixed(&mut buf); + digest.write(&data); + digest.write(&[ctype as u8; TABLE_BLOCK_COMPRESS_LEN]); - // TODO: Handle errors here. - let _ = self.dst.write(&block); - let _ = self.dst.write(&[t as u8; TABLE_BLOCK_COMPRESS_LEN]); - let _ = self.dst.write(&buf); + self.dst.write(&data)?; + self.dst.write(&[ctype as u8; TABLE_BLOCK_COMPRESS_LEN])?; + self.dst.write_fixedint(mask_crc(digest.sum32()))?; - let handle = BlockHandle::new(self.offset, block.len()); + let handle = BlockHandle::new(self.offset, data.len()); + self.offset += data.len() + TABLE_BLOCK_COMPRESS_LEN + TABLE_BLOCK_CKSUM_LEN; - self.offset += block.len() + TABLE_BLOCK_COMPRESS_LEN + TABLE_BLOCK_CKSUM_LEN; - - handle + Ok(handle) } - pub fn finish(mut self) { + pub fn finish(mut self) -> Result<usize> { assert!(self.data_block.is_some()); let ctype = self.opt.compression_type; // If there's a pending data block, write it if self.data_block.as_ref().unwrap().entries() > 0 { // Find a key reliably past the last key - let key_past_last = - self.opt.cmp.find_short_succ(self.data_block.as_ref().unwrap().last_key()); - self.write_data_block(&key_past_last); + let key_past_last = self.opt + .cmp + .find_short_succ(self.data_block.as_ref().unwrap().last_key()); + self.write_data_block(&key_past_last)?; } // Create metaindex block @@ -208,7 +231,7 @@ let fblock = self.filter_block.take().unwrap(); let filter_key = format!("filter.{}", fblock.filter_name()); let fblock_data = fblock.finish(); - let fblock_handle = self.write_block(fblock_data, CompressionType::CompressionNone); + let fblock_handle = self.write_block(fblock_data, CompressionType::CompressionNone)?; let mut handle_enc = [0 as u8; 16]; let enc_len = fblock_handle.encode_to(&mut handle_enc); @@ -218,27 +241,28 @@ // write metaindex block let meta_ix = meta_ix_block.finish(); - let meta_ix_handle = self.write_block(meta_ix, ctype); + let meta_ix_handle = self.write_block(meta_ix, ctype)?; // write index block let index_cont = self.index_block.take().unwrap().finish(); - let ix_handle = self.write_block(index_cont, ctype); + let ix_handle = self.write_block(index_cont, ctype)?; // write footer. let footer = Footer::new(meta_ix_handle, ix_handle); let mut buf = [0; FULL_FOOTER_LENGTH]; footer.encode(&mut buf); - self.offset += self.dst.write(&buf[..]).unwrap(); + self.offset += self.dst.write(&buf[..])?; + self.dst.flush()?; + Ok(self.offset) } } #[cfg(test)] mod tests { - use super::{Footer, TableBuilder}; + use super::*; use blockhandle::BlockHandle; - use filter::BloomPolicy; - use options::Options; + use options; #[test] fn test_footer() { @@ -251,39 +275,63 @@ assert_eq!(f2.meta_index.size(), 4); assert_eq!(f2.index.offset(), 55); assert_eq!(f2.index.size(), 5); - } #[test] fn test_table_builder() { let mut d = Vec::with_capacity(512); - let mut opt = Options::default(); + let mut opt = options::for_test(); opt.block_restart_interval = 3; - let mut b = TableBuilder::new(opt, &mut d, BloomPolicy::new(4)); + opt.compression_type = CompressionType::CompressionSnappy; + let mut b = TableBuilder::new(opt, &mut d); - let data = vec![("abc", "def"), ("abd", "dee"), ("bcd", "asa"), ("bsr", "a00")]; + let data = vec![ + ("abc", "def"), + ("abe", "dee"), + ("bcd", "asa"), + ("dcc", "a00"), + ]; + let data2 = vec![ + ("abd", "def"), + ("abf", "dee"), + ("ccd", "asa"), + ("dcd", "a00"), + ]; - for &(k, v) in data.iter() { - b.add(k.as_bytes(), v.as_bytes()); + for i in 0..data.len() { + b.add(&data[i].0.as_bytes(), &data[i].1.as_bytes()).unwrap(); + b.add(&data2[i].0.as_bytes(), &data2[i].1.as_bytes()) + .unwrap(); } + let estimate = b.size_estimate(); + + assert_eq!(143, estimate); assert!(b.filter_block.is_some()); - b.finish(); + + let actual = b.finish().unwrap(); + assert_eq!(223, actual); } #[test] #[should_panic] fn test_bad_input() { let mut d = Vec::with_capacity(512); - let mut opt = Options::default(); + let mut opt = options::for_test(); opt.block_restart_interval = 3; - let mut b = TableBuilder::new(opt, &mut d, BloomPolicy::new(4)); + let mut b = TableBuilder::new(opt, &mut d); // Test two equal consecutive keys - let data = vec![("abc", "def"), ("abc", "dee"), ("bcd", "asa"), ("bsr", "a00")]; + let data = vec![ + ("abc", "def"), + ("abc", "dee"), + ("bcd", "asa"), + ("bsr", "a00"), + ]; for &(k, v) in data.iter() { - b.add(k.as_bytes(), v.as_bytes()); + b.add(k.as_bytes(), v.as_bytes()).unwrap(); } + b.finish().unwrap(); } }
--- a/src/table_reader.rs Sun Mar 11 11:29:39 2018 +0100 +++ b/src/table_reader.rs Sun Mar 11 16:00:32 2018 +0100 @@ -1,138 +1,130 @@ -#![allow(dead_code)] - use block::{Block, BlockIter}; use blockhandle::BlockHandle; -use filter::BoxedFilterPolicy; +use cache; +use error::Result; +use filter; use filter_block::FilterBlockReader; -use options::{self, CompressionType, Options}; +use options::Options; +use table_block; use table_builder::{self, Footer}; -use iterator::SSIterator; +use types::{current_key_val, RandomAccess, SSIterator}; use std::cmp::Ordering; -use std::io::{self, Read, Seek, SeekFrom, Result}; +use std::rc::Rc; -use integer_encoding::FixedInt; -use crc::crc32::{self, Hasher32}; +use integer_encoding::FixedIntWriter; /// Reads the table footer. -fn read_footer<R: Read + Seek>(f: &mut R, size: usize) -> Result<Footer> { - try!(f.seek(SeekFrom::Start((size - table_builder::FULL_FOOTER_LENGTH) as u64))); - let mut buf = [0; table_builder::FULL_FOOTER_LENGTH]; - try!(f.read_exact(&mut buf)); +fn read_footer(f: &RandomAccess, size: usize) -> Result<Footer> { + let mut buf = vec![0; table_builder::FULL_FOOTER_LENGTH]; + f.read_at(size - table_builder::FULL_FOOTER_LENGTH, &mut buf)?; Ok(Footer::decode(&buf)) } -fn read_bytes<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<Vec<u8>> { - try!(f.seek(SeekFrom::Start(location.offset() as u64))); - - let mut buf = Vec::new(); - buf.resize(location.size(), 0); - - try!(f.read_exact(&mut buf[0..location.size()])); - - Ok(buf) -} - -struct TableBlock { - block: Block, - checksum: u32, - compression: CompressionType, -} +#[derive(Clone)] +pub struct Table { + file: Rc<Box<RandomAccess>>, + file_size: usize, + cache_id: cache::CacheID, -impl TableBlock { - /// Reads a block at location. - fn read_block<R: Read + Seek>(opt: Options, - f: &mut R, - location: &BlockHandle) - -> Result<TableBlock> { - // The block is denoted by offset and length in BlockHandle. A block in an encoded - // table is followed by 1B compression type and 4B checksum. - let buf = try!(read_bytes(f, location)); - let compress = try!(read_bytes(f, - &BlockHandle::new(location.offset() + location.size(), - table_builder::TABLE_BLOCK_COMPRESS_LEN))); - let cksum = try!(read_bytes(f, - &BlockHandle::new(location.offset() + location.size() + - table_builder::TABLE_BLOCK_COMPRESS_LEN, - table_builder::TABLE_BLOCK_CKSUM_LEN))); - Ok(TableBlock { - block: Block::new(opt, buf), - checksum: u32::decode_fixed(&cksum), - compression: options::int_to_compressiontype(compress[0] as u32) - .unwrap_or(CompressionType::CompressionNone), - }) - } - - /// Verify checksum of block - fn verify(&self) -> bool { - let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); - digest.write(&self.block.contents()); - digest.write(&[self.compression as u8; 1]); - - digest.sum32() == self.checksum - } -} - -pub struct Table<R: Read + Seek> { - file: R, opt: Options, footer: Footer, indexblock: Block, - #[allow(dead_code)] filters: Option<FilterBlockReader>, } -impl<R: Read + Seek> Table<R> { - /// Creates a new table reader. - pub fn new(opt: Options, mut file: R, size: usize, fp: BoxedFilterPolicy) -> Result<Table<R>> { - let footer = try!(read_footer(&mut file, size)); - - let indexblock = try!(TableBlock::read_block(opt.clone(), &mut file, &footer.index)); - let metaindexblock = - try!(TableBlock::read_block(opt.clone(), &mut file, &footer.meta_index)); - - if !indexblock.verify() || !metaindexblock.verify() { - return Err(io::Error::new(io::ErrorKind::InvalidData, - "Indexblock/Metaindexblock failed verification")); - } +impl Table { + /// Creates a new table reader operating on unformatted keys (i.e., UserKey). + fn new(opt: Options, file: Rc<Box<RandomAccess>>, size: usize) -> Result<Table> { + let footer = try!(read_footer(file.as_ref().as_ref(), size)); + let indexblock = try!(table_block::read_table_block( + opt.clone(), + file.as_ref().as_ref(), + &footer.index + )); + let metaindexblock = try!(table_block::read_table_block( + opt.clone(), + file.as_ref().as_ref(), + &footer.meta_index + )); - // Open filter block for reading - let mut filter_block_reader = None; - let filter_name = format!("filter.{}", fp.name()).as_bytes().to_vec(); - - let mut metaindexiter = metaindexblock.block.iter(); - - metaindexiter.seek(&filter_name); - - if let Some((_key, val)) = metaindexiter.current() { - let filter_block_location = BlockHandle::decode(&val).0; - - if filter_block_location.size() > 0 { - let buf = try!(read_bytes(&mut file, &filter_block_location)); - filter_block_reader = Some(FilterBlockReader::new_owned(fp, buf)); - } - } - - metaindexiter.reset(); + let filter_block_reader = + Table::read_filter_block(&metaindexblock, file.as_ref().as_ref(), &opt)?; + let cache_id = opt.block_cache.borrow_mut().new_cache_id(); Ok(Table { file: file, + file_size: size, + cache_id: cache_id, opt: opt, footer: footer, filters: filter_block_reader, - indexblock: indexblock.block, + indexblock: indexblock, }) } - fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock> { - let b = try!(TableBlock::read_block(self.opt.clone(), &mut self.file, location)); + fn read_filter_block( + metaix: &Block, + file: &RandomAccess, + options: &Options, + ) -> Result<Option<FilterBlockReader>> { + // Open filter block for reading + let filter_name = format!("filter.{}", options.filter_policy.name()) + .as_bytes() + .to_vec(); + + let mut metaindexiter = metaix.iter(); + metaindexiter.seek(&filter_name); + + if let Some((_key, val)) = current_key_val(&metaindexiter) { + let filter_block_location = BlockHandle::decode(&val).0; + if filter_block_location.size() > 0 { + return Ok(Some(table_block::read_filter_block( + file, + &filter_block_location, + options.filter_policy.clone(), + )?)); + } + } + Ok(None) + } - if !b.verify() { - Err(io::Error::new(io::ErrorKind::InvalidData, "Data block failed verification")) - } else { - Ok(b) + /// block_cache_handle creates a CacheKey for a block with a given offset to be used in the + /// block cache. + fn block_cache_handle(&self, block_off: usize) -> cache::CacheKey { + let mut dst = [0; 2 * 8]; + (&mut dst[..8]) + .write_fixedint(self.cache_id) + .expect("error writing to vec"); + (&mut dst[8..]) + .write_fixedint(block_off as u64) + .expect("error writing to vec"); + dst + } + + /// Read a block from the current table at `location`, and cache it in the options' block + /// cache. + fn read_block(&self, location: &BlockHandle) -> Result<Block> { + let cachekey = self.block_cache_handle(location.offset()); + if let Some(block) = self.opt.block_cache.borrow_mut().get(&cachekey) { + return Ok(block.clone()); } + + // Two times as_ref(): First time to get a ref from Rc<>, then one from Box<>. + let b = try!(table_block::read_table_block( + self.opt.clone(), + self.file.as_ref().as_ref(), + location + )); + + // insert a cheap copy (Rc). + self.opt + .block_cache + .borrow_mut() + .insert(&cachekey, b.clone()); + + Ok(b) } /// Returns the offset of the block that contains `key`. @@ -141,7 +133,7 @@ iter.seek(key); - if let Some((_, val)) = iter.current() { + if let Some((_, val)) = current_key_val(&iter) { let location = BlockHandle::decode(&val).0; return location.offset(); } @@ -149,82 +141,83 @@ return self.footer.meta_index.offset(); } - // Iterators read from the file; thus only one iterator can be borrowed (mutably) per scope - fn iter<'a>(&'a mut self) -> TableIterator<'a, R> { + /// Iterators read from the file; thus only one iterator can be borrowed (mutably) per scope + pub fn iter(&self) -> TableIterator { let iter = TableIterator { - current_block: self.indexblock.iter(), - init: false, + current_block: None, current_block_off: 0, index_block: self.indexblock.iter(), - opt: self.opt.clone(), - table: self, + table: self.clone(), }; iter } - /// Retrieve value from table. This function uses the attached filters, so is better suited if - /// you frequently look for non-existing values (as it will detect the non-existence of an - /// entry in a block without having to load the block). - pub fn get(&mut self, key: &[u8]) -> Option<Vec<u8>> { + /// Retrieve next-biggest entry for key from table. This function uses the attached filters, so + /// is better suited if you frequently look for non-existing values (as it will detect the + /// non-existence of an entry in a block without having to load the block). + /// + /// The caller must check if the returned key, which is the raw key (not e.g. the user portion + /// of an InternalKey) is acceptable (e.g. correct value type or sequence number), as it may + /// not be an exact match for key. + /// + /// This is done this way because some key types, like internal keys, will not result in an + /// exact match; it depends on other comparators than the one that the table reader knows + /// whether a match is acceptable. + pub fn get(&self, key: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>> { let mut index_iter = self.indexblock.iter(); index_iter.seek(key); let handle; - if let Some((last_in_block, h)) = index_iter.current() { + if let Some((last_in_block, h)) = current_key_val(&index_iter) { if self.opt.cmp.cmp(key, &last_in_block) == Ordering::Less { handle = BlockHandle::decode(&h).0; } else { - return None; + return Ok(None); } } else { - return None; + return Ok(None); } // found correct block. - // Check bloom filter + + // Check bloom (or whatever) filter if let Some(ref filters) = self.filters { if !filters.key_may_match(handle.offset(), key) { - return None; + return Ok(None); } } // Read block (potentially from cache) - let mut iter; - if let Ok(tb) = self.read_block(&handle) { - iter = tb.block.iter(); - } else { - return None; - } + let tb = self.read_block(&handle)?; + let mut iter = tb.iter(); // Go to entry and check if it's the wanted entry. iter.seek(key); - if let Some((k, v)) = iter.current() { - if self.opt.cmp.cmp(key, &k) == Ordering::Equal { - Some(v) - } else { - None + if let Some((k, v)) = current_key_val(&iter) { + if self.opt.cmp.cmp(&k, key) >= Ordering::Equal { + return Ok(Some((k, v))); } - } else { - None } + Ok(None) } } /// This iterator is a "TwoLevelIterator"; it uses an index block in order to get an offset hint /// into the data blocks. -pub struct TableIterator<'a, R: 'a + Read + Seek> { - table: &'a mut Table<R>, - opt: Options, - // We're not using Option<BlockIter>, but instead a separate `init` field. That makes it easier - // working with the current block in the iterator methods (no borrowing annoyance as with - // Option<>) - current_block: BlockIter, +pub struct TableIterator { + // A TableIterator is independent of its table (on the syntax level -- it doesn't know its + // Table's lifetime). This is mainly required by the dynamic iterators used everywhere, where a + // lifetime makes things like returning an iterator from a function neigh-impossible. + // + // Instead, reference-counted pointers and locks inside the Table ensure that all + // TableIterators still share a table. + table: Table, + current_block: Option<BlockIter>, current_block_off: usize, - init: bool, index_block: BlockIter, } -impl<'a, R: Read + Seek> TableIterator<'a, R> { +impl TableIterator { // Skips to the entry referenced by the next entry in the index block. // This is called once a block has run out of entries. // Err means corruption or I/O error; Ok(true) means a new block was loaded; Ok(false) means @@ -240,112 +233,114 @@ // Load the block at `handle` into `self.current_block` fn load_block(&mut self, handle: &[u8]) -> Result<()> { let (new_block_handle, _) = BlockHandle::decode(handle); + let block = self.table.read_block(&new_block_handle)?; - let block = try!(self.table.read_block(&new_block_handle)); - - self.current_block = block.block.iter(); + self.current_block = Some(block.iter()); self.current_block_off = new_block_handle.offset(); Ok(()) } } -impl<'a, R: Read + Seek> Iterator for TableIterator<'a, R> { - type Item = (Vec<u8>, Vec<u8>); - - fn next(&mut self) -> Option<Self::Item> { - // init essentially means that `current_block` is a data block (it's initially filled with - // an index block as filler). - if self.init { - if let Some((key, val)) = self.current_block.next() { - Some((key, val)) - } else { - match self.skip_to_next_entry() { - Ok(true) => self.next(), - Ok(false) => None, - // try next block, this might be corruption - Err(_) => self.next(), +impl SSIterator for TableIterator { + fn advance(&mut self) -> bool { + // Uninitialized case. + if self.current_block.is_none() { + match self.skip_to_next_entry() { + Ok(true) => return self.advance(), + Ok(false) => { + self.reset(); + return false; } - } - } else { - match self.skip_to_next_entry() { - Ok(true) => { - // Only initialize if we got an entry - self.init = true; - self.next() - } - Ok(false) => None, // try next block from index, this might be corruption - Err(_) => self.next(), + Err(_) => return self.advance(), } } + + // Initialized case -- does the current block have more entries? + if let Some(ref mut cb) = self.current_block { + if cb.advance() { + return true; + } + } + + // If the current block is exhausted, try loading the next block. + self.current_block = None; + match self.skip_to_next_entry() { + Ok(true) => self.advance(), + Ok(false) => { + self.reset(); + false + } + // try next block, this might be corruption + Err(_) => self.advance(), + } } -} -impl<'a, R: Read + Seek> SSIterator for TableIterator<'a, R> { // A call to valid() after seeking is necessary to ensure that the seek worked (e.g., no error // while reading from disk) fn seek(&mut self, to: &[u8]) { // first seek in index block, rewind by one entry (so we get the next smaller index entry), // then set current_block and seek there - self.index_block.seek(to); - if let Some((past_block, handle)) = self.index_block.current() { - if self.opt.cmp.cmp(to, &past_block) <= Ordering::Equal { + // It's possible that this is a seek past-last; reset in that case. + if let Some((past_block, handle)) = current_key_val(&self.index_block) { + if self.table.opt.cmp.cmp(to, &past_block) <= Ordering::Equal { // ok, found right block: continue if let Ok(()) = self.load_block(&handle) { - self.current_block.seek(to); - self.init = true; + // current_block is always set if load_block() returned Ok. + self.current_block.as_mut().unwrap().seek(to); + return; + } + } + } + // Reached in case of failure. + self.reset(); + } + + fn prev(&mut self) -> bool { + // happy path: current block contains previous entry + if let Some(ref mut cb) = self.current_block { + if cb.prev() { + return true; + } + } + + // Go back one block and look for the last entry in the previous block + if self.index_block.prev() { + if let Some((_, handle)) = current_key_val(&self.index_block) { + if self.load_block(&handle).is_ok() { + self.current_block.as_mut().unwrap().seek_to_last(); + self.current_block.as_ref().unwrap().valid() } else { self.reset(); - return; + false } } else { - self.reset(); - return; + false } } else { - panic!("Unexpected None from current() (bug)"); - } - } - - fn prev(&mut self) -> Option<Self::Item> { - // happy path: current block contains previous entry - if let Some(result) = self.current_block.prev() { - Some(result) - } else { - // Go back one block and look for the last entry in the previous block - if let Some((_, handle)) = self.index_block.prev() { - if self.load_block(&handle).is_ok() { - self.current_block.seek_to_last(); - self.current_block.current() - } else { - self.reset(); - None - } - } else { - None - } + false } } fn reset(&mut self) { self.index_block.reset(); - self.init = false; + self.current_block = None; } - // This iterator is special in that it's valid even before the first call to next(). It behaves - // correctly, though. + // This iterator is special in that it's valid even before the first call to advance(). It + // behaves correctly, though. fn valid(&self) -> bool { - self.init && (self.current_block.valid() || self.index_block.valid()) + self.current_block.is_some() && (self.current_block.as_ref().unwrap().valid()) } - fn current(&self) -> Option<Self::Item> { - if self.init { - self.current_block.current() + fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool { + if let Some(ref cb) = self.current_block { + cb.current(key, val) } else { - None + false } } } @@ -353,45 +348,71 @@ #[cfg(test)] mod tests { use filter::BloomPolicy; - use options::Options; + use options::{self, CompressionType}; use table_builder::TableBuilder; - use iterator::SSIterator; - - use std::io::Cursor; + use test_util::{test_iterator_properties, SSIteratorIter}; + use types::{current_key_val, SSIterator}; use super::*; fn build_data() -> Vec<(&'static str, &'static str)> { - vec![// block 1 - ("abc", "def"), - ("abd", "dee"), - ("bcd", "asa"), - // block 2 - ("bsr", "a00"), - ("xyz", "xxx"), - ("xzz", "yyy"), - // block 3 - ("zzz", "111")] + vec![ + // block 1 + ("abc", "def"), + ("abd", "dee"), + ("bcd", "asa"), + // block 2 + ("bsr", "a00"), + ("xyz", "xxx"), + ("xzz", "yyy"), + // block 3 + ("zzz", "111"), + ] } - // Build a table containing raw keys (no format) - fn build_table() -> (Vec<u8>, usize) { + // Build a table containing raw keys (no format). It returns (vector, length) for convenience + // reason, a call f(v, v.len()) doesn't work for borrowing reasons. + fn build_table(data: Vec<(&'static str, &'static str)>) -> (Vec<u8>, usize) { let mut d = Vec::with_capacity(512); - let mut opt = Options::default(); + let mut opt = options::for_test(); opt.block_restart_interval = 2; opt.block_size = 32; + opt.compression_type = CompressionType::CompressionSnappy; { // Uses the standard comparator in opt. - let mut b = TableBuilder::new(opt, &mut d, BloomPolicy::new(4)); - let data = build_data(); + let mut b = TableBuilder::new(opt, &mut d); for &(k, v) in data.iter() { - b.add(k.as_bytes(), v.as_bytes()); + b.add(k.as_bytes(), v.as_bytes()).unwrap(); } - b.finish(); + b.finish().unwrap(); + } + + let size = d.len(); + (d, size) + } + // Build a table containing keys. + fn build_internal_table() -> (Vec<u8>, usize) { + let mut d = Vec::with_capacity(512); + let mut opt = options::for_test(); + opt.block_restart_interval = 1; + opt.block_size = 32; + opt.filter_policy = Rc::new(Box::new(BloomPolicy::new(4))); + + let mut i = 1 as u64; + let data = build_data(); + + { + let mut b = TableBuilder::new(opt, &mut d); + + for &(ref k, ref v) in data.iter() { + b.add(k.as_bytes(), v.as_bytes()).unwrap(); + } + + b.finish().unwrap(); } let size = d.len(); @@ -399,71 +420,94 @@ (d, size) } - #[test] - fn test_table_reader_checksum() { - let (mut src, size) = build_table(); - println!("{}", size); - - src[10] += 1; + fn wrap_buffer(src: Vec<u8>) -> Rc<Box<RandomAccess>> { + Rc::new(Box::new(src)) + } - let mut table = Table::new(Options::default(), - Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4)) - .unwrap(); + #[test] + fn test_table_approximate_offset() { + let (src, size) = build_table(build_data()); + let mut opt = options::for_test(); + opt.block_size = 32; + let table = Table::new(opt, wrap_buffer(src), size).unwrap(); + let mut iter = table.iter(); - assert!(table.filters.is_some()); - assert_eq!(table.filters.as_ref().unwrap().num(), 1); - - { - let iter = table.iter(); - // first block is skipped - assert_eq!(iter.count(), 4); + let expected_offsets = vec![0, 0, 0, 44, 44, 44, 89]; + let mut i = 0; + for (k, _) in SSIteratorIter::wrap(&mut iter) { + assert_eq!(expected_offsets[i], table.approx_offset_of(&k)); + i += 1; } - { - let iter = table.iter(); + // Key-past-last returns offset of metaindex block. + assert_eq!(137, table.approx_offset_of("{aa".as_bytes())); + } + + #[test] + fn test_table_block_cache_use() { + let (src, size) = build_table(build_data()); + let mut opt = options::for_test(); + opt.block_size = 32; - for (k, _) in iter { - if k == build_data()[5].0.as_bytes() { - return; - } - } + let table = Table::new(opt.clone(), wrap_buffer(src), size).unwrap(); + let mut iter = table.iter(); - panic!("Should have hit 5th record in table!"); - } + // index/metaindex blocks are not cached. That'd be a waste of memory. + assert_eq!(opt.block_cache.borrow().count(), 0); + iter.next(); + assert_eq!(opt.block_cache.borrow().count(), 1); + // This may fail if block parameters or data change. In that case, adapt it. + iter.next(); + iter.next(); + iter.next(); + iter.next(); + assert_eq!(opt.block_cache.borrow().count(), 2); } #[test] fn test_table_iterator_fwd_bwd() { - let (src, size) = build_table(); + let (src, size) = build_table(build_data()); let data = build_data(); - let mut table = Table::new(Options::default(), - Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4)) - .unwrap(); + let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap(); let mut iter = table.iter(); let mut i = 0; while let Some((k, v)) = iter.next() { - assert_eq!((data[i].0.as_bytes(), data[i].1.as_bytes()), - (k.as_ref(), v.as_ref())); + assert_eq!( + (data[i].0.as_bytes(), data[i].1.as_bytes()), + (k.as_ref(), v.as_ref()) + ); i += 1; } assert_eq!(i, data.len()); - assert!(iter.next().is_none()); + assert!(!iter.valid()); + // Go forward again, to last entry. + while let Some((key, _)) = iter.next() { + if key.as_slice() == b"zzz" { + break; + } + } + + assert!(iter.valid()); // backwards count let mut j = 0; - while let Some((k, v)) = iter.prev() { - j += 1; - assert_eq!((data[data.len() - 1 - j].0.as_bytes(), - data[data.len() - 1 - j].1.as_bytes()), - (k.as_ref(), v.as_ref())); + while iter.prev() { + if let Some((k, v)) = current_key_val(&iter) { + j += 1; + assert_eq!( + ( + data[data.len() - 1 - j].0.as_bytes(), + data[data.len() - 1 - j].1.as_bytes() + ), + (k.as_ref(), v.as_ref()) + ); + } else { + break; + } } // expecting 7 - 1, because the last entry that the iterator stopped on is the last entry @@ -473,21 +517,17 @@ #[test] fn test_table_iterator_filter() { - let (src, size) = build_table(); + let (src, size) = build_table(build_data()); - let mut table = Table::new(Options::default(), - Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4)) - .unwrap(); + let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap(); + assert!(table.filters.is_some()); let filter_reader = table.filters.clone().unwrap(); let mut iter = table.iter(); loop { if let Some((k, _)) = iter.next() { assert!(filter_reader.key_may_match(iter.current_block_off, &k)); - assert!(!filter_reader.key_may_match(iter.current_block_off, - "somerandomkey".as_bytes())); + assert!(!filter_reader.key_may_match(iter.current_block_off, b"somerandomkey")); } else { break; } @@ -496,47 +536,48 @@ #[test] fn test_table_iterator_state_behavior() { - let (src, size) = build_table(); + let (src, size) = build_table(build_data()); - let mut table = Table::new(Options::default(), - Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4)) - .unwrap(); + let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap(); let mut iter = table.iter(); // behavior test // See comment on valid() assert!(!iter.valid()); - assert!(iter.current().is_none()); - assert!(iter.prev().is_none()); + assert!(current_key_val(&iter).is_none()); + assert!(!iter.prev()); - assert!(iter.next().is_some()); - let first = iter.current(); + assert!(iter.advance()); + let first = current_key_val(&iter); assert!(iter.valid()); - assert!(iter.current().is_some()); + assert!(current_key_val(&iter).is_some()); - assert!(iter.next().is_some()); - assert!(iter.prev().is_some()); - assert!(iter.current().is_some()); + assert!(iter.advance()); + assert!(iter.prev()); + assert!(iter.valid()); iter.reset(); assert!(!iter.valid()); - assert!(iter.current().is_none()); + assert!(current_key_val(&iter).is_none()); assert_eq!(first, iter.next()); } #[test] + fn test_table_iterator_behavior_standard() { + let mut data = build_data(); + data.truncate(4); + let (src, size) = build_table(data); + let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap(); + test_iterator_properties(table.iter()); + } + + #[test] fn test_table_iterator_values() { - let (src, size) = build_table(); + let (src, size) = build_table(build_data()); let data = build_data(); - let mut table = Table::new(Options::default(), - Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4)) - .unwrap(); + let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap(); let mut iter = table.iter(); let mut i = 0; @@ -548,9 +589,11 @@ loop { iter.prev(); - if let Some((k, v)) = iter.current() { - assert_eq!((data[i].0.as_bytes(), data[i].1.as_bytes()), - (k.as_ref(), v.as_ref())); + if let Some((k, v)) = current_key_val(&iter) { + assert_eq!( + (data[i].0.as_bytes(), data[i].1.as_bytes()), + (k.as_ref(), v.as_ref()) + ); } else { break; } @@ -567,40 +610,88 @@ #[test] fn test_table_iterator_seek() { - let (src, size) = build_table(); + let (src, size) = build_table(build_data()); - let mut table = Table::new(Options::default(), - Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4)) - .unwrap(); + let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap(); let mut iter = table.iter(); - iter.seek("bcd".as_bytes()); + iter.seek(b"bcd"); + assert!(iter.valid()); + assert_eq!( + current_key_val(&iter), + Some((b"bcd".to_vec(), b"asa".to_vec())) + ); + iter.seek(b"abc"); assert!(iter.valid()); - assert_eq!(iter.current(), - Some(("bcd".as_bytes().to_vec(), "asa".as_bytes().to_vec()))); - iter.seek("abc".as_bytes()); + assert_eq!( + current_key_val(&iter), + Some((b"abc".to_vec(), b"def".to_vec())) + ); + + // Seek-past-last invalidates. + iter.seek("{{{".as_bytes()); + assert!(!iter.valid()); + iter.seek(b"bbb"); assert!(iter.valid()); - assert_eq!(iter.current(), - Some(("abc".as_bytes().to_vec(), "def".as_bytes().to_vec()))); } #[test] fn test_table_get() { - let (src, size) = build_table(); + let (src, size) = build_table(build_data()); + + let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap(); + let table2 = table.clone(); + + let mut _iter = table.iter(); + // Test that all of the table's entries are reachable via get() + for (k, v) in SSIteratorIter::wrap(&mut _iter) { + let r = table2.get(&k); + assert_eq!(Ok(Some((k, v))), r); + } + + assert_eq!(table.opt.block_cache.borrow().count(), 3); - let mut table = Table::new(Options::default(), - Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4)) - .unwrap(); + // test that filters work and don't return anything at all. + assert!(table.get(b"aaa").unwrap().is_none()); + assert!(table.get(b"aaaa").unwrap().is_none()); + assert!(table.get(b"aa").unwrap().is_none()); + assert!(table.get(b"abcd").unwrap().is_none()); + assert!(table.get(b"abb").unwrap().is_none()); + assert!(table.get(b"zzy").unwrap().is_none()); + assert!(table.get(b"zz1").unwrap().is_none()); + assert!(table.get("zz{".as_bytes()).unwrap().is_none()); + } + + #[test] + fn test_table_reader_checksum() { + let (mut src, size) = build_table(build_data()); - assert!(table.get("aaa".as_bytes()).is_none()); - assert_eq!(table.get("abc".as_bytes()), Some("def".as_bytes().to_vec())); - assert!(table.get("abcd".as_bytes()).is_none()); - assert_eq!(table.get("bcd".as_bytes()), Some("asa".as_bytes().to_vec())); - assert_eq!(table.get("zzz".as_bytes()), Some("111".as_bytes().to_vec())); - assert!(table.get("zz1".as_bytes()).is_none()); + src[10] += 1; + + let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap(); + + assert!(table.filters.is_some()); + assert_eq!(table.filters.as_ref().unwrap().num(), 1); + + { + let mut _iter = table.iter(); + let iter = SSIteratorIter::wrap(&mut _iter); + // first block is skipped + assert_eq!(iter.count(), 4); + } + + { + let mut _iter = table.iter(); + let iter = SSIteratorIter::wrap(&mut _iter); + + for (k, _) in iter { + if k == build_data()[5].0.as_bytes() { + return; + } + } + + panic!("Should have hit 5th record in table!"); + } } + }
--- a/src/types.rs Sun Mar 11 11:29:39 2018 +0100 +++ b/src/types.rs Sun Mar 11 16:00:32 2018 +0100 @@ -1,75 +1,241 @@ -#![allow(dead_code)] +//! A collection of fundamental and/or simple types used by other modules. A bit of a grab bag :-) + +use error::{err, Result, StatusCode}; + +use std::cell::RefCell; +use std::rc::Rc; -//! A collection of fundamental and/or simple types used by other modules +pub trait RandomAccess { + fn read_at(&self, off: usize, dst: &mut [u8]) -> Result<usize>; +} + +/// BufferBackedFile is a simple type implementing RandomAccess on a Vec<u8>. +pub type BufferBackedFile = Vec<u8>; -use std::error::Error; -use std::fmt::{self, Display, Formatter}; -use std::io; -use std::result; +impl RandomAccess for BufferBackedFile { + fn read_at(&self, off: usize, dst: &mut [u8]) -> Result<usize> { + if off > self.len() { + return Ok(0); + } + let remaining = self.len() - off; + let to_read = if dst.len() > remaining { + remaining + } else { + dst.len() + }; + (&mut dst[0..to_read]).copy_from_slice(&self[off..off + to_read]); + Ok(to_read) + } +} -#[derive(Debug, PartialOrd, PartialEq)] -pub enum ValueType { - TypeDeletion = 0, - TypeValue = 1, -} +pub const NUM_LEVELS: usize = 7; /// Represents a sequence number of a single entry. pub type SequenceNumber = u64; pub const MAX_SEQUENCE_NUMBER: SequenceNumber = (1 << 56) - 1; -#[derive(Clone, Debug)] -#[allow(dead_code)] -pub enum Status { - OK, - NotFound(String), - Corruption(String), - NotSupported(String), - InvalidArgument(String), - PermissionDenied(String), - IOError(String), - Unknown(String), +/// A shared thingy with interior mutability. +pub type Shared<T> = Rc<RefCell<T>>; + +pub fn share<T>(t: T) -> Rc<RefCell<T>> { + Rc::new(RefCell::new(t)) +} + +#[derive(PartialEq)] +pub enum Direction { + Forward, + Reverse, +} + +/// Denotes a key range +pub struct Range<'a> { + pub start: &'a [u8], + pub limit: &'a [u8], } -impl Display for Status { - fn fmt(&self, fmt: &mut Formatter) -> result::Result<(), fmt::Error> { - fmt.write_str(self.description()) +/// An extension of the standard `Iterator` trait that supports some methods necessary for LevelDB. +/// This works because the iterators used are stateful and keep the last returned element. +/// +/// Note: Implementing types are expected to hold `!valid()` before the first call to `advance()`. +/// +/// test_util::test_iterator_properties() verifies that all properties hold. +pub trait SSIterator { + /// Advances the position of the iterator by one element (which can be retrieved using + /// current(). If no more elements are available, advance() returns false, and the iterator + /// becomes invalid (i.e. as if reset() had been called). + fn advance(&mut self) -> bool; + /// Return the current item (i.e. the item most recently returned by `next()`). + fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool; + /// Seek the iterator to `key` or the next bigger key. If the seek is invalid (past last + /// element, or before first element), the iterator is `reset()` and not valid. + fn seek(&mut self, key: &[u8]); + /// Resets the iterator to be `!valid()`, i.e. positioned before the first element. + fn reset(&mut self); + /// Returns true if the iterator is not positioned before the first or after the last element, + /// i.e. if `current()` would succeed. + fn valid(&self) -> bool; + /// Go to the previous item; if the iterator is moved beyond the first element, `prev()` + /// returns false and it will be `!valid()`. This is inefficient for most iterator + /// implementations. + fn prev(&mut self) -> bool; + + // default implementations. + + /// next is like Iterator::next(). It's implemented here because Rust disallows implementing a + /// foreign trait for any type, thus we can't do `impl<T: SSIterator> Iterator<Item=Vec<u8>> + /// for T {}`. + fn next(&mut self) -> Option<(Vec<u8>, Vec<u8>)> { + if !self.advance() { + return None; + } + let (mut key, mut val) = (vec![], vec![]); + if self.current(&mut key, &mut val) { + Some((key, val)) + } else { + None + } + } + + /// seek_to_first seeks to the first element. + fn seek_to_first(&mut self) { + self.reset(); + self.advance(); + } +} + +/// current_key_val is a helper allocating two vectors and filling them with the current key/value +/// of the specified iterator. +pub fn current_key_val<It: SSIterator + ?Sized>(it: &It) -> Option<(Vec<u8>, Vec<u8>)> { + let (mut k, mut v) = (vec![], vec![]); + if it.current(&mut k, &mut v) { + Some((k, v)) + } else { + None + } +} + +impl SSIterator for Box<SSIterator> { + fn advance(&mut self) -> bool { + self.as_mut().advance() + } + fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool { + self.as_ref().current(key, val) + } + fn seek(&mut self, key: &[u8]) { + self.as_mut().seek(key) + } + fn reset(&mut self) { + self.as_mut().reset() + } + fn valid(&self) -> bool { + self.as_ref().valid() + } + fn prev(&mut self) -> bool { + self.as_mut().prev() } } -impl Error for Status { - fn description(&self) -> &str { - match *self { - Status::OK => "ok", - Status::NotFound(ref s) => s, - Status::Corruption(ref s) => s, - Status::NotSupported(ref s) => s, - Status::InvalidArgument(ref s) => s, - Status::PermissionDenied(ref s) => s, - Status::IOError(ref s) => s, - Status::Unknown(ref s) => s, - } - } +/// The unique (sequential) number of a file. +pub type FileNum = u64; + +/// Describes a file on disk. +#[derive(Clone, Debug, Default, PartialEq)] +pub struct FileMetaData { + // default: size / 16384. + pub allowed_seeks: usize, + pub num: FileNum, + pub size: usize, + // these are in InternalKey format: + pub smallest: Vec<u8>, + pub largest: Vec<u8>, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum FileType { + Log, + DBLock, + Table, + Descriptor, + Current, + Temp, + InfoLog, } -/// LevelDB's result type -pub type Result<T> = result::Result<T, Status>; - -pub fn from_io_result<T>(e: io::Result<T>) -> Result<T> { - match e { - Ok(r) => result::Result::Ok(r), - Err(e) => { - let err = e.description().to_string(); +pub fn parse_file_name(f: &str) -> Result<(FileNum, FileType)> { + if f == "CURRENT" { + return Ok((0, FileType::Current)); + } else if f == "LOCK" { + return Ok((0, FileType::DBLock)); + } else if f == "LOG" || f == "LOG.old" { + return Ok((0, FileType::InfoLog)); + } else if f.starts_with("MANIFEST-") { + if let Some(ix) = f.find('-') { + if let Ok(num) = FileNum::from_str_radix(&f[ix + 1..], 10) { + return Ok((num, FileType::Descriptor)); + } + return err( + StatusCode::InvalidArgument, + "manifest file number is invalid", + ); + } + return err(StatusCode::InvalidArgument, "manifest file has no dash"); + } else if let Some(ix) = f.find('.') { + // 00012345.log 00123.sst ... + if let Ok(num) = FileNum::from_str_radix(&f[0..ix], 10) { + let typ = match &f[ix + 1..] { + "log" => FileType::Log, + "sst" | "ldb" => FileType::Table, + "dbtmp" => FileType::Temp, + _ => { + return err( + StatusCode::InvalidArgument, + "unknown numbered file extension", + ) + } + }; + return Ok((num, typ)); + } + return err( + StatusCode::InvalidArgument, + "invalid file number for table or temp file", + ); + } + err(StatusCode::InvalidArgument, "unknown file type") +} - let r = match e.kind() { - io::ErrorKind::NotFound => Err(Status::NotFound(err)), - io::ErrorKind::InvalidData => Err(Status::Corruption(err)), - io::ErrorKind::InvalidInput => Err(Status::InvalidArgument(err)), - io::ErrorKind::PermissionDenied => Err(Status::PermissionDenied(err)), - _ => Err(Status::IOError(err)), - }; +const MASK_DELTA: u32 = 0xa282ead8; + +pub fn mask_crc(c: u32) -> u32 { + (c.wrapping_shr(15) | c.wrapping_shl(17)).wrapping_add(MASK_DELTA) +} + +pub fn unmask_crc(mc: u32) -> u32 { + let rot = mc.wrapping_sub(MASK_DELTA); + (rot.wrapping_shr(17) | rot.wrapping_shl(15)) +} + +#[cfg(test)] +mod tests { + use super::*; - r + #[test] + fn test_types_parse_file_name() { + for c in &[ + ("CURRENT", (0, FileType::Current)), + ("LOCK", (0, FileType::DBLock)), + ("LOG", (0, FileType::InfoLog)), + ("LOG.old", (0, FileType::InfoLog)), + ("MANIFEST-01234", (1234, FileType::Descriptor)), + ("001122.sst", (1122, FileType::Table)), + ("001122.ldb", (1122, FileType::Table)), + ("001122.dbtmp", (1122, FileType::Temp)), + ] { + assert_eq!(parse_file_name(c.0).unwrap(), c.1); } + assert!(parse_file_name("xyz.LOCK").is_err()); + assert!(parse_file_name("01a.sst").is_err()); + assert!(parse_file_name("0011.abc").is_err()); + assert!(parse_file_name("MANIFEST-trolol").is_err()); } }