view src/table_builder.rs @ 341:5a266a55f459

memtable: General clean-up, improvement in seek code
author Lewin Bormann <lbo@spheniscida.de>
date Wed, 04 Oct 2017 19:59:43 +0200
parents 26d6f6e1da7d
children 24a2501732de
line wrap: on
line source

use block::{BlockBuilder, BlockContents};
use blockhandle::BlockHandle;
use cmp::InternalKeyCmp;
use error::Result;
use filter::{InternalFilterPolicy, NoFilterPolicy};
use filter_block::FilterBlockBuilder;
use key_types::InternalKey;
use options::{CompressionType, Options};

use std::cmp::Ordering;
use std::io::Write;
use std::rc::Rc;

use crc::crc32;
use crc::Hasher32;
use integer_encoding::FixedInt;

/// Size of the footer's handle area: the metaindex and index block handles plus zero padding.
pub const FOOTER_LENGTH: usize = 40;
/// Size of the complete footer: the handle area followed by the 8-byte magic number.
pub const FULL_FOOTER_LENGTH: usize = FOOTER_LENGTH + 8;
/// Magic number stored at the very end of every table footer.
pub const MAGIC_FOOTER_NUMBER: u64 = 0xdb47_7524_8b80_fb57;
/// `MAGIC_FOOTER_NUMBER` in little-endian byte order, exactly as it is written into the footer.
pub const MAGIC_FOOTER_ENCODED: [u8; 8] = [0x57, 0xfb, 0x80, 0x8b, 0x24, 0x75, 0x47, 0xdb];

/// Size of the compression-type byte appended after every block.
pub const TABLE_BLOCK_COMPRESS_LEN: usize = 1;
/// Size of the CRC32 checksum appended after every block's compression-type byte.
pub const TABLE_BLOCK_CKSUM_LEN: usize = 4;

/// Footer is a helper for encoding/decoding a table footer.
#[derive(Debug, Clone)]
pub struct Footer {
    /// Handle (offset and size) of the metaindex block.
    pub meta_index: BlockHandle,
    /// Handle (offset and size) of the index block.
    pub index: BlockHandle,
}

/// A table footer contains a pointer to the metaindex block, another pointer to the index block,
/// and a magic number. The encoded layout is:
///
/// [ { METAINDEX blockhandle, INDEX blockhandle, zero PADDING } = FOOTER_LENGTH (40) bytes,
///   MAGIC_FOOTER_ENCODED (8 bytes) ] = FULL_FOOTER_LENGTH (48) bytes
impl Footer {
    /// Creates a footer from the metaindex block handle and the index block handle.
    pub fn new(metaix: BlockHandle, index: BlockHandle) -> Footer {
        Footer {
            meta_index: metaix,
            index: index,
        }
    }

    /// Decodes a footer from the first `FULL_FOOTER_LENGTH` bytes of `from`.
    ///
    /// Bytes beyond `FULL_FOOTER_LENGTH` are ignored.
    ///
    /// # Panics
    /// Panics if `from` is shorter than `FULL_FOOTER_LENGTH`, or if the bytes at
    /// `FOOTER_LENGTH..FULL_FOOTER_LENGTH` are not `MAGIC_FOOTER_ENCODED`.
    pub fn decode(from: &[u8]) -> Footer {
        assert!(from.len() >= FULL_FOOTER_LENGTH);
        // Compare exactly the 8 magic bytes. Slicing to the end of `from` would make every
        // input longer than FULL_FOOTER_LENGTH fail, although the length check accepts it.
        assert_eq!(&from[FOOTER_LENGTH..FULL_FOOTER_LENGTH], &MAGIC_FOOTER_ENCODED[..]);
        let (meta, metalen) = BlockHandle::decode(&from[0..]);
        let (ix, _) = BlockHandle::decode(&from[metalen..]);

        Footer {
            meta_index: meta,
            index: ix,
        }
    }

    /// Encodes the footer into the first `FULL_FOOTER_LENGTH` bytes of `to`: both handles,
    /// zero padding up to `FOOTER_LENGTH`, then `MAGIC_FOOTER_ENCODED`.
    ///
    /// # Panics
    /// Panics if `to` is shorter than `FULL_FOOTER_LENGTH`.
    pub fn encode(&self, to: &mut [u8]) {
        assert!(to.len() >= FULL_FOOTER_LENGTH);

        let s1 = self.meta_index.encode_to(to);
        let s2 = self.index.encode_to(&mut to[s1..]);

        // Zero the padding between the encoded handles and the magic number.
        for b in to[..FOOTER_LENGTH].iter_mut().skip(s1 + s2) {
            *b = 0;
        }
        to[FOOTER_LENGTH..FULL_FOOTER_LENGTH].copy_from_slice(&MAGIC_FOOTER_ENCODED);
    }
}

