Mercurial > lbo > hg > leveldb-rs
changeset 68:55abb1f11da8
Implement TableBuilder
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 02 Jul 2016 21:03:20 +0200 |
parents | 313f1e5f4c26 |
children | d6a5c7e8d40f |
files | src/lib.rs src/table_builder.rs |
diffstat | 2 files changed, 316 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/src/lib.rs Sat Jul 02 21:02:54 2016 +0200 +++ b/src/lib.rs Sat Jul 02 21:03:20 2016 +0200 @@ -16,7 +16,7 @@ mod options; mod skipmap; mod snapshot; -mod table; +mod table_builder; mod types; mod write_batch;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/table_builder.rs Sat Jul 02 21:03:20 2016 +0200 @@ -0,0 +1,315 @@ +use block::{BlockBuilder, BlockContents}; +use filter::{FilterPolicy, NoFilterPolicy}; +use filter_block::FilterBlockBuilder; +use options::{CompressionType, Options}; +use types::Comparator; +use blockhandle::BlockHandle; + +use std::io::Write; +use std::cmp::Ordering; + +use crc::crc32; +use crc::Hasher32; +use integer_encoding::FixedInt; + +const FOOTER_LENGTH: usize = 40; +const FULL_FOOTER_LENGTH: usize = FOOTER_LENGTH + 8; +const MAGIC_FOOTER_NUMBER: u64 = 0xdb4775248b80fb57; +const MAGIC_FOOTER_ENCODED: [u8; 8] = [0x57, 0xfb, 0x80, 0x8b, 0x24, 0x75, 0x47, 0xdb]; + +fn find_shortest_sep<C: Comparator>(lo: &[u8], hi: &[u8]) -> Vec<u8> { + let min; + + if lo.len() < hi.len() { + min = lo.len(); + } else { + min = hi.len(); + } + + let mut diff_at = 0; + + while diff_at < min && lo[diff_at] == hi[diff_at] { + diff_at += 1; + } + + if diff_at == min { + return Vec::from(lo); + } else { + if lo[diff_at] < 0xff && lo[diff_at] + 1 < hi[diff_at] { + let mut result = Vec::from(&lo[0..diff_at + 1]); + result[diff_at] += 1; + assert_eq!(C::cmp(&result, hi), Ordering::Less); + return result; + } + return Vec::from(lo); + } +} + +/// Footer is a helper for encoding/decoding a table footer. +pub struct Footer { + pub meta_index: BlockHandle, + pub index: BlockHandle, +} + +impl Footer { + pub fn new(metaix: BlockHandle, index: BlockHandle) -> Footer { + Footer { + meta_index: metaix, + index: index, + } + } + + pub fn decode(from: &[u8]) -> Footer { + assert!(from.len() >= FULL_FOOTER_LENGTH); + assert_eq!(&from[FOOTER_LENGTH..], &MAGIC_FOOTER_ENCODED); + let (meta, metalen) = BlockHandle::decode(&from[0..]); + let (ix, _) = BlockHandle::decode(&from[metalen..]); + + Footer { + meta_index: meta, + index: ix, + } + } + + pub fn encode(&self, to: &mut [u8]) { + assert!(to.len() >= FOOTER_LENGTH + 8); + + let s1 = self.meta_index.encode_to(to); + let s2 = self.index.encode_to(&mut to[s1..]); + + for i in s1 + s2..FOOTER_LENGTH { + to[i] = 0; + } + for i in FOOTER_LENGTH..FULL_FOOTER_LENGTH { + to[i] = MAGIC_FOOTER_ENCODED[i - FOOTER_LENGTH]; + } + } +} + +/// A table consists of DATA BLOCKs, META BLOCKs, a METAINDEX BLOCK, an INDEX BLOCK and a FOOTER. +/// +/// DATA BLOCKs, META BLOCKs, INDEX BLOCK and METAINDEX BLOCK are built using the code in +/// the `block` module. +/// +/// The FOOTER consists of a BlockHandle wthat points to the metaindex block, another pointing to +/// the index block, padding to fill up to 40 B and at the end the 8B magic number +/// 0xdb4775248b80fb57. + +pub struct TableBuilder<'a, C: Comparator, Dst: Write, FilterPol: FilterPolicy> { + o: Options, + cmp: C, + dst: Dst, + + offset: usize, + num_entries: usize, + prev_block_last_key: Vec<u8>, + + data_block: Option<BlockBuilder<C>>, + index_block: Option<BlockBuilder<C>>, + filter_block: Option<FilterBlockBuilder<'a, FilterPol>>, +} + +impl<'a, C: Comparator, Dst: Write> TableBuilder<'a, C, Dst, NoFilterPolicy> { + pub fn new_no_filter(opt: Options, + cmp: C, + dst: Dst) + -> TableBuilder<'a, C, Dst, NoFilterPolicy> { + TableBuilder { + o: opt, + cmp: cmp, + dst: dst, + offset: 0, + prev_block_last_key: vec![], + num_entries: 0, + data_block: Some(BlockBuilder::new(opt, cmp)), + index_block: Some(BlockBuilder::new(opt, cmp)), + filter_block: None, + } + } +} + +impl<'a, C: Comparator, Dst: Write, FilterPol: FilterPolicy> TableBuilder<'a, C, Dst, FilterPol> { + pub fn new(opt: Options, + cmp: C, + dst: Dst, + fpol: FilterPol) + -> TableBuilder<'a, C, Dst, FilterPol> { + TableBuilder { + o: opt, + cmp: cmp, + dst: dst, + offset: 0, + prev_block_last_key: vec![], + num_entries: 0, + data_block: Some(BlockBuilder::new(opt, cmp)), + index_block: Some(BlockBuilder::new(opt, cmp)), + filter_block: Some(FilterBlockBuilder::new(fpol)), + } + } + + + pub fn add(&mut self, key: &'a [u8], val: &[u8]) { + assert!(self.data_block.is_some()); + assert!(self.num_entries == 0 || C::cmp(&self.prev_block_last_key, key) == Ordering::Less); + + + if self.data_block.as_ref().unwrap().size_estimate() > self.o.block_size { + self.write_data_block(key); + } + + let dblock = &mut self.data_block.as_mut().unwrap(); + + if let Some(ref mut fblock) = self.filter_block { + fblock.add_key(key); + } + + self.num_entries += 1; + dblock.add(key, val); + } + + /// Writes an index entry for the current data_block where `next_key` is the first key of the + /// next block. + fn write_data_block(&mut self, next_key: &[u8]) { + assert!(self.data_block.is_some()); + + let block = self.data_block.take().unwrap(); + let sep = find_shortest_sep::<C>(block.last_key(), next_key); + self.prev_block_last_key = Vec::from(block.last_key()); + let contents = block.finish(); + + let handle = BlockHandle::new(self.offset, contents.len()); + let mut handle_enc = [0 as u8; 16]; + let enc_len = handle.encode_to(&mut handle_enc); + + self.index_block.as_mut().unwrap().add(&sep, &handle_enc[0..enc_len]); + self.data_block = Some(BlockBuilder::new(self.o, self.cmp)); + + let ctype = self.o.compression_type; + self.write_block(contents, ctype); + + if let Some(ref mut fblock) = self.filter_block { + fblock.start_block(self.offset); + } + } + + fn write_block(&mut self, c: BlockContents, t: CompressionType) -> BlockHandle { + // compression is still unimplemented + assert_eq!(t, CompressionType::CompressionNone); + + let mut buf = [0 as u8; 4]; + let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); + + digest.write(&c); + digest.write(&[self.o.compression_type as u8; 1]); + digest.sum32().encode_fixed(&mut buf); + + // TODO: Handle errors here. + let _ = self.dst.write(&c); + let _ = self.dst.write(&[t as u8; 1]); + let _ = self.dst.write(&buf); + + self.offset += c.len() + 1 + buf.len(); + + BlockHandle::new(self.offset, c.len()) + } + + pub fn finish(mut self) { + assert!(self.data_block.is_some()); + let ctype = self.o.compression_type; + + // If there's a pending data block, write that one + let flush_last_block = self.data_block.as_ref().unwrap().entries() > 0; + if flush_last_block { + self.write_data_block(&[0xff as u8; 1]); + } + + // Create metaindex block + let mut meta_ix_block = BlockBuilder::new(self.o, self.cmp); + + if self.filter_block.is_some() { + // if there's a filter block, write the filter block and add it to the metaindex block. + let fblock = self.filter_block.take().unwrap(); + let filter_key = format!("filter.{}", fblock.filter_name()); + let fblock_data = fblock.finish(); + let fblock_handle = self.write_block(fblock_data, CompressionType::CompressionNone); + + let mut handle_enc = [0 as u8; 16]; + let enc_len = fblock_handle.encode_to(&mut handle_enc); + + meta_ix_block.add(filter_key.as_bytes(), &handle_enc[0..enc_len]); + } + + // write metaindex block + let meta_ix_handle = self.write_block(meta_ix_block.finish(), ctype); + + // write index block + let index_cont = self.index_block.take().unwrap().finish(); + let ix_handle = self.write_block(index_cont, ctype); + + // write footer. + let footer = Footer::new(meta_ix_handle, ix_handle); + let mut buf = [0; FULL_FOOTER_LENGTH]; + footer.encode(&mut buf); + + self.offset += self.dst.write(&buf[..]).unwrap(); + } +} + +#[cfg(test)] +mod tests { + use super::{find_shortest_sep, Footer, TableBuilder}; + use types::StandardComparator; + use blockhandle::BlockHandle; + use filter::BloomPolicy; + use options::Options; + + #[test] + fn test_shortest_sep() { + assert_eq!(find_shortest_sep::<StandardComparator>("abcd".as_bytes(), "abcf".as_bytes()), + "abce".as_bytes()); + assert_eq!(find_shortest_sep::<StandardComparator>("abcdefghi".as_bytes(), + "abcffghi".as_bytes()), + "abce".as_bytes()); + assert_eq!(find_shortest_sep::<StandardComparator>("a".as_bytes(), "a".as_bytes()), + "a".as_bytes()); + assert_eq!(find_shortest_sep::<StandardComparator>("a".as_bytes(), "b".as_bytes()), + "a".as_bytes()); + assert_eq!(find_shortest_sep::<StandardComparator>("abc".as_bytes(), "zzz".as_bytes()), + "b".as_bytes()); + assert_eq!(find_shortest_sep::<StandardComparator>("".as_bytes(), "".as_bytes()), + "".as_bytes()); + } + + #[test] + fn test_footer() { + let f = Footer::new(BlockHandle::new(44, 4), BlockHandle::new(55, 5)); + let mut buf = [0; 48]; + f.encode(&mut buf[..]); + + let f2 = Footer::decode(&buf); + assert_eq!(f2.meta_index.offset(), 44); + assert_eq!(f2.meta_index.size(), 4); + assert_eq!(f2.index.offset(), 55); + assert_eq!(f2.index.size(), 5); + + } + + #[test] + fn test_table_builder() { + let mut d = Vec::with_capacity(512); + { + let mut opt = Options::default(); + opt.block_restart_interval = 3; + let mut b = TableBuilder::new(opt, StandardComparator, &mut d, BloomPolicy::new(4)); + + let data = vec![("abc", "def"), ("abd", "dee"), ("bcd", "asa"), ("bsr", "a00")]; + + for &(k, v) in data.iter() { + b.add(k.as_bytes(), v.as_bytes()); + } + + b.finish(); + } + println!("{:?}", d); + } +}