Mercurial > lbo > hg > sstable
changeset 25:04468480cdcf
Rebase SSTable code on latest LevelDB development (non-public)
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Tue, 03 Jan 2017 19:28:14 +0100 |
parents | 620ac9082634 |
children | d380295dba2f |
files | src/block.rs src/lib.rs src/options.rs src/table_builder.rs src/table_reader.rs |
diffstat | 5 files changed, 690 insertions(+), 468 deletions(-) [+] |
line wrap: on
line diff
--- a/src/block.rs Mon Jan 02 12:53:29 2017 +0100 +++ b/src/block.rs Tue Jan 03 19:28:14 2017 +0100 @@ -2,8 +2,8 @@ use std::rc::Rc; -use options::BuildOptions; -use iterator::{SSIterator, Comparator}; +use options::Options; +use iterator::SSIterator; use integer_encoding::FixedInt; use integer_encoding::VarInt; @@ -26,19 +26,24 @@ /// A RESTART is a fixed u32 pointing to the beginning of an ENTRY. /// /// N_RESTARTS contains the number of restarts. -pub struct Block<C: Comparator> { +pub struct Block { block: Rc<BlockContents>, - cmp: C, + opt: Options, } -impl<C: Comparator> Block<C> { - pub fn iter(&self) -> BlockIter<C> { +impl Block { + /// Return an iterator over this block. + /// Note that the iterator isn't bound to the block's lifetime; the iterator uses the same + /// refcounted block contents as this block. (meaning also that if the iterator isn't released, + /// the memory occupied by the block isn't, either) + pub fn iter(&self) -> BlockIter { let restarts = u32::decode_fixed(&self.block[self.block.len() - 4..]); let restart_offset = self.block.len() - 4 - 4 * restarts as usize; BlockIter { block: self.block.clone(), - cmp: self.cmp, + opt: self.opt.clone(), + offset: 0, restarts_off: restart_offset, current_entry_offset: 0, @@ -53,47 +58,73 @@ self.block.clone() } - pub fn new(contents: BlockContents, cmp: C) -> Block<C> { + pub fn new(opt: Options, contents: BlockContents) -> Block { assert!(contents.len() > 4); Block { block: Rc::new(contents), - cmp: cmp, + opt: opt, } } } - -#[derive(Debug)] -pub struct BlockIter<C: Comparator> { +pub struct BlockIter { + /// The underlying block contents. + /// TODO: Maybe (probably...) this needs an Arc. block: Rc<BlockContents>, - cmp: C, - // start of next entry + opt: Options, + /// offset of restarts area within the block. + restarts_off: usize, + + /// start of next entry to be parsed. offset: usize, - // offset of restarts area - restarts_off: usize, + /// offset of the current entry. 
current_entry_offset: usize, - // tracks the last restart we encountered + /// index of the most recent restart. current_restart_ix: usize, - // We assemble the key from two parts usually, so we keep the current full key here. + /// We assemble the key from two parts usually, so we keep the current full key here. key: Vec<u8>, + /// Offset of the current value within the block. val_offset: usize, } -impl<C: Comparator> BlockIter<C> { +impl BlockIter { + /// Return the number of restarts in this block. fn number_restarts(&self) -> usize { u32::decode_fixed(&self.block[self.block.len() - 4..]) as usize } + /// Seek to restart point `ix`. After the seek, current() will return the entry at that restart + /// point. + fn seek_to_restart_point(&mut self, ix: usize) { + let off = self.get_restart_point(ix); + + self.offset = off; + self.current_entry_offset = off; + self.current_restart_ix = ix; + // advances self.offset to point to the next entry + let (shared, non_shared, _, head_len) = self.parse_entry_and_advance(); + + assert_eq!(shared, 0); + + self.assemble_key(off + head_len, shared, non_shared); + } + + /// Return the offset that restart `ix` points to. fn get_restart_point(&self, ix: usize) -> usize { let restart = self.restarts_off + 4 * ix; u32::decode_fixed(&self.block[restart..restart + 4]) as usize } -} -impl<C: Comparator> BlockIter<C> { - // Returns SHARED, NON_SHARED and VALSIZE from the current position. Advances self.offset. - fn parse_entry(&mut self) -> (usize, usize, usize) { + /// The layout of an entry is + /// [SHARED varint, NON_SHARED varint, VALSIZE varint, KEY (NON_SHARED bytes), + /// VALUE (VALSIZE bytes)]. + /// + /// Returns SHARED, NON_SHARED, VALSIZE and [length of length spec] from the current position, + /// where 'length spec' is the length of the three values in the entry header, as described + /// above. + /// Advances self.offset to point to the beginning of the next entry. 
+ fn parse_entry_and_advance(&mut self) -> (usize, usize, usize, usize) { let mut i = 0; let (shared, sharedlen) = usize::decode_var(&self.block[self.offset..]); i += sharedlen; @@ -104,89 +135,104 @@ let (valsize, valsizelen) = usize::decode_var(&self.block[self.offset + i..]); i += valsizelen; - self.offset += i; + self.val_offset = self.offset + i + non_shared; + self.offset = self.offset + i + non_shared + valsize; - (shared, non_shared, valsize) + (shared, non_shared, valsize, i) } - /// offset is assumed to be at the beginning of the non-shared key part. - /// offset is not advanced. - fn assemble_key(&mut self, shared: usize, non_shared: usize) { + /// Assemble the current key from shared and non-shared parts (an entry usually contains only + /// the part of the key that is different from the previous key). + /// + /// `off` is the offset of the key string within the whole block (self.current_entry_offset + /// + entry header length); `shared` and `non_shared` are the lengths of the shared + /// respectively non-shared parts of the key. + /// Only self.key is mutated. 
+ fn assemble_key(&mut self, off: usize, shared: usize, non_shared: usize) { self.key.resize(shared, 0); - self.key.extend_from_slice(&self.block[self.offset..self.offset + non_shared]); + self.key.extend_from_slice(&self.block[off..off + non_shared]); } pub fn seek_to_last(&mut self) { if self.number_restarts() > 0 { - let restart = self.get_restart_point(self.number_restarts() - 1); - self.offset = restart; - self.current_entry_offset = restart; - self.current_restart_ix = self.number_restarts() - 1; + let num_restarts = self.number_restarts(); + self.seek_to_restart_point(num_restarts - 1); } else { self.reset(); } while let Some((_, _)) = self.next() { } - - self.prev(); } } -impl<C: Comparator> Iterator for BlockIter<C> { +impl Iterator for BlockIter { type Item = (Vec<u8>, Vec<u8>); fn next(&mut self) -> Option<Self::Item> { - self.current_entry_offset = self.offset; - - if self.current_entry_offset >= self.restarts_off { + if self.offset >= self.restarts_off { + self.offset = self.restarts_off; + // current_entry_offset is left at the offset of the last entry return None; + } else { + self.current_entry_offset = self.offset; } - let (shared, non_shared, valsize) = self.parse_entry(); - self.assemble_key(shared, non_shared); + + let current_off = self.current_entry_offset; - self.val_offset = self.offset + non_shared; - self.offset = self.val_offset + valsize; + let (shared, non_shared, valsize, entry_head_len) = self.parse_entry_and_advance(); + self.assemble_key(current_off + entry_head_len, shared, non_shared); + // Adjust current_restart_ix let num_restarts = self.number_restarts(); while self.current_restart_ix + 1 < num_restarts && self.get_restart_point(self.current_restart_ix + 1) < self.current_entry_offset { self.current_restart_ix += 1; } + Some((self.key.clone(), Vec::from(&self.block[self.val_offset..self.val_offset + valsize]))) } } -impl<C: Comparator> SSIterator for BlockIter<C> { +impl SSIterator for BlockIter { fn reset(&mut self) { 
self.offset = 0; + self.val_offset = 0; self.current_restart_ix = 0; self.key.clear(); - self.val_offset = 0; } + fn prev(&mut self) -> Option<Self::Item> { // as in the original implementation -- seek to last restart point, then look for key - let current_offset = self.current_entry_offset; + let orig_offset = self.current_entry_offset; // At the beginning, can't go further back - if current_offset == 0 { + if orig_offset == 0 { self.reset(); return None; } - while self.get_restart_point(self.current_restart_ix) >= current_offset { + while self.get_restart_point(self.current_restart_ix) >= orig_offset { + // todo: double check this + if self.current_restart_ix == 0 { + self.offset = self.restarts_off; + self.current_restart_ix = self.number_restarts(); + break; + } self.current_restart_ix -= 1; } self.offset = self.get_restart_point(self.current_restart_ix); - assert!(self.offset < current_offset); + assert!(self.offset < orig_offset); let mut result; + // Stop if the next entry would be the original one (self.offset always points to the start + // of the next entry) loop { result = self.next(); - if self.offset >= current_offset { + if self.offset >= orig_offset { break; } } @@ -206,19 +252,14 @@ // Do a binary search over the restart points. while left < right { let middle = (left + right + 1) / 2; - self.offset = self.get_restart_point(middle); - // advances self.offset - let (shared, non_shared, _) = self.parse_entry(); + self.seek_to_restart_point(middle); - // At a restart, the shared part is supposed to be 0. 
- assert_eq!(shared, 0); + let c = self.opt.cmp.cmp(&self.key, to); - let cmp = self.cmp.cmp(to, &self.block[self.offset..self.offset + non_shared]); - - if cmp == Ordering::Less { + if c == Ordering::Less { + left = middle; + } else { right = middle - 1; - } else { - left = middle; } } @@ -228,7 +269,7 @@ // Linear search from here on while let Some((k, _)) = self.next() { - if self.cmp.cmp(k.as_slice(), to) >= Ordering::Equal { + if self.opt.cmp.cmp(k.as_slice(), to) >= Ordering::Equal { return; } } @@ -247,9 +288,8 @@ } } -pub struct BlockBuilder<C: Comparator> { - opt: BuildOptions, - cmp: C, +pub struct BlockBuilder { + opt: Options, buffer: Vec<u8>, restarts: Vec<u32>, @@ -257,15 +297,14 @@ counter: usize, } -impl<C: Comparator> BlockBuilder<C> { - pub fn new(o: BuildOptions, cmp: C) -> BlockBuilder<C> { +impl BlockBuilder { + pub fn new(o: Options) -> BlockBuilder { let mut restarts = vec![0]; restarts.reserve(1023); BlockBuilder { buffer: Vec::with_capacity(o.block_size), opt: o, - cmp: cmp, restarts: restarts, last_key: Vec::new(), counter: 0, @@ -295,7 +334,7 @@ pub fn add(&mut self, key: &[u8], val: &[u8]) { assert!(self.counter <= self.opt.block_restart_interval); assert!(self.buffer.is_empty() || - self.cmp.cmp(self.last_key.as_slice(), key) == Ordering::Less); + self.opt.cmp.cmp(self.last_key.as_slice(), key) == Ordering::Less); let mut shared = 0; @@ -333,8 +372,6 @@ self.last_key.resize(shared, 0); self.last_key.extend_from_slice(&key[shared..]); - // assert_eq!(&self.last_key[..], key); - self.counter += 1; } @@ -360,7 +397,7 @@ #[cfg(test)] mod tests { use super::*; - use iterator::*; + use iterator::SSIterator; use options::*; fn get_data() -> Vec<(&'static [u8], &'static [u8])> { @@ -374,10 +411,10 @@ #[test] fn test_block_builder() { - let mut o = BuildOptions::default(); + let mut o = Options::default(); o.block_restart_interval = 3; - let mut builder = BlockBuilder::new(o, StandardComparator); + let mut builder = BlockBuilder::new(o); for 
&(k, v) in get_data().iter() { builder.add(k, v); @@ -387,33 +424,19 @@ let block = builder.finish(); assert_eq!(block.len(), 149); - - } - - #[test] - fn test_block_builder_reset() { - let o = BuildOptions::default(); - - let mut builder = BlockBuilder::new(o, StandardComparator); - - builder.add("abc".as_bytes(), "def".as_bytes()); - - assert!(builder.size_estimate() > 4); - builder.reset(); - assert_eq!(builder.size_estimate(), 4); } #[test] fn test_block_empty() { - let mut o = BuildOptions::default(); + let mut o = Options::default(); o.block_restart_interval = 16; - let builder = BlockBuilder::new(o, StandardComparator); + let builder = BlockBuilder::new(o); let blockc = builder.finish(); assert_eq!(blockc.len(), 8); assert_eq!(blockc, vec![0, 0, 0, 0, 1, 0, 0, 0]); - let block = Block::new(blockc, StandardComparator); + let block = Block::new(Options::default(), blockc); for _ in block.iter() { panic!("expected 0 iterations"); @@ -423,48 +446,39 @@ #[test] fn test_block_build_iterate() { let data = get_data(); - let mut builder = BlockBuilder::new(BuildOptions::default(), StandardComparator); + let mut builder = BlockBuilder::new(Options::default()); for &(k, v) in data.iter() { builder.add(k, v); } let block_contents = builder.finish(); - let block = Block::new(block_contents, StandardComparator); - let block_iter = block.iter(); + let block = Block::new(Options::default(), block_contents).iter(); let mut i = 0; - assert!(!block_iter.valid()); + assert!(!block.valid()); - for (k, v) in block_iter { + for (k, v) in block { assert_eq!(&k[..], data[i].0); assert_eq!(v, data[i].1); i += 1; } assert_eq!(i, data.len()); - - let mut block_iter = block.iter(); - - block_iter.reset(); - assert!(block_iter.next().is_some()); - assert!(block_iter.valid()); - block_iter.reset(); - assert!(!block_iter.valid()); } #[test] fn test_block_iterate_reverse() { - let mut o = BuildOptions::default(); + let mut o = Options::default(); o.block_restart_interval = 3; let data = 
get_data(); - let mut builder = BlockBuilder::new(o, StandardComparator); + let mut builder = BlockBuilder::new(o.clone()); for &(k, v) in data.iter() { builder.add(k, v); } let block_contents = builder.finish(); - let mut block = Block::new(block_contents, StandardComparator).iter(); + let mut block = Block::new(o.clone(), block_contents).iter(); assert!(!block.valid()); assert_eq!(block.next(), @@ -478,15 +492,26 @@ Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec()))); block.prev(); assert!(!block.valid()); + + // Verify that prev() from the last entry goes to the prev-to-last entry + // (essentially, that next() returning None doesn't advance anything) + + while let Some(_) = block.next() { + } + + block.prev(); + assert!(block.valid()); + assert_eq!(block.current(), + Some(("prefix_key2".as_bytes().to_vec(), "value".as_bytes().to_vec()))); } #[test] fn test_block_seek() { - let mut o = BuildOptions::default(); + let mut o = Options::default(); o.block_restart_interval = 3; let data = get_data(); - let mut builder = BlockBuilder::new(o, StandardComparator); + let mut builder = BlockBuilder::new(o.clone()); for &(k, v) in data.iter() { builder.add(k, v); @@ -494,7 +519,7 @@ let block_contents = builder.finish(); - let mut block = Block::new(block_contents, StandardComparator).iter(); + let mut block = Block::new(o.clone(), block_contents).iter(); block.seek(&"prefix_key2".as_bytes()); assert!(block.valid()); @@ -510,18 +535,28 @@ assert!(block.valid()); assert_eq!(block.current(), Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec()))); + + block.seek(&"prefix_key3".as_bytes()); + assert!(block.valid()); + assert_eq!(block.current(), + Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec()))); + + block.seek(&"prefix_key8".as_bytes()); + assert!(block.valid()); + assert_eq!(block.current(), + Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec()))); } #[test] fn test_block_seek_to_last() { - let mut o = 
BuildOptions::default(); + let mut o = Options::default(); // Test with different number of restarts for block_restart_interval in vec![2, 6, 10] { o.block_restart_interval = block_restart_interval; let data = get_data(); - let mut builder = BlockBuilder::new(o, StandardComparator); + let mut builder = BlockBuilder::new(o.clone()); for &(k, v) in data.iter() { builder.add(k, v); @@ -529,7 +564,7 @@ let block_contents = builder.finish(); - let mut block = Block::new(block_contents, StandardComparator).iter(); + let mut block = Block::new(o.clone(), block_contents).iter(); block.seek_to_last(); assert!(block.valid());
--- a/src/lib.rs Mon Jan 02 12:53:29 2017 +0100 +++ b/src/lib.rs Tue Jan 03 19:28:14 2017 +0100 @@ -3,6 +3,11 @@ mod block; mod blockhandle; +mod cmp; +mod filter; +mod filter_block; +mod key_types; +mod types; pub mod iterator; pub mod options; @@ -11,7 +16,7 @@ pub use iterator::StandardComparator; pub use iterator::SSIterator; -pub use options::{BuildOptions, ReadOptions}; +pub use options::{Options, ReadOptions}; pub use table_builder::TableBuilder; pub use table_reader::{Table, TableIterator};
--- a/src/options.rs Mon Jan 02 12:53:29 2017 +0100 +++ b/src/options.rs Tue Jan 03 19:28:14 2017 +0100 @@ -1,4 +1,14 @@ +use cmp::{Cmp, DefaultCmp}; +use types::SequenceNumber; + use std::default::Default; +use std::sync::Arc; + +const KB: usize = 1 << 10; +const MB: usize = KB * KB; + +const BLOCK_MAX_SIZE: usize = 4 * KB; +const WRITE_BUFFER_SIZE: usize = 4 * MB; #[derive(Clone, Copy, PartialEq, Debug)] pub enum CompressionType { @@ -10,37 +20,66 @@ match i { 0 => Some(CompressionType::CompressionNone), 1 => Some(CompressionType::CompressionSnappy), - _ => None + _ => None, } } /// [not all member types implemented yet] /// -#[derive(Clone, Copy)] -pub struct BuildOptions { +#[derive(Clone)] +pub struct Options { + pub cmp: Arc<Box<Cmp>>, + pub create_if_missing: bool, + pub error_if_exists: bool, + pub paranoid_checks: bool, + // pub logger: Logger, + pub write_buffer_size: usize, + pub max_open_files: usize, pub block_size: usize, pub block_restart_interval: usize, - // Note: Compression is not implemented. pub compression_type: CompressionType, + pub reuse_logs: bool, } -impl Default for BuildOptions { - fn default() -> BuildOptions { - BuildOptions { - block_size: 4 * (1 << 10), +impl Default for Options { + fn default() -> Options { + Options { + cmp: Arc::new(Box::new(DefaultCmp)), + create_if_missing: true, + error_if_exists: false, + paranoid_checks: false, + write_buffer_size: WRITE_BUFFER_SIZE, + max_open_files: 1 << 10, + block_size: BLOCK_MAX_SIZE, block_restart_interval: 16, + reuse_logs: false, compression_type: CompressionType::CompressionNone, } } } -#[derive(Clone, Copy)] +/// Supplied to DB read operations. 
pub struct ReadOptions { - pub skip_bad_blocks: bool, + pub verify_checksums: bool, + pub snapshot: Option<SequenceNumber>, } impl Default for ReadOptions { - fn default() -> ReadOptions { - ReadOptions { skip_bad_blocks: true } + fn default() -> Self { + ReadOptions { + verify_checksums: false, + snapshot: None, + } } } + +/// Supplied to write operations +pub struct WriteOptions { + pub sync: bool, +} + +impl Default for WriteOptions { + fn default() -> Self { + WriteOptions { sync: false } + } +}
--- a/src/table_builder.rs Mon Jan 02 12:53:29 2017 +0100 +++ b/src/table_builder.rs Tue Jan 03 19:28:14 2017 +0100 @@ -1,12 +1,14 @@ use block::{BlockBuilder, BlockContents}; use blockhandle::BlockHandle; -use options::{CompressionType, BuildOptions}; -use iterator::{Comparator, StandardComparator}; +use cmp::InternalKeyCmp; +use filter::{BoxedFilterPolicy, NoFilterPolicy}; +use filter_block::FilterBlockBuilder; +use key_types::InternalKey; +use options::{CompressionType, Options}; -use std::io::{Result, Write}; -use std::fs::{File, OpenOptions}; -use std::path::Path; +use std::io::Write; use std::cmp::Ordering; +use std::sync::Arc; use crc::crc32; use crc::Hasher32; @@ -20,59 +22,44 @@ pub const TABLE_BLOCK_COMPRESS_LEN: usize = 1; pub const TABLE_BLOCK_CKSUM_LEN: usize = 4; -fn find_shortest_sep<C: Comparator>(c: &C, lo: &[u8], hi: &[u8]) -> Vec<u8> { - let min; - - if lo.len() < hi.len() { - min = lo.len(); - } else { - min = hi.len(); - } - - let mut diff_at = 0; - - while diff_at < min && lo[diff_at] == hi[diff_at] { - diff_at += 1; - } - - if diff_at == min { - return Vec::from(lo); - } else { - if lo[diff_at] < 0xff && lo[diff_at] + 1 < hi[diff_at] { - let mut result = Vec::from(&lo[0..diff_at + 1]); - result[diff_at] += 1; - assert_eq!(c.cmp(&result, hi), Ordering::Less); - return result; - } - return Vec::from(lo); - } -} - /// Footer is a helper for encoding/decoding a table footer. #[derive(Debug)] pub struct Footer { + pub meta_index: BlockHandle, pub index: BlockHandle, } +/// A Table footer contains a pointer to the metaindex block, another pointer to the index block, +/// and a magic number: +/// [ { table data ... 
, METAINDEX blockhandle, INDEX blockhandle, PADDING bytes } = 40 bytes, +/// MAGIC_FOOTER_ENCODED ] impl Footer { - pub fn new(index: BlockHandle) -> Footer { - Footer { index: index } + pub fn new(metaix: BlockHandle, index: BlockHandle) -> Footer { + Footer { + meta_index: metaix, + index: index, + } } pub fn decode(from: &[u8]) -> Footer { assert!(from.len() >= FULL_FOOTER_LENGTH); assert_eq!(&from[FOOTER_LENGTH..], &MAGIC_FOOTER_ENCODED); - let (ix, _) = BlockHandle::decode(&from[0..]); + let (meta, metalen) = BlockHandle::decode(&from[0..]); + let (ix, _) = BlockHandle::decode(&from[metalen..]); - Footer { index: ix } + Footer { + meta_index: meta, + index: ix, + } } pub fn encode(&self, to: &mut [u8]) { assert!(to.len() >= FOOTER_LENGTH + 8); - let s1 = self.index.encode_to(&mut to[0..]); + let s1 = self.meta_index.encode_to(to); + let s2 = self.index.encode_to(&mut to[s1..]); - for i in s1..FOOTER_LENGTH { + for i in s1 + s2..FOOTER_LENGTH { to[i] = 0; } for i in FOOTER_LENGTH..FULL_FOOTER_LENGTH { @@ -81,91 +68,94 @@ } } -/// A table consists of DATA BLOCKs, an INDEX BLOCK and a FOOTER. +/// A table consists of DATA BLOCKs, META BLOCKs, a METAINDEX BLOCK, an INDEX BLOCK and a FOOTER. /// -/// DATA BLOCKs, INDEX BLOCKs, and BLOCKs are built using the code in the `block` module. +/// DATA BLOCKs, META BLOCKs, INDEX BLOCK and METAINDEX BLOCK are built using the code in +/// the `block` module. /// -/// DATA BLOCKs contain the actual data, and a footer of [4B crc32; 1B compression]; -/// INDEX BLOCKS contain one entry per block, where the key is a string after the -/// last key of a block, and the value is a encoded BlockHandle pointing to that -/// block. -/// -/// The footer is a pointer pointing to the index block, padding to fill up to 40 B and at the end -/// the 8B magic number 0xdb4775248b80fb57. 
-/// -pub struct TableBuilder<C: Comparator, Dst: Write> { - o: BuildOptions, - cmp: C, +/// The FOOTER consists of a BlockHandle that points to the metaindex block, another pointing to +/// the index block, padding to fill up to 40 B and at the end the 8B magic number +/// 0xdb4775248b80fb57. + +pub struct TableBuilder<'a, Dst: Write> { + opt: Options, dst: Dst, offset: usize, num_entries: usize, prev_block_last_key: Vec<u8>, - data_block: Option<BlockBuilder<C>>, - index_block: Option<BlockBuilder<C>>, + data_block: Option<BlockBuilder>, + index_block: Option<BlockBuilder>, + filter_block: Option<FilterBlockBuilder<'a>>, } -impl<Dst: Write> TableBuilder<StandardComparator, Dst> { - /// Create a new TableBuilder with default comparator and BuildOptions. - pub fn new_defaults(dst: Dst) -> TableBuilder<StandardComparator, Dst> { - TableBuilder::new(dst, BuildOptions::default(), StandardComparator) +impl<'a, Dst: Write> TableBuilder<'a, Dst> { + pub fn new_no_filter(opt: Options, dst: Dst) -> TableBuilder<'a, Dst> { + TableBuilder::new(opt, dst, NoFilterPolicy::new()) } } -impl TableBuilder<StandardComparator, File> { - /// Open/create a file for writing a table. - /// This will truncate the file, if it exists. - pub fn new_to_file(file: &Path) -> Result<TableBuilder<StandardComparator, File>> { - let f = try!(OpenOptions::new().create(true).write(true).truncate(true).open(file)); - Ok(TableBuilder::new(f, BuildOptions::default(), StandardComparator)) +/// TableBuilder is used for building a new SSTable. It groups entries into blocks, +/// calculating checksums and bloom filters. +/// It's recommended that you use InternalFilterPolicy as FilterPol, as that policy extracts the +/// underlying user keys from the InternalKeys used as keys in the table. +impl<'a, Dst: Write> TableBuilder<'a, Dst> { + /// Create a new table builder. + /// The comparator in opt will be wrapped in an InternalKeyCmp. 
+ pub fn new(mut opt: Options, dst: Dst, fpol: BoxedFilterPolicy) -> TableBuilder<'a, Dst> { + opt.cmp = Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone()))); + TableBuilder::new_raw(opt, dst, fpol) } -} -impl<C: Comparator, Dst: Write> TableBuilder<C, Dst> { - /// Create a new TableBuilder. - pub fn new(dst: Dst, opt: BuildOptions, cmp: C) -> TableBuilder<C, Dst> { + /// Like new(), but doesn't wrap the comparator in an InternalKeyCmp (for testing) + pub fn new_raw(opt: Options, dst: Dst, fpol: BoxedFilterPolicy) -> TableBuilder<'a, Dst> { TableBuilder { - o: opt, - cmp: cmp, + opt: opt.clone(), dst: dst, offset: 0, prev_block_last_key: vec![], num_entries: 0, - data_block: Some(BlockBuilder::new(opt, cmp)), - index_block: Some(BlockBuilder::new(opt, cmp)), + data_block: Some(BlockBuilder::new(opt.clone())), + index_block: Some(BlockBuilder::new(opt)), + filter_block: Some(FilterBlockBuilder::new(fpol)), } } - /// Returns how many entries have been written. pub fn entries(&self) -> usize { self.num_entries } - /// Add an entry to this table. The key must be lexicographically greater than the last entry - /// written. - pub fn add(&mut self, key: &[u8], val: &[u8]) { + /// Add a key to the table. The key has to be lexically greater or equal to the last one added. 
+ pub fn add(&mut self, key: InternalKey<'a>, val: &[u8]) { assert!(self.data_block.is_some()); - assert!(self.num_entries == 0 || - self.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less); - if self.data_block.as_ref().unwrap().size_estimate() > self.o.block_size { + if !self.prev_block_last_key.is_empty() { + assert!(self.opt.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less); + } + + if self.data_block.as_ref().unwrap().size_estimate() > self.opt.block_size { self.write_data_block(key); } let dblock = &mut self.data_block.as_mut().unwrap(); + if let Some(ref mut fblock) = self.filter_block { + fblock.add_key(key); + } + self.num_entries += 1; dblock.add(key, val); } /// Writes an index entry for the current data_block where `next_key` is the first key of the - /// next block. Then, write the actual block to disk. - fn write_data_block(&mut self, next_key: &[u8]) { + /// next block. + /// Calls write_block() for writing the block to disk. + fn write_data_block<'b>(&mut self, next_key: InternalKey<'b>) { assert!(self.data_block.is_some()); let block = self.data_block.take().unwrap(); - let sep = find_shortest_sep(&self.cmp, block.last_key(), next_key); + let sep = self.opt.cmp.find_shortest_sep(&block.last_key(), next_key); self.prev_block_last_key = Vec::from(block.last_key()); let contents = block.finish(); @@ -174,54 +164,79 @@ let enc_len = handle.encode_to(&mut handle_enc); self.index_block.as_mut().unwrap().add(&sep, &handle_enc[0..enc_len]); - self.data_block = Some(BlockBuilder::new(self.o, self.cmp)); + self.data_block = Some(BlockBuilder::new(self.opt.clone())); + + let ctype = self.opt.compression_type; - let ctype = self.o.compression_type; self.write_block(contents, ctype); + + if let Some(ref mut fblock) = self.filter_block { + fblock.start_block(self.offset); + } } - /// Writes a block to disk, with a trailing 4 byte CRC checksum. 
- fn write_block(&mut self, c: BlockContents, t: CompressionType) -> BlockHandle { + /// Calculates the checksum, writes the block to disk and updates the offset. + fn write_block(&mut self, block: BlockContents, t: CompressionType) -> BlockHandle { // compression is still unimplemented assert_eq!(t, CompressionType::CompressionNone); - let mut buf = [0 as u8; 4]; + let mut buf = [0 as u8; TABLE_BLOCK_CKSUM_LEN]; let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); - digest.write(&c); - digest.write(&[self.o.compression_type as u8; 1]); + digest.write(&block); + digest.write(&[self.opt.compression_type as u8; TABLE_BLOCK_COMPRESS_LEN]); digest.sum32().encode_fixed(&mut buf); // TODO: Handle errors here. - let _ = self.dst.write(&c); - let _ = self.dst.write(&[t as u8; 1]); + let _ = self.dst.write(&block); + let _ = self.dst.write(&[t as u8; TABLE_BLOCK_COMPRESS_LEN]); let _ = self.dst.write(&buf); - let handle = BlockHandle::new(self.offset, c.len()); + let handle = BlockHandle::new(self.offset, block.len()); - self.offset += c.len() + 1 + buf.len(); + self.offset += block.len() + TABLE_BLOCK_COMPRESS_LEN + TABLE_BLOCK_CKSUM_LEN; handle } - /// Finish building this table. This *must* be called at the end, otherwise not all data may - /// land on disk. 
pub fn finish(mut self) { assert!(self.data_block.is_some()); - let ctype = self.o.compression_type; + let ctype = self.opt.compression_type; + + // If there's a pending data block, write it + if self.data_block.as_ref().unwrap().entries() > 0 { + // Find a key reliably past the last key + let key_past_last = + self.opt.cmp.find_short_succ(self.data_block.as_ref().unwrap().last_key()); + self.write_data_block(&key_past_last); + } + + // Create metaindex block + let mut meta_ix_block = BlockBuilder::new(self.opt.clone()); - // If there's a pending data block, write that one - let flush_last_block = self.data_block.as_ref().unwrap().entries() > 0; - if flush_last_block { - self.write_data_block(&[0xff as u8; 1]); + if self.filter_block.is_some() { + // if there's a filter block, write the filter block and add it to the metaindex block. + let fblock = self.filter_block.take().unwrap(); + let filter_key = format!("filter.{}", fblock.filter_name()); + let fblock_data = fblock.finish(); + let fblock_handle = self.write_block(fblock_data, CompressionType::CompressionNone); + + let mut handle_enc = [0 as u8; 16]; + let enc_len = fblock_handle.encode_to(&mut handle_enc); + + meta_ix_block.add(filter_key.as_bytes(), &handle_enc[0..enc_len]); } + // write metaindex block + let meta_ix = meta_ix_block.finish(); + let meta_ix_handle = self.write_block(meta_ix, ctype); + // write index block let index_cont = self.index_block.take().unwrap().finish(); let ix_handle = self.write_block(index_cont, ctype); // write footer. 
- let footer = Footer::new(ix_handle); + let footer = Footer::new(meta_ix_handle, ix_handle); let mut buf = [0; FULL_FOOTER_LENGTH]; footer.encode(&mut buf); @@ -231,36 +246,20 @@ #[cfg(test)] mod tests { - use super::{find_shortest_sep, Footer, TableBuilder}; - use iterator::StandardComparator; + use super::{Footer, TableBuilder}; use blockhandle::BlockHandle; - use options::BuildOptions; - - #[test] - fn test_shortest_sep() { - assert_eq!(find_shortest_sep(&StandardComparator, "abcd".as_bytes(), "abcf".as_bytes()), - "abce".as_bytes()); - assert_eq!(find_shortest_sep(&StandardComparator, - "abcdefghi".as_bytes(), - "abcffghi".as_bytes()), - "abce".as_bytes()); - assert_eq!(find_shortest_sep(&StandardComparator, "a".as_bytes(), "a".as_bytes()), - "a".as_bytes()); - assert_eq!(find_shortest_sep(&StandardComparator, "a".as_bytes(), "b".as_bytes()), - "a".as_bytes()); - assert_eq!(find_shortest_sep(&StandardComparator, "abc".as_bytes(), "zzz".as_bytes()), - "b".as_bytes()); - assert_eq!(find_shortest_sep(&StandardComparator, "".as_bytes(), "".as_bytes()), - "".as_bytes()); - } + use filter::BloomPolicy; + use options::Options; #[test] fn test_footer() { - let f = Footer::new(BlockHandle::new(55, 5)); + let f = Footer::new(BlockHandle::new(44, 4), BlockHandle::new(55, 5)); let mut buf = [0; 48]; f.encode(&mut buf[..]); let f2 = Footer::decode(&buf); + assert_eq!(f2.meta_index.offset(), 44); + assert_eq!(f2.meta_index.size(), 4); assert_eq!(f2.index.offset(), 55); assert_eq!(f2.index.size(), 5); @@ -269,9 +268,9 @@ #[test] fn test_table_builder() { let mut d = Vec::with_capacity(512); - let mut opt = BuildOptions::default(); + let mut opt = Options::default(); opt.block_restart_interval = 3; - let mut b = TableBuilder::new(&mut d, opt, StandardComparator); + let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4)); let data = vec![("abc", "def"), ("abd", "dee"), ("bcd", "asa"), ("bsr", "a00")]; @@ -279,6 +278,7 @@ b.add(k.as_bytes(), v.as_bytes()); } + 
assert!(b.filter_block.is_some()); b.finish(); } @@ -286,9 +286,9 @@ #[should_panic] fn test_bad_input() { let mut d = Vec::with_capacity(512); - let mut opt = BuildOptions::default(); + let mut opt = Options::default(); opt.block_restart_interval = 3; - let mut b = TableBuilder::new(&mut d, opt, StandardComparator); + let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4)); // Test two equal consecutive keys let data = vec![("abc", "def"), ("abc", "dee"), ("bcd", "asa"), ("bsr", "a00")];
--- a/src/table_reader.rs Mon Jan 02 12:53:29 2017 +0100 +++ b/src/table_reader.rs Tue Jan 03 19:28:14 2017 +0100 @@ -1,17 +1,19 @@ use block::{Block, BlockIter}; use blockhandle::BlockHandle; +use filter::{BoxedFilterPolicy, InternalFilterPolicy}; +use filter_block::FilterBlockReader; +use key_types::InternalKey; +use cmp::InternalKeyCmp; +use options::{self, CompressionType, Options}; use table_builder::{self, Footer}; -use iterator::{Comparator, StandardComparator, SSIterator}; -use options::{self, ReadOptions, CompressionType}; - -use integer_encoding::FixedInt; -use crc::crc32; -use crc::Hasher32; +use iterator::SSIterator; use std::cmp::Ordering; use std::io::{self, Read, Seek, SeekFrom, Result}; -use std::fs::{File, OpenOptions}; -use std::path::Path; +use std::sync::Arc; + +use integer_encoding::FixedInt; +use crc::crc32::{self, Hasher32}; /// Reads the table footer. fn read_footer<R: Read + Seek>(f: &mut R, size: usize) -> Result<Footer> { @@ -32,36 +34,36 @@ Ok(buf) } -/// Reads a block at location. -fn read_block<R: Read + Seek, C: Comparator>(cmp: &C, - f: &mut R, - location: &BlockHandle) - -> Result<TableBlock<C>> { - // The block is denoted by offset and length in BlockHandle. A block in an encoded - // table is followed by 1B compression type and 4B checksum. 
- let buf = try!(read_bytes(f, location)); - let compress = try!(read_bytes(f, - &BlockHandle::new(location.offset() + location.size(), - table_builder::TABLE_BLOCK_COMPRESS_LEN))); - let cksum = try!(read_bytes(f, - &BlockHandle::new(location.offset() + location.size() + - table_builder::TABLE_BLOCK_COMPRESS_LEN, - table_builder::TABLE_BLOCK_CKSUM_LEN))); - Ok(TableBlock { - block: Block::new(buf, *cmp), - checksum: u32::decode_fixed(&cksum), - compression: options::int_to_compressiontype(compress[0] as u32) - .unwrap_or(CompressionType::CompressionNone), - }) -} - -struct TableBlock<C: Comparator> { - block: Block<C>, +struct TableBlock { + block: Block, checksum: u32, compression: CompressionType, } -impl<C: Comparator> TableBlock<C> { +impl TableBlock { + /// Reads a block at location. + fn read_block<R: Read + Seek>(opt: Options, + f: &mut R, + location: &BlockHandle) + -> Result<TableBlock> { + // The block is denoted by offset and length in BlockHandle. A block in an encoded + // table is followed by 1B compression type and 4B checksum. 
+ let buf = try!(read_bytes(f, location)); + let compress = try!(read_bytes(f, + &BlockHandle::new(location.offset() + location.size(), + table_builder::TABLE_BLOCK_COMPRESS_LEN))); + let cksum = try!(read_bytes(f, + &BlockHandle::new(location.offset() + location.size() + + table_builder::TABLE_BLOCK_COMPRESS_LEN, + table_builder::TABLE_BLOCK_CKSUM_LEN))); + Ok(TableBlock { + block: Block::new(opt, buf), + checksum: u32::decode_fixed(&cksum), + compression: options::int_to_compressiontype(compress[0] as u32) + .unwrap_or(CompressionType::CompressionNone), + }) + } + /// Verify checksum of block fn verify(&self) -> bool { let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); @@ -72,54 +74,72 @@ } } -pub struct Table<R: Read + Seek, C: Comparator> { +pub struct Table<R: Read + Seek> { file: R, - file_size: usize, - opt: ReadOptions, - cmp: C, + opt: Options, - indexblock: Block<C>, -} - -impl<R: Read + Seek> Table<R, StandardComparator> { - /// Open a table for reading. - pub fn new_defaults(file: R, size: usize) -> Result<Table<R, StandardComparator>> { - Table::new(file, size, ReadOptions::default(), StandardComparator) - } + footer: Footer, + indexblock: Block, + #[allow(dead_code)] + filters: Option<FilterBlockReader>, } -impl Table<File, StandardComparator> { - /// Directly open a file for reading. - pub fn new_from_file(file: &Path) -> Result<Table<File, StandardComparator>> { - let f = try!(OpenOptions::new().read(true).open(file)); - let len = try!(f.metadata()).len() as usize; - - Table::new(f, len, ReadOptions::default(), StandardComparator) - } -} - -impl<R: Read + Seek, C: Comparator> Table<R, C> { - /// Open a table for reading. Note: The comparator must be the same that was chosen when - /// building the table. - pub fn new(mut file: R, size: usize, opt: ReadOptions, cmp: C) -> Result<Table<R, C>> { +impl<R: Read + Seek> Table<R> { + /// Creates a new table reader operating on unformatted keys (i.e., UserKey). 
+ fn new_raw(opt: Options, mut file: R, size: usize, fp: BoxedFilterPolicy) -> Result<Table<R>> { let footer = try!(read_footer(&mut file, size)); - let indexblock = try!(read_block(&cmp, &mut file, &footer.index)); + let indexblock = try!(TableBlock::read_block(opt.clone(), &mut file, &footer.index)); + let metaindexblock = + try!(TableBlock::read_block(opt.clone(), &mut file, &footer.meta_index)); + + if !indexblock.verify() || !metaindexblock.verify() { + return Err(io::Error::new(io::ErrorKind::InvalidData, + "Indexblock/Metaindexblock failed verification")); + } + + // Open filter block for reading + let mut filter_block_reader = None; + let filter_name = format!("filter.{}", fp.name()).as_bytes().to_vec(); + + let mut metaindexiter = metaindexblock.block.iter(); + + metaindexiter.seek(&filter_name); + + if let Some((_key, val)) = metaindexiter.current() { + let filter_block_location = BlockHandle::decode(&val).0; + + if filter_block_location.size() > 0 { + let buf = try!(read_bytes(&mut file, &filter_block_location)); + filter_block_reader = Some(FilterBlockReader::new_owned(fp, buf)); + } + } + + metaindexiter.reset(); Ok(Table { file: file, - file_size: size, opt: opt, - cmp: cmp, + footer: footer, + filters: filter_block_reader, indexblock: indexblock.block, }) } - fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock<C>> { - let b = try!(read_block(&self.cmp, &mut self.file, location)); + /// Creates a new table reader operating on internal keys (i.e., InternalKey). This means that + /// a different comparator (internal_key_cmp) and a different filter policy + /// (InternalFilterPolicy) are used. 
+ pub fn new(mut opt: Options, file: R, size: usize, fp: BoxedFilterPolicy) -> Result<Table<R>> { + opt.cmp = Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone()))); + let t = try!(Table::new_raw(opt, file, size, InternalFilterPolicy::new(fp))); + Ok(t) + } - if !b.verify() && self.opt.skip_bad_blocks { + fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock> { + let b = try!(TableBlock::read_block(self.opt.clone(), &mut self.file, location)); + + if !b.verify() { Err(io::Error::new(io::ErrorKind::InvalidData, "Data block failed verification")) } else { Ok(b) @@ -137,128 +157,138 @@ return location.offset(); } - return self.file_size; + return self.footer.meta_index.offset(); } // Iterators read from the file; thus only one iterator can be borrowed (mutably) per scope - pub fn iter<'a>(&'a mut self) -> TableIterator<'a, R, C> { + fn iter<'a>(&'a mut self) -> TableIterator<'a, R> { let iter = TableIterator { - current_block: self.indexblock.iter(), // just for filling in here + current_block: self.indexblock.iter(), + init: false, + current_block_off: 0, index_block: self.indexblock.iter(), + opt: self.opt.clone(), table: self, - init: false, }; iter } - /// Retrieve an entry from the table. - /// - /// Note: As this doesn't keep state, using a TableIterator and seek() may be more efficient - /// when retrieving several entries from the same underlying block. - pub fn get(&mut self, key: &[u8]) -> Option<Vec<u8>> { + /// Retrieve value from table. This function uses the attached filters, so is better suited if + /// you frequently look for non-existing values (as it will detect the non-existence of an + /// entry in a block without having to load the block). 
+ pub fn get<'a>(&mut self, to: InternalKey<'a>) -> Option<Vec<u8>> { let mut iter = self.iter(); - iter.seek(key); + iter.seek(to); if let Some((k, v)) = iter.current() { - if k == key { - return Some(v); - } + if k == to { Some(v) } else { None } + } else { + None } - return None; + + // TODO: replace this with a more efficient method using filters } } -/// Iterator over a Table. -pub struct TableIterator<'a, R: 'a + Read + Seek, C: 'a + Comparator> { - table: &'a mut Table<R, C>, - current_block: BlockIter<C>, - index_block: BlockIter<C>, - +/// This iterator is a "TwoLevelIterator"; it uses an index block in order to get an offset hint +/// into the data blocks. +pub struct TableIterator<'a, R: 'a + Read + Seek> { + table: &'a mut Table<R>, + opt: Options, + // We're not using Option<BlockIter>, but instead a separate `init` field. That makes it easier + // working with the current block in the iterator methods (no borrowing annoyance as with + // Option<>) + current_block: BlockIter, + current_block_off: usize, init: bool, + index_block: BlockIter, } -impl<'a, C: Comparator, R: Read + Seek> TableIterator<'a, R, C> { - /// Skips to the entry referenced by the next entry in the index block. - /// This is called once a block has run out of entries. - /// Returns Ok(false) if the end has been reached, returns Err(...) if it should be retried - /// (e.g., because there's a corrupted block) +impl<'a, R: Read + Seek> TableIterator<'a, R> { + // Skips to the entry referenced by the next entry in the index block. + // This is called once a block has run out of entries. + // Err means corruption or I/O error; Ok(true) means a new block was loaded; Ok(false) means + // tht there's no more entries. 
fn skip_to_next_entry(&mut self) -> Result<bool> { if let Some((_key, val)) = self.index_block.next() { - let r = self.load_block(&val); - - if let Err(e) = r { Err(e) } else { Ok(true) } + self.load_block(&val).map(|_| true) } else { Ok(false) } } - /// Load the block at `handle` into `self.current_block` + // Load the block at `handle` into `self.current_block` fn load_block(&mut self, handle: &[u8]) -> Result<()> { let (new_block_handle, _) = BlockHandle::decode(handle); let block = try!(self.table.read_block(&new_block_handle)); + self.current_block = block.block.iter(); + self.current_block_off = new_block_handle.offset(); Ok(()) } } -impl<'a, C: Comparator, R: Read + Seek> Iterator for TableIterator<'a, R, C> { +impl<'a, R: Read + Seek> Iterator for TableIterator<'a, R> { type Item = (Vec<u8>, Vec<u8>); fn next(&mut self) -> Option<Self::Item> { - if !self.init { - return match self.skip_to_next_entry() { + // init essentially means that `current_block` is a data block (it's initially filled with + // an index block as filler). 
+ if self.init { + if let Some((key, val)) = self.current_block.next() { + Some((key, val)) + } else { + match self.skip_to_next_entry() { + Ok(true) => self.next(), + Ok(false) => None, + // try next block, this might be corruption + Err(_) => self.next(), + } + } + } else { + match self.skip_to_next_entry() { Ok(true) => { + // Only initialize if we got an entry self.init = true; self.next() } Ok(false) => None, - Err(_) => self.next(), - }; - } - if let Some((key, val)) = self.current_block.next() { - Some((key, val)) - } else { - match self.skip_to_next_entry() { - Ok(true) => self.next(), - Ok(false) => None, + // try next block from index, this might be corruption Err(_) => self.next(), } } } } -impl<'a, C: Comparator, R: Read + Seek> SSIterator for TableIterator<'a, R, C> { +impl<'a, R: Read + Seek> SSIterator for TableIterator<'a, R> { // A call to valid() after seeking is necessary to ensure that the seek worked (e.g., no error // while reading from disk) fn seek(&mut self, to: &[u8]) { - // first seek in index block, then set current_block and seek there + // first seek in index block, rewind by one entry (so we get the next smaller index entry), + // then set current_block and seek there self.index_block.seek(to); - if let Some((k, _)) = self.index_block.current() { - if self.table.cmp.cmp(to, &k) <= Ordering::Equal { - // ok, found right block: continue below + if let Some((past_block, handle)) = self.index_block.current() { + if self.opt.cmp.cmp(to, &past_block) <= Ordering::Equal { + // ok, found right block: continue + if let Ok(()) = self.load_block(&handle) { + self.current_block.seek(to); + self.init = true; + } else { + self.reset(); + return; + } } else { self.reset(); + return; } } else { panic!("Unexpected None from current() (bug)"); } - - // Read block and seek to entry in that block - if let Some((k, handle)) = self.index_block.current() { - assert!(self.table.cmp.cmp(to, &k) <= Ordering::Equal); - - if let Ok(()) = self.load_block(&handle) 
{ - self.current_block.seek(to); - self.init = true; - } else { - self.reset(); - } - } } fn prev(&mut self) -> Option<Self::Item> { @@ -284,9 +314,6 @@ fn reset(&mut self) { self.index_block.reset(); self.init = false; - - while let Err(_) = self.skip_to_next_entry() { - } } // This iterator is special in that it's valid even before the first call to next(). It behaves @@ -296,39 +323,50 @@ } fn current(&self) -> Option<Self::Item> { - self.current_block.current() + if self.init { + self.current_block.current() + } else { + None + } } } #[cfg(test)] mod tests { - use options::{BuildOptions, ReadOptions}; + use filter::BloomPolicy; + use filter::InternalFilterPolicy; + use options::Options; use table_builder::TableBuilder; - use iterator::{StandardComparator, SSIterator}; + use iterator::SSIterator; + use key_types::LookupKey; use std::io::Cursor; use super::*; fn build_data() -> Vec<(&'static str, &'static str)> { - vec![("abc", "def"), + vec![// block 1 + ("abc", "def"), ("abd", "dee"), ("bcd", "asa"), + // block 2 ("bsr", "a00"), ("xyz", "xxx"), ("xzz", "yyy"), + // block 3 ("zzz", "111")] } - + // Build a table containing raw keys (no format) fn build_table() -> (Vec<u8>, usize) { let mut d = Vec::with_capacity(512); - let mut opt = BuildOptions::default(); + let mut opt = Options::default(); opt.block_restart_interval = 2; opt.block_size = 32; { - let mut b = TableBuilder::new(&mut d, opt, StandardComparator); + // Uses the standard comparator in opt. + let mut b = TableBuilder::new_raw(opt, &mut d, BloomPolicy::new(4)); let data = build_data(); for &(k, v) in data.iter() { @@ -336,6 +374,41 @@ } b.finish(); + + } + + let size = d.len(); + + (d, size) + } + + // Build a table containing keys in InternalKey format. 
+ fn build_internal_table() -> (Vec<u8>, usize) { + let mut d = Vec::with_capacity(512); + let mut opt = Options::default(); + opt.block_restart_interval = 1; + opt.block_size = 32; + + let mut i = 0 as u64; + let data: Vec<(Vec<u8>, &'static str)> = build_data() + .into_iter() + .map(|(k, v)| { + i += 1; + (LookupKey::new(k.as_bytes(), i).internal_key().to_vec(), v) + }) + .collect(); + + { + // Uses InternalKeyCmp + let mut b = + TableBuilder::new(opt, &mut d, InternalFilterPolicy::new(BloomPolicy::new(4))); + + for &(ref k, ref v) in data.iter() { + b.add(k.as_slice(), v.as_bytes()); + } + + b.finish(); + } let size = d.len(); @@ -344,100 +417,108 @@ } #[test] - fn test_table_iterator_fwd() { - let (src, size) = build_table(); - let data = build_data(); + fn test_table_reader_checksum() { + let (mut src, size) = build_table(); + println!("{}", size); + + src[10] += 1; + + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4)) + .unwrap(); + + assert!(table.filters.is_some()); + assert_eq!(table.filters.as_ref().unwrap().num(), 1); - let mut table = Table::new(Cursor::new(&src as &[u8]), - size, - ReadOptions::default(), - StandardComparator) - .unwrap(); - let iter = table.iter(); - let mut i = 0; + { + let iter = table.iter(); + // first block is skipped + assert_eq!(iter.count(), 4); + } - for (k, v) in iter { - assert_eq!((data[i].0.as_bytes(), data[i].1.as_bytes()), - (k.as_ref(), v.as_ref())); - i += 1; + { + let iter = table.iter(); + + for (k, _) in iter { + if k == build_data()[5].0.as_bytes() { + return; + } + } + + panic!("Should have hit 5th record in table!"); } } #[test] - fn test_table_data_corruption() { - let (mut src, size) = build_table(); + fn test_table_iterator_fwd_bwd() { + let (src, size) = build_table(); + let data = build_data(); - // Mess with first block - src[28] += 1; - - let mut table = Table::new(Cursor::new(&src as &[u8]), - size, - ReadOptions::default(), - 
StandardComparator) + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4)) .unwrap(); let mut iter = table.iter(); + let mut i = 0; - // defective blocks are skipped, i.e. we should start with the second block + while let Some((k, v)) = iter.next() { + assert_eq!((data[i].0.as_bytes(), data[i].1.as_bytes()), + (k.as_ref(), v.as_ref())); + i += 1; + } + + assert_eq!(i, data.len()); + assert!(iter.next().is_none()); - assert!(iter.next().is_some()); - assert_eq!(iter.current(), - Some(("bsr".as_bytes().to_vec(), "a00".as_bytes().to_vec()))); - assert!(iter.next().is_some()); - assert_eq!(iter.current(), - Some(("xyz".as_bytes().to_vec(), "xxx".as_bytes().to_vec()))); - assert!(iter.prev().is_some()); - // corrupted blocks are skipped also when reading the other way round - assert!(iter.prev().is_none()); + // backwards count + let mut j = 0; + + while let Some((k, v)) = iter.prev() { + j += 1; + assert_eq!((data[data.len() - 1 - j].0.as_bytes(), + data[data.len() - 1 - j].1.as_bytes()), + (k.as_ref(), v.as_ref())); + } + + // expecting 7 - 1, because the last entry that the iterator stopped on is the last entry + // in the table; that is, it needs to go back over 6 entries. + assert_eq!(j, 6); } #[test] - fn test_table_data_corruption_regardless() { - let mut opt = ReadOptions::default(); - opt.skip_bad_blocks = false; - - let (mut src, size) = build_table(); + fn test_table_iterator_filter() { + let (src, size) = build_table(); - // Mess with first block - src[28] += 1; - - let mut table = Table::new(Cursor::new(&src as &[u8]), size, opt, StandardComparator) + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4)) .unwrap(); + let filter_reader = table.filters.clone().unwrap(); let mut iter = table.iter(); - // defective blocks are NOT skipped! 
- - assert!(iter.next().is_some()); - assert_eq!(iter.current(), - Some(("abc".as_bytes().to_vec(), "def".as_bytes().to_vec()))); - assert!(iter.next().is_some()); - assert_eq!(iter.current(), - Some(("abd".as_bytes().to_vec(), "dee".as_bytes().to_vec()))); - } - - #[test] - fn test_table_get() { - let (src, size) = build_table(); - - let mut table = Table::new(Cursor::new(&src as &[u8]), - size, - ReadOptions::default(), - StandardComparator) - .unwrap(); - - assert_eq!(table.get("abc".as_bytes()), Some("def".as_bytes().to_vec())); - assert_eq!(table.get("zzz".as_bytes()), Some("111".as_bytes().to_vec())); - assert_eq!(table.get("xzz".as_bytes()), Some("yyy".as_bytes().to_vec())); - assert_eq!(table.get("xzy".as_bytes()), None); + loop { + if let Some((k, _)) = iter.next() { + assert!(filter_reader.key_may_match(iter.current_block_off, &k)); + assert!(!filter_reader.key_may_match(iter.current_block_off, + "somerandomkey".as_bytes())); + } else { + break; + } + } } #[test] fn test_table_iterator_state_behavior() { let (src, size) = build_table(); - let mut table = Table::new(Cursor::new(&src as &[u8]), - size, - ReadOptions::default(), - StandardComparator) + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4)) .unwrap(); let mut iter = table.iter(); @@ -446,8 +527,10 @@ // See comment on valid() assert!(!iter.valid()); assert!(iter.current().is_none()); + assert!(iter.prev().is_none()); assert!(iter.next().is_some()); + let first = iter.current(); assert!(iter.valid()); assert!(iter.current().is_some()); @@ -458,6 +541,7 @@ iter.reset(); assert!(!iter.valid()); assert!(iter.current().is_none()); + assert_eq!(first, iter.next()); } #[test] @@ -465,10 +549,10 @@ let (src, size) = build_table(); let data = build_data(); - let mut table = Table::new(Cursor::new(&src as &[u8]), - size, - ReadOptions::default(), - StandardComparator) + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as 
&[u8]), + size, + BloomPolicy::new(4)) .unwrap(); let mut iter = table.iter(); let mut i = 0; @@ -478,7 +562,7 @@ // Go back to previous entry, check, go forward two entries, repeat // Verifies that prev/next works well. - while iter.valid() && i < data.len() { + loop { iter.prev(); if let Some((k, v)) = iter.current() { @@ -489,21 +573,23 @@ } i += 1; - iter.next(); - iter.next(); + if iter.next().is_none() || iter.next().is_none() { + break; + } } - assert_eq!(i, 7); + // Skipping the last value because the second next() above will break the loop + assert_eq!(i, 6); } #[test] fn test_table_iterator_seek() { let (src, size) = build_table(); - let mut table = Table::new(Cursor::new(&src as &[u8]), - size, - ReadOptions::default(), - StandardComparator) + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4)) .unwrap(); let mut iter = table.iter(); @@ -516,4 +602,61 @@ assert_eq!(iter.current(), Some(("abc".as_bytes().to_vec(), "def".as_bytes().to_vec()))); } + + #[test] + fn test_table_get() { + let (src, size) = build_table(); + + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4)) + .unwrap(); + + assert!(table.get("aaa".as_bytes()).is_none()); + assert_eq!(table.get("abc".as_bytes()), Some("def".as_bytes().to_vec())); + assert!(table.get("abcd".as_bytes()).is_none()); + assert_eq!(table.get("bcd".as_bytes()), Some("asa".as_bytes().to_vec())); + assert_eq!(table.get("zzz".as_bytes()), Some("111".as_bytes().to_vec())); + assert!(table.get("zz1".as_bytes()).is_none()); + } + + // This test verifies that the table and filters work with internal keys. This means: + // The table contains keys in InternalKey format and it uses a filter wrapped by + // InternalFilterPolicy. + // All the other tests use raw keys that don't have any internal structure; this is fine in + // general, but here we want to see that the other infrastructure works too. 
+ #[test] + fn test_table_internal_keys() { + use key_types::LookupKey; + + let (src, size) = build_internal_table(); + + let mut table = Table::new(Options::default(), + Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4)) + .unwrap(); + let filter_reader = table.filters.clone().unwrap(); + + // Check that we're actually using internal keys + for (ref k, _) in table.iter() { + assert_eq!(k.len(), 3 + 8); + } + + let mut iter = table.iter(); + + loop { + if let Some((k, _)) = iter.next() { + let lk = LookupKey::new(&k, 123); + let userkey = lk.user_key(); + + assert!(filter_reader.key_may_match(iter.current_block_off, userkey)); + assert!(!filter_reader.key_may_match(iter.current_block_off, + "somerandomkey".as_bytes())); + } else { + break; + } + } + } }