// src/table_reader.rs

use crate::block::{Block, BlockIter};
use crate::blockhandle::BlockHandle;
use crate::cache;
use crate::cmp::InternalKeyCmp;
use crate::env::RandomAccess;
use crate::error::{self, err, Result};
use crate::filter;
use crate::filter_block::FilterBlockReader;
use crate::key_types::InternalKey;
use crate::options::Options;
use crate::table_block;
use crate::table_builder::{self, Footer};
use crate::types::{current_key_val, LdbIterator};

use std::cmp::Ordering;
use std::rc::Rc;

use integer_encoding::FixedIntWriter;

/// Reads the table footer.
fn read_footer(f: &dyn RandomAccess, size: usize) -> Result<Footer> {
    let mut buf = vec![0; table_builder::FULL_FOOTER_LENGTH];
    f.read_at(size - table_builder::FULL_FOOTER_LENGTH, &mut buf)?;
    match Footer::decode(&buf) {
        Some(ok) => Ok(ok),
        None => err(
            error::StatusCode::Corruption,
            &format!("Couldn't decode damaged footer {:?}", &buf),
        ),
    }
}

#[derive(Clone)]
pub struct Table {
    file: Rc<Box<dyn RandomAccess>>,
    file_size: usize,
    cache_id: cache::CacheID,

    opt: Options,

    footer: Footer,
    indexblock: Block,
    filters: Option<FilterBlockReader>,
}

impl Table {
    /// Creates a new table reader operating on unformatted keys (i.e., UserKey).
    fn new_raw(opt: Options, file: Rc<Box<dyn RandomAccess>>, size: usize) -> Result<Table> {
        let footer = read_footer(file.as_ref().as_ref(), size)?;
        let indexblock =
            table_block::read_table_block(opt.clone(), file.as_ref().as_ref(), &footer.index)?;
        let metaindexblock =
            table_block::read_table_block(opt.clone(), file.as_ref().as_ref(), &footer.meta_index)?;

        let filter_block_reader =
            Table::read_filter_block(&metaindexblock, file.as_ref().as_ref(), &opt)?;
        let cache_id = opt.block_cache.borrow_mut().new_cache_id();

        Ok(Table {
            file,
            file_size: size,
            cache_id,
            opt,
            footer,
            filters: filter_block_reader,
            indexblock,
        })
    }

    fn read_filter_block(
        metaix: &Block,
        file: &dyn RandomAccess,
        options: &Options,
    ) -> Result<Option<FilterBlockReader>> {
        // Open filter block for reading
        let filter_name = format!("filter.{}", options.filter_policy.name())
            .as_bytes()
            .to_vec();

        let mut metaindexiter = metaix.iter();
        metaindexiter.seek(&filter_name);

        if let Some((_key, val)) = current_key_val(&metaindexiter) {
            let fbl = BlockHandle::decode(&val);
            let filter_block_location = match fbl {
                None => {
                    return err(
                        error::StatusCode::Corruption,
                        &format!("Couldn't decode corrupt blockhandle {:?}", &val),
                    )
                }
                Some(ok) => ok.0,
            };
            if filter_block_location.size() > 0 {
                return Ok(Some(table_block::read_filter_block(
                    file,
                    &filter_block_location,
                    options.filter_policy.clone(),
                )?));
            }
        }
        Ok(None)
    }

    /// Creates a new table reader operating on internal keys (i.e., InternalKey). This means that
    /// a different comparator (InternalKeyCmp) and a different filter policy
    /// (InternalFilterPolicy) are used.
    pub fn new(mut opt: Options, file: Rc<Box<dyn RandomAccess>>, size: usize) -> Result<Table> {
        opt.cmp = Rc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
        opt.filter_policy = Rc::new(Box::new(filter::InternalFilterPolicy::new(
            opt.filter_policy,
        )));
        Table::new_raw(opt, file, size)
    }

    /// block_cache_handle creates a CacheKey for a block with a given offset to be used in the
    /// block cache.
    fn block_cache_handle(&self, block_off: usize) -> cache::CacheKey {
        let mut dst = [0; 2 * 8];
        (&mut dst[..8])
            .write_fixedint(self.cache_id)
            .expect("error writing to vec");
        (&mut dst[8..])
            .write_fixedint(block_off as u64)
            .expect("error writing to vec");
        dst
    }
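
    // Key layout sketch (assuming the little-endian fixed-width encoding of the
    // integer-encoding crate): for cache_id == 1 and block_off == 256, the key would be
    // [1, 0, 0, 0, 0, 0, 0, 0,  0, 1, 0, 0, 0, 0, 0, 0].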

    /// Read a block from the current table at `location`, and cache it in the options' block
    /// cache.
    fn read_block(&self, location: &BlockHandle) -> Result<Block> {
        let cachekey = self.block_cache_handle(location.offset());
        if let Some(block) = self.opt.block_cache.borrow_mut().get(&cachekey) {
            return Ok(block.clone());
        }

        // Two times as_ref(): First time to get a ref from Rc<>, then one from Box<>.
        let b =
            table_block::read_table_block(self.opt.clone(), self.file.as_ref().as_ref(), location)?;

        // insert a cheap copy (Rc).
        self.opt
            .block_cache
            .borrow_mut()
            .insert(&cachekey, b.clone());

        Ok(b)
    }

    /// Returns the approximate offset of the block that may contain `key`; for a key past the
    /// last entry, this is the offset of the metaindex block.
    pub fn approx_offset_of(&self, key: &[u8]) -> usize {
        let mut iter = self.indexblock.iter();

        iter.seek(key);

        if let Some((_, val)) = current_key_val(&iter) {
            let location = BlockHandle::decode(&val).unwrap().0;
            return location.offset();
        }

        self.footer.meta_index.offset()
    }

    /// Returns an iterator over this table. The iterator holds a cheap clone of the table (the
    /// file handle is reference-counted), so several iterators can exist independently.
    pub fn iter(&self) -> TableIterator {
        TableIterator {
            current_block: None,
            current_block_off: 0,
            index_block: self.indexblock.iter(),
            table: self.clone(),
        }
    }

    /// Retrieve the entry with the smallest key greater than or equal to `key` from the table.
    /// This function uses the attached filters, so it is well suited for frequent lookups of
    /// non-existing values: it can detect the non-existence of an entry in a block without having
    /// to load the block.
    ///
    /// The caller must check whether the returned key, which is the raw key (not e.g. the user
    /// portion of an InternalKey), is acceptable (e.g. has the correct value type or sequence
    /// number), as it may not be an exact match for `key`.
    ///
    /// This is done this way because some key types, like internal keys, will not result in an
    /// exact match; whether a match is acceptable depends on comparators other than the one the
    /// table reader knows about.
    pub fn get<'a>(&self, key: InternalKey<'a>) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
        let mut index_iter = self.indexblock.iter();
        index_iter.seek(key);

        let handle;
        if let Some((last_in_block, h)) = current_key_val(&index_iter) {
            if self.opt.cmp.cmp(key, &last_in_block) <= Ordering::Equal {
                handle = BlockHandle::decode(&h).unwrap().0;
            } else {
                return Ok(None);
            }
        } else {
            return Ok(None);
        }

        // found correct block.

        // Check bloom (or whatever) filter
        if let Some(ref filters) = self.filters {
            if !filters.key_may_match(handle.offset(), key) {
                return Ok(None);
            }
        }

        // Read block (potentially from cache)
        let tb = self.read_block(&handle)?;
        let mut iter = tb.iter();

        // Go to entry and check if it's the wanted entry.
        iter.seek(key);
        if let Some((k, v)) = current_key_val(&iter) {
            if self.opt.cmp.cmp(&k, key) >= Ordering::Equal {
                return Ok(Some((k, v)));
            }
        }
        Ok(None)
    }
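
    // Usage sketch for get() (hedged; mirrors test_table_internal_keys below, which relies on the
    // RandomAccess impl for Vec<u8>):
    //
    //     let table = Table::new(opt, file, size)?;
    //     if let Some((k, _v)) = table.get(LookupKey::new(b"abc", seq).internal_key())? {
    //         // `k` is a raw internal key; verify its user-key portion and sequence number
    //         // before treating it as a match for b"abc".
    //     }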
}

/// This iterator is a "TwoLevelIterator"; it uses an index block in order to get an offset hint
/// into the data blocks.
pub struct TableIterator {
    // A TableIterator is independent of its table (on the syntax level -- it doesn't know its
    // Table's lifetime). This is mainly required by the dynamic iterators used everywhere, where a
    // lifetime makes things like returning an iterator from a function nigh-impossible.
    //
    // Instead, reference-counted pointers and locks inside the Table ensure that all
    // TableIterators still share a table.
    table: Table,
    current_block: Option<BlockIter>,
    current_block_off: usize,
    index_block: BlockIter,
}
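
// Usage sketch (mirrors the tests below): TableIterator implements LdbIterator, whose default
// next() method yields owned key/value pairs.
//
//     let mut it = table.iter();
//     while let Some((k, v)) = it.next() {
//         // k and v are Vec<u8> copies of the current entry.
//     }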

impl TableIterator {
    // Skips to the entry referenced by the next entry in the index block.
    // This is called once a block has run out of entries.
    // Err means corruption or I/O error; Ok(true) means a new block was loaded; Ok(false) means
    // that there are no more entries.
    fn skip_to_next_entry(&mut self) -> Result<bool> {
        if let Some((_key, val)) = self.index_block.next() {
            self.load_block(&val).map(|_| true)
        } else {
            Ok(false)
        }
    }

    // Load the block at `handle` into `self.current_block`
    fn load_block(&mut self, handle: &[u8]) -> Result<()> {
        let (new_block_handle, _) = match BlockHandle::decode(handle) {
            None => {
                return err(
                    error::StatusCode::Corruption,
                    "Couldn't decode corrupt block handle",
                )
            }
            Some(ok) => ok,
        };
        let block = self.table.read_block(&new_block_handle)?;

        self.current_block = Some(block.iter());
        self.current_block_off = new_block_handle.offset();

        Ok(())
    }
}

impl LdbIterator for TableIterator {
    fn advance(&mut self) -> bool {
        // Uninitialized case.
        if self.current_block.is_none() {
            match self.skip_to_next_entry() {
                Ok(true) => return self.advance(),
                Ok(false) => {
                    self.reset();
                    return false;
                }
                // On error (possibly a corrupt block), try the next block from the index.
                Err(_) => return self.advance(),
            }
        }

        // Initialized case -- does the current block have more entries?
        if let Some(ref mut cb) = self.current_block {
            if cb.advance() {
                return true;
            }
        }

        // If the current block is exhausted, try loading the next block.
        self.current_block = None;
        match self.skip_to_next_entry() {
            Ok(true) => self.advance(),
            Ok(false) => {
                self.reset();
                false
            }
            // On error (possibly a corrupt block), try the next one.
            Err(_) => self.advance(),
        }
    }

    // A call to valid() after seeking is necessary to ensure that the seek worked (e.g., no error
    // while reading from disk)
    fn seek(&mut self, to: &[u8]) {
        // Seek in the index block first; this positions us at the first index entry whose key is
        // >= `to`. Then load the referenced data block and seek within it.
        self.index_block.seek(to);

        // It's possible that this is a seek past-last; reset in that case.
        if let Some((past_block, handle)) = current_key_val(&self.index_block) {
            if self.table.opt.cmp.cmp(to, &past_block) <= Ordering::Equal {
                // ok, found right block: continue
                if let Ok(()) = self.load_block(&handle) {
                    // current_block is always set if load_block() returned Ok.
                    self.current_block.as_mut().unwrap().seek(to);
                    return;
                }
            }
        }
        // Reached in case of failure.
        self.reset();
    }
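
    // Sketch of the post-seek check described above:
    //
    //     it.seek(b"bcd");
    //     if it.valid() {
    //         // positioned at the first entry with key >= b"bcd"
    //     } else {
    //         // the seek failed (I/O error) or went past the last entry
    //     }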

    fn prev(&mut self) -> bool {
        // happy path: current block contains previous entry
        if let Some(ref mut cb) = self.current_block {
            if cb.prev() {
                return true;
            }
        }

        // Go back one block and look for the last entry in the previous block
        if self.index_block.prev() {
            if let Some((_, handle)) = current_key_val(&self.index_block) {
                if self.load_block(&handle).is_ok() {
                    self.current_block.as_mut().unwrap().seek_to_last();
                    self.current_block.as_ref().unwrap().valid()
                } else {
                    self.reset();
                    false
                }
            } else {
                false
            }
        } else {
            false
        }
    }

    fn reset(&mut self) {
        self.index_block.reset();
        self.current_block = None;
    }

    // This iterator can be queried before the first call to advance(); valid() then correctly
    // returns false, because no block has been loaded yet.
    fn valid(&self) -> bool {
        self.current_block.is_some() && (self.current_block.as_ref().unwrap().valid())
    }

    fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool {
        if let Some(ref cb) = self.current_block {
            cb.current(key, val)
        } else {
            false
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::compressor::CompressorId;
    use crate::filter::BloomPolicy;
    use crate::key_types::LookupKey;
    use crate::table_builder::TableBuilder;
    use crate::test_util::{test_iterator_properties, LdbIteratorIter};
    use crate::types::{current_key_val, LdbIterator};
    use crate::{compressor, options};

    use super::*;

    fn build_data() -> Vec<(&'static str, &'static str)> {
        vec![
            // block 1
            ("abc", "def"),
            ("abd", "dee"),
            ("bcd", "asa"),
            // block 2
            ("bsr", "a00"),
            ("xyz", "xxx"),
            ("xzz", "yyy"),
            // block 3
            ("zzz", "111"),
        ]
    }

    // Build a table containing raw keys (no format). It returns (vector, length) for convenience;
    // a call like f(v, v.len()) doesn't work for borrowing reasons.
    fn build_table(data: Vec<(&'static str, &'static str)>) -> (Vec<u8>, usize) {
        let mut d = Vec::with_capacity(512);
        let mut opt = options::for_test();
        opt.block_restart_interval = 2;
        opt.block_size = 32;
        opt.compressor = compressor::SnappyCompressor::ID;

        {
            // Uses the standard comparator in opt.
            let mut b = TableBuilder::new_raw(opt, &mut d);

            for &(k, v) in data.iter() {
                b.add(k.as_bytes(), v.as_bytes()).unwrap();
            }

            b.finish().unwrap();
        }

        let size = d.len();
        (d, size)
    }

    // Build a table containing keys in InternalKey format.
    fn build_internal_table() -> (Vec<u8>, usize) {
        let mut d = Vec::with_capacity(512);
        let mut opt = options::for_test();
        opt.block_restart_interval = 1;
        opt.block_size = 32;
        opt.filter_policy = Rc::new(Box::new(BloomPolicy::new(4)));

        let mut i = 1u64;
        let data: Vec<(Vec<u8>, &'static str)> = build_data()
            .into_iter()
            .map(|(k, v)| {
                i += 1;
                (LookupKey::new(k.as_bytes(), i).internal_key().to_vec(), v)
            })
            .collect();

        {
            // Uses InternalKeyCmp
            let mut b = TableBuilder::new(opt, &mut d);

            for &(ref k, ref v) in data.iter() {
                b.add(k.as_slice(), v.as_bytes()).unwrap();
            }

            b.finish().unwrap();
        }

        let size = d.len();

        (d, size)
    }

    fn wrap_buffer(src: Vec<u8>) -> Rc<Box<dyn RandomAccess>> {
        Rc::new(Box::new(src))
    }

    #[test]
    fn test_table_approximate_offset() {
        let (src, size) = build_table(build_data());
        let mut opt = options::for_test();
        opt.block_size = 32;
        let table = Table::new_raw(opt, wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        let expected_offsets = vec![0, 0, 0, 44, 44, 44, 89];
        let mut i = 0;
        for (k, _) in LdbIteratorIter::wrap(&mut iter) {
            assert_eq!(expected_offsets[i], table.approx_offset_of(&k));
            i += 1;
        }

        // Key-past-last returns offset of metaindex block.
        assert_eq!(137, table.approx_offset_of("{aa".as_bytes()));
    }

    #[test]
    fn test_table_block_cache_use() {
        let (src, size) = build_table(build_data());
        let mut opt = options::for_test();
        opt.block_size = 32;

        let table = Table::new_raw(opt.clone(), wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        // index/metaindex blocks are not cached. That'd be a waste of memory.
        assert_eq!(opt.block_cache.borrow().count(), 0);
        iter.next();
        assert_eq!(opt.block_cache.borrow().count(), 1);
        // This may fail if block parameters or data change. In that case, adapt it.
        iter.next();
        iter.next();
        iter.next();
        iter.next();
        assert_eq!(opt.block_cache.borrow().count(), 2);
    }

    #[test]
    fn test_table_iterator_fwd_bwd() {
        let (src, size) = build_table(build_data());
        let data = build_data();

        let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();
        let mut i = 0;

        while let Some((k, v)) = iter.next() {
            assert_eq!(
                (data[i].0.as_bytes(), data[i].1.as_bytes()),
                (k.as_ref(), v.as_ref())
            );
            i += 1;
        }

        assert_eq!(i, data.len());
        assert!(!iter.valid());

        // Go forward again, to last entry.
        while let Some((key, _)) = iter.next() {
            if key.as_slice() == b"zzz" {
                break;
            }
        }

        assert!(iter.valid());
        // backwards count
        let mut j = 0;

        while iter.prev() {
            if let Some((k, v)) = current_key_val(&iter) {
                j += 1;
                assert_eq!(
                    (
                        data[data.len() - 1 - j].0.as_bytes(),
                        data[data.len() - 1 - j].1.as_bytes()
                    ),
                    (k.as_ref(), v.as_ref())
                );
            } else {
                break;
            }
        }

        // Expect 7 - 1 = 6: the iterator stopped on the last of the 7 entries, so prev() steps
        // back over the 6 entries before it.
        assert_eq!(j, 6);
    }

    #[test]
    fn test_table_iterator_filter() {
        let (src, size) = build_table(build_data());

        let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap();
        assert!(table.filters.is_some());
        let filter_reader = table.filters.clone().unwrap();
        let mut iter = table.iter();

        loop {
            if let Some((k, _)) = iter.next() {
                assert!(filter_reader.key_may_match(iter.current_block_off, &k));
                assert!(!filter_reader.key_may_match(iter.current_block_off, b"somerandomkey"));
            } else {
                break;
            }
        }
    }

    #[test]
    fn test_table_iterator_state_behavior() {
        let (src, size) = build_table(build_data());

        let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        // behavior test

        // See comment on valid()
        assert!(!iter.valid());
        assert!(current_key_val(&iter).is_none());
        assert!(!iter.prev());

        assert!(iter.advance());
        let first = current_key_val(&iter);
        assert!(iter.valid());
        assert!(current_key_val(&iter).is_some());

        assert!(iter.advance());
        assert!(iter.prev());
        assert!(iter.valid());

        iter.reset();
        assert!(!iter.valid());
        assert!(current_key_val(&iter).is_none());
        assert_eq!(first, iter.next());
    }

    #[test]
    fn test_table_iterator_behavior_standard() {
        let mut data = build_data();
        data.truncate(4);
        let (src, size) = build_table(data);
        let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap();
        test_iterator_properties(table.iter());
    }

    #[test]
    fn test_table_iterator_values() {
        let (src, size) = build_table(build_data());
        let data = build_data();

        let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();
        let mut i = 0;

        iter.next();
        iter.next();

        // Go back to previous entry, check, go forward two entries, repeat
        // Verifies that prev/next works well.
        loop {
            iter.prev();

            if let Some((k, v)) = current_key_val(&iter) {
                assert_eq!(
                    (data[i].0.as_bytes(), data[i].1.as_bytes()),
                    (k.as_ref(), v.as_ref())
                );
            } else {
                break;
            }

            i += 1;
            if iter.next().is_none() || iter.next().is_none() {
                break;
            }
        }

        // Skipping the last value because the second next() above will break the loop
        assert_eq!(i, 6);
    }

    #[test]
    fn test_table_iterator_seek() {
        let (src, size) = build_table(build_data());

        let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        iter.seek(b"bcd");
        assert!(iter.valid());
        assert_eq!(
            current_key_val(&iter),
            Some((b"bcd".to_vec(), b"asa".to_vec()))
        );
        iter.seek(b"abc");
        assert!(iter.valid());
        assert_eq!(
            current_key_val(&iter),
            Some((b"abc".to_vec(), b"def".to_vec()))
        );

        // Seek-past-last invalidates.
        iter.seek("{{{".as_bytes());
        assert!(!iter.valid());
        iter.seek(b"bbb");
        assert!(iter.valid());
    }

    #[test]
    fn test_table_get() {
        let (src, size) = build_table(build_data());

        let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap();
        let table2 = table.clone();

        let mut _iter = table.iter();
        // Test that all of the table's entries are reachable via get()
        for (k, v) in LdbIteratorIter::wrap(&mut _iter) {
            let r = table2.get(&k);
            assert_eq!(Ok(Some((k, v))), r);
        }

        assert_eq!(table.opt.block_cache.borrow().count(), 3);

        // Test that the filters work: lookups of absent keys return nothing at all.
        assert!(table.get(b"aaa").unwrap().is_none());
        assert!(table.get(b"aaaa").unwrap().is_none());
        assert!(table.get(b"aa").unwrap().is_none());
        assert!(table.get(b"abcd").unwrap().is_none());
        assert!(table.get(b"abb").unwrap().is_none());
        assert!(table.get(b"zzy").unwrap().is_none());
        assert!(table.get(b"zz1").unwrap().is_none());
        assert!(table.get("zz{".as_bytes()).unwrap().is_none());
    }

    // This test verifies that the table and filters work with internal keys. This means:
    // The table contains keys in InternalKey format and it uses a filter wrapped by
    // InternalFilterPolicy.
    // All the other tests use raw keys that don't have any internal structure; this is fine in
    // general, but here we want to see that the other infrastructure works too.
    #[test]
    fn test_table_internal_keys() {
        use crate::key_types::LookupKey;

        let (src, size) = build_internal_table();

        let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap();
        let filter_reader = table.filters.clone().unwrap();

        // Check that we're actually using internal keys
        let mut _iter = table.iter();
        for (ref k, ref v) in LdbIteratorIter::wrap(&mut _iter) {
            assert_eq!(k.len(), 3 + 8);
            assert_eq!((k.to_vec(), v.to_vec()), table.get(k).unwrap().unwrap());
        }

        assert!(table
            .get(LookupKey::new(b"abc", 1000).internal_key())
            .unwrap()
            .is_some());

        let mut iter = table.iter();

        loop {
            if let Some((k, _)) = iter.next() {
                let lk = LookupKey::new(&k, 123);
                let userkey = lk.user_key();

                assert!(filter_reader.key_may_match(iter.current_block_off, userkey));
                assert!(!filter_reader.key_may_match(iter.current_block_off, b"somerandomkey"));
            } else {
                break;
            }
        }
    }

    #[test]
    fn test_table_reader_checksum() {
        let (mut src, size) = build_table(build_data());

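        // Corrupt a byte in the first data block so its checksum no longer matches; the reader
        // should then skip that block.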
        src[10] += 1;

        let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap();

        assert!(table.filters.is_some());
        assert_eq!(table.filters.as_ref().unwrap().num(), 1);

        {
            let mut _iter = table.iter();
            let iter = LdbIteratorIter::wrap(&mut _iter);
            // first block is skipped
            assert_eq!(iter.count(), 4);
        }

        {
            let mut _iter = table.iter();
            let iter = LdbIteratorIter::wrap(&mut _iter);

            for (k, _) in iter {
                if k == build_data()[5].0.as_bytes() {
                    return;
                }
            }

            panic!("Should have hit 5th record in table!");
        }
    }
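
    // A hedged sketch verifying the cache key layout assumed by block_cache_handle(): 8 bytes of
    // little-endian cache id followed by 8 bytes of little-endian block offset. The cache id
    // varies per table, so only the offset half is checked here.
    #[test]
    fn test_block_cache_handle_layout() {
        let (src, size) = build_table(build_data());
        let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap();
        let key = table.block_cache_handle(256);
        // 256 == 0x0100, encoded little-endian.
        assert_eq!(&key[8..], &[0, 1, 0, 0, 0, 0, 0, 0][..]);
    }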
}