/// A table consists of DATA BLOCKs, META BLOCKs, a METAINDEX BLOCK, an INDEX BLOCK and a FOOTER.
///
/// DATA BLOCKs, META BLOCKs, INDEX BLOCK and METAINDEX BLOCK are built using the code in
/// the `block` module.
///
/// The FOOTER consists of a BlockHandle that points to the metaindex block, another pointing to
/// the index block, padding to fill up to 40 bytes, and finally the 8-byte magic number
/// 0xdb4775248b80fb57 (stored little-endian).

pub struct TableBuilder<Dst: Write> {
    /// Options for this table (comparator, filter policy, block size, compression type).
    opt: Options,
    /// Destination that the encoded table bytes are written to.
    dst: Dst,

    /// Data block currently being filled; `None` only while it is being written out.
    data_block: Option<BlockBuilder>,
    /// Index block, receiving one entry per written data block; taken by `finish`.
    index_block: Option<BlockBuilder>,
    /// Filter block builder; taken by `finish`.
    filter_block: Option<FilterBlockBuilder>,

    /// Number of bytes written to `dst` so far.
    offset: usize,
    /// Number of key/value pairs added so far.
    num_entries: usize,
    /// Last key of the most recently written data block (empty before the first one).
    prev_block_last_key: Vec<u8>,
}

impl<Dst: Write> TableBuilder<Dst> {
    /// Creates a table builder like `new()`, but with the filter policy replaced by a
    /// `NoFilterPolicy` before the usual wrapping is applied.
    pub fn new_no_filter(opt: Options, dst: Dst) -> TableBuilder<Dst> {
        let mut options = opt;
        options.filter_policy = Rc::new(Box::new(NoFilterPolicy::new()));
        Self::new(options, dst)
    }
}

/// TableBuilder is used for building a new SSTable. It groups entries into blocks,
/// calculating checksums and bloom filters.
impl<Dst: Write> TableBuilder<Dst> {
    /// Create a new table builder.
    /// The comparator in opt will be wrapped in a InternalKeyCmp, and the filter policy
    /// in an InternalFilterPolicy.
    pub fn new(mut opt: Options, dst: Dst) -> TableBuilder<Dst> {
        opt.cmp = Rc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
        opt.filter_policy = Rc::new(Box::new(InternalFilterPolicy::new(opt.filter_policy)));
        TableBuilder::new_raw(opt, dst)
    }

    /// Like new(), but doesn't wrap the comparator in an InternalKeyCmp (for testing)
    pub fn new_raw(opt: Options, dst: Dst) -> TableBuilder<Dst> {
        TableBuilder {
            opt: opt.clone(),
            dst: dst,
            offset: 0,
            prev_block_last_key: vec![],
            num_entries: 0,
            data_block: Some(BlockBuilder::new(opt.clone())),
            filter_block: Some(FilterBlockBuilder::new(opt.filter_policy.clone())),
            index_block: Some(BlockBuilder::new(opt)),
        }
    }

    pub fn entries(&self) -> usize {
        self.num_entries
    }

    pub fn size_estimate(&self) -> usize {
        let mut size = 0;
        if let Some(ref b) = self.data_block {
            size += b.size_estimate();
        }
        if let Some(ref b) = self.index_block {
            size += b.size_estimate();
        }
        if let Some(ref b) = self.filter_block {
            size += b.size_estimate();
        }
        size + self.offset + FULL_FOOTER_LENGTH
    }

    /// Add a key to the table. The key as to be lexically greater or equal to the last one added.
    pub fn add<'a>(&mut self, key: InternalKey<'a>, val: &[u8]) -> Result<()> {
        assert!(self.data_block.is_some());

        if !self.prev_block_last_key.is_empty() {
            assert!(self.opt.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less);
        }

        if self.data_block.as_ref().unwrap().size_estimate() > self.opt.block_size {
            self.write_data_block(key)?;
        }

        let dblock = &mut self.data_block.as_mut().unwrap();

        if let Some(ref mut fblock) = self.filter_block {
            fblock.add_key(key);
        }

        self.num_entries += 1;
        dblock.add(key, val);
        Ok(())
    }

    /// Writes an index entry for the current data_block where `next_key` is the first key of the
    /// next block.
    /// Calls write_block() for writing the block to disk.
    fn write_data_block<'b>(&mut self, next_key: InternalKey<'b>) -> Result<()> {
        assert!(self.data_block.is_some());

        let block = self.data_block.take().unwrap();
        let sep = self.opt.cmp.find_shortest_sep(&block.last_key(), next_key);
        self.prev_block_last_key = Vec::from(block.last_key());
        let contents = block.finish();

        let handle = BlockHandle::new(self.offset, contents.len());
        let mut handle_enc = [0 as u8; 16];
        let enc_len = handle.encode_to(&mut handle_enc);

        self.index_block.as_mut().unwrap().add(&sep, &handle_enc[0..enc_len]);
        self.data_block = Some(BlockBuilder::new(self.opt.clone()));

        let ctype = self.opt.compression_type;
        self.write_block(contents, ctype)?;

        if let Some(ref mut fblock) = self.filter_block {
            fblock.start_block(self.offset);
        }

        Ok(())
    }

    /// Calculates the checksum, writes the block to disk and updates the offset.
    fn write_block(&mut self, block: BlockContents, t: CompressionType) -> Result<BlockHandle> {
        // compression is still unimplemented
        assert_eq!(t, CompressionType::CompressionNone);

        let mut buf = [0 as u8; TABLE_BLOCK_CKSUM_LEN];
        let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);

        digest.write(&block);
        digest.write(&[self.opt.compression_type as u8; TABLE_BLOCK_COMPRESS_LEN]);
        digest.sum32().encode_fixed(&mut buf);

        self.dst.write(&block)?;
        self.dst.write(&[t as u8; TABLE_BLOCK_COMPRESS_LEN])?;
        self.dst.write(&buf)?;

        let handle = BlockHandle::new(self.offset, block.len());
        self.offset += block.len() + TABLE_BLOCK_COMPRESS_LEN + TABLE_BLOCK_CKSUM_LEN;

        Ok(handle)
    }

    pub fn finish(mut self) -> Result<usize> {
        assert!(self.data_block.is_some());
        let ctype = self.opt.compression_type;

        // If there's a pending data block, write it
        if self.data_block.as_ref().unwrap().entries() > 0 {
            // Find a key reliably past the last key
            let key_past_last =
                self.opt.cmp.find_short_succ(self.data_block.as_ref().unwrap().last_key());
            self.write_data_block(&key_past_last)?;
        }

        // Create metaindex block
        let mut meta_ix_block = BlockBuilder::new(self.opt.clone());

        if self.filter_block.is_some() {
            // if there's a filter block, write the filter block and add it to the metaindex block.
            let fblock = self.filter_block.take().unwrap();
            let filter_key = format!("filter.{}", fblock.filter_name());
            let fblock_data = fblock.finish();
            let fblock_handle = self.write_block(fblock_data, CompressionType::CompressionNone)?;

            let mut handle_enc = [0 as u8; 16];
            let enc_len = fblock_handle.encode_to(&mut handle_enc);

            meta_ix_block.add(filter_key.as_bytes(), &handle_enc[0..enc_len]);
        }

        // write metaindex block
        let meta_ix = meta_ix_block.finish();
        let meta_ix_handle = self.write_block(meta_ix, ctype)?;

        // write index block
        let index_cont = self.index_block.take().unwrap().finish();
        let ix_handle = self.write_block(index_cont, ctype)?;

        // write footer.
        let footer = Footer::new(meta_ix_handle, ix_handle);
        let mut buf = [0; FULL_FOOTER_LENGTH];
        footer.encode(&mut buf);

        self.offset += self.dst.write(&buf[..])?;
        self.dst.flush()?;
        Ok(self.offset)
    }
}

#[cfg(test)]
mod tests {
    use super::{Footer, TableBuilder};
    use blockhandle::BlockHandle;
    use options;

    #[test]
    fn test_footer() {
        let footer = Footer::new(BlockHandle::new(44, 4), BlockHandle::new(55, 5));
        let mut encoded = [0; 48];
        footer.encode(&mut encoded[..]);

        let decoded = Footer::decode(&encoded);
        assert_eq!(decoded.meta_index.offset(), 44);
        assert_eq!(decoded.meta_index.size(), 4);
        assert_eq!(decoded.index.offset(), 55);
        assert_eq!(decoded.index.size(), 5);
    }

    #[test]
    fn test_table_builder() {
        let mut out = Vec::with_capacity(512);
        let mut opt = options::for_test();
        opt.block_restart_interval = 3;
        let mut builder = TableBuilder::new_raw(opt, &mut out);

        let data = vec![("abc", "def"), ("abe", "dee"), ("bcd", "asa"), ("dcc", "a00")];
        let data2 = vec![("abd", "def"), ("abf", "dee"), ("ccd", "asa"), ("dcd", "a00")];

        // Alternate between the two lists; the interleaved sequence is in sorted order.
        for (first, second) in data.iter().zip(data2.iter()) {
            builder.add(first.0.as_bytes(), first.1.as_bytes()).unwrap();
            builder.add(second.0.as_bytes(), second.1.as_bytes()).unwrap();
        }

        assert_eq!(143, builder.size_estimate());
        assert!(builder.filter_block.is_some());

        let written = builder.finish().unwrap();
        assert_eq!(233, written);
    }

    #[test]
    #[should_panic]
    fn test_bad_input() {
        let mut out = Vec::with_capacity(512);
        let mut opt = options::for_test();
        opt.block_restart_interval = 3;
        let mut builder = TableBuilder::new_raw(opt, &mut out);

        // Two equal consecutive keys must be rejected.
        let data = vec![("abc", "def"), ("abc", "dee"), ("bcd", "asa"), ("bsr", "a00")];

        for (k, v) in data {
            builder.add(k.as_bytes(), v.as_bytes()).unwrap();
        }
        builder.finish().unwrap();
    }
}