Mercurial > lbo > hg > leveldb-rs
changeset 149:11f887625190
Implement cache for TableBlocks.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 29 Jan 2017 15:26:56 +0100 |
parents | 08f5579b8598 |
children | 01e070c97ee7 |
files | src/block.rs src/cache.rs src/options.rs src/table_reader.rs |
diffstat | 4 files changed, 88 insertions(+), 51 deletions(-) [+] |
line wrap: on
line diff
--- a/src/block.rs Sun Jan 29 14:28:51 2017 +0100 +++ b/src/block.rs Sun Jan 29 15:26:56 2017 +0100 @@ -26,6 +26,7 @@ /// A RESTART is a fixed u32 pointing to the beginning of an ENTRY. /// /// N_RESTARTS contains the number of restarts. +#[derive(Clone)] pub struct Block { block: Rc<BlockContents>, opt: Options,
--- a/src/cache.rs Sun Jan 29 14:28:51 2017 +0100 +++ b/src/cache.rs Sun Jan 29 15:26:56 2017 +0100 @@ -1,8 +1,6 @@ use std::collections::HashMap; use std::mem::{swap, replace}; -use integer_encoding::FixedIntWriter; - // No clone, no copy! That asserts that an LRUHandle exists only once. type LRUHandle<T> = *mut LRUNode<T>; @@ -16,6 +14,7 @@ head: LRUNode<T>, count: usize, } + /// This is likely unstable; more investigation is needed into correct behavior! impl<T> LRUList<T> { fn new() -> LRUList<T> { @@ -152,18 +151,9 @@ } pub type CacheKey = Vec<u8>; -pub struct CacheID(u64); +pub type CacheID = u64; type CacheEntry<T> = (T, LRUHandle<CacheKey>); -impl CacheID { - // Serialize a Cache ID to a byte string. - pub fn serialize(&self) -> Vec<u8> { - let mut v = vec![0; 8]; - let _ = v.write_fixedint(self.0); - v - } -} - /// Implementation of `ShardedLRUCache`. /// Based on a HashMap; the elements are linked in order to support the LRU ordering. pub struct Cache<T> { @@ -187,9 +177,10 @@ } } + /// Returns a per-cache ID that can be used to partition the cache among several users. pub fn new_cache_id(&mut self) -> CacheID { self.id += 1; - return CacheID(self.id); + return self.id; } /// How many the cache currently contains
--- a/src/options.rs Sun Jan 29 14:28:51 2017 +0100 +++ b/src/options.rs Sun Jan 29 15:26:56 2017 +0100 @@ -1,4 +1,4 @@ -use block::Block; +use table_reader::TableBlock; use cache::Cache; use cmp::{Cmp, DefaultCmp}; use types::SequenceNumber; @@ -38,7 +38,7 @@ // pub logger: Logger, pub write_buffer_size: usize, pub max_open_files: usize, - pub block_cache: Arc<Mutex<Cache<Block>>>, + pub block_cache: Arc<Mutex<Cache<TableBlock>>>, pub block_size: usize, pub block_restart_interval: usize, pub compression_type: CompressionType,
--- a/src/table_reader.rs Sun Jan 29 14:28:51 2017 +0100 +++ b/src/table_reader.rs Sun Jan 29 15:26:56 2017 +0100 @@ -1,6 +1,6 @@ use block::{Block, BlockIter}; use blockhandle::BlockHandle; -use cache::CacheID; +use cache; use cmp::InternalKeyCmp; use error::{Status, StatusCode, Result}; use filter::{BoxedFilterPolicy, InternalFilterPolicy}; @@ -14,7 +14,7 @@ use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; -use integer_encoding::FixedInt; +use integer_encoding::{FixedInt, FixedIntWriter}; use crc::crc32::{self, Hasher32}; /// Reads the table footer. @@ -36,7 +36,8 @@ Ok(buf) } -struct TableBlock { +#[derive(Clone)] +pub struct TableBlock { block: Block, checksum: u32, compression: CompressionType, @@ -79,7 +80,7 @@ pub struct Table<R: Read + Seek> { file: R, file_size: usize, - cache_id: CacheID, + cache_id: cache::CacheID, opt: Options, @@ -118,9 +119,7 @@ filter_block_reader = Some(FilterBlockReader::new_owned(fp, buf)); } } - metaindexiter.reset(); - let cache_id = opt.block_cache.lock().unwrap().new_cache_id(); Ok(Table { @@ -143,14 +142,34 @@ Ok(t) } + fn block_cache_handle(&self, block_off: usize) -> cache::CacheKey { + let mut dst = Vec::with_capacity(2 * 8); + dst.write_fixedint(self.cache_id).expect("error writing to vec"); + dst.write_fixedint(block_off as u64).expect("error writing to vec"); + dst + } + + /// Read a block from the current table at `location`, and cache it in the options' block + /// cache. fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock> { + let cachekey = self.block_cache_handle(location.offset()); + if let Ok(ref mut block_cache) = self.opt.block_cache.lock() { + if let Some(block) = block_cache.get(&cachekey) { + return Ok(block.clone()); + } + } + let b = try!(TableBlock::read_block(self.opt.clone(), &mut self.file, location)); if !b.verify() { - Err(Status::new(StatusCode::InvalidData, "Data block failed verification")) - } else { - Ok(b) + return Err(Status::new(StatusCode::InvalidData, "Data block failed verification")); } + if let Ok(ref mut block_cache) = self.opt.block_cache.lock() { + // inserting a cheap copy (Rc) + block_cache.insert(&cachekey, b.clone()); + } + + Ok(b) } /// Returns the offset of the block that contains `key`. @@ -194,7 +213,7 @@ None } - // Future impl: + // Future impl: TODO // // let mut index_block = self.indexblock.iter(); // @@ -445,38 +464,28 @@ } #[test] - fn test_table_reader_checksum() { - let (mut src, size) = build_table(); - println!("{}", size); + fn test_table_cache_use() { + let (src, size) = build_table(); + let mut opt = Options::default(); + opt.block_size = 32; - src[10] += 1; - - let mut table = Table::new_raw(Options::default(), + let mut table = Table::new_raw(opt.clone(), Cursor::new(&src as &[u8]), size, BloomPolicy::new(4)) .unwrap(); - - assert!(table.filters.is_some()); - assert_eq!(table.filters.as_ref().unwrap().num(), 1); - - { - let iter = table.iter(); - // first block is skipped - assert_eq!(iter.count(), 4); - } + let mut iter = table.iter(); - { - let iter = table.iter(); - - for (k, _) in iter { - if k == build_data()[5].0.as_bytes() { - return; - } - } - - panic!("Should have hit 5th record in table!"); - } + // index/metaindex blocks are not cached. That'd be a waste of memory. + assert_eq!(opt.block_cache.lock().unwrap().count(), 0); + iter.next(); + assert_eq!(opt.block_cache.lock().unwrap().count(), 1); + // This may fail if block parameters or data change. In that case, adapt it. + iter.next(); + iter.next(); + iter.next(); + iter.next(); + assert_eq!(opt.block_cache.lock().unwrap().count(), 2); } #[test] @@ -687,4 +696,40 @@ } } } + + #[test] + fn test_table_reader_checksum() { + let (mut src, size) = build_table(); + println!("{}", size); + + src[10] += 1; + + let mut table = Table::new_raw(Options::default(), + Cursor::new(&src as &[u8]), + size, + BloomPolicy::new(4)) + .unwrap(); + + assert!(table.filters.is_some()); + assert_eq!(table.filters.as_ref().unwrap().num(), 1); + + { + let iter = table.iter(); + // first block is skipped + assert_eq!(iter.count(), 4); + } + + { + let iter = table.iter(); + + for (k, _) in iter { + if k == build_data()[5].0.as_bytes() { + return; + } + } + + panic!("Should have hit 5th record in table!"); + } + } + }