changeset 149:11f887625190

Implement cache for TableBlocks.
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 29 Jan 2017 15:26:56 +0100
parents 08f5579b8598
children 01e070c97ee7
files src/block.rs src/cache.rs src/options.rs src/table_reader.rs
diffstat 4 files changed, 88 insertions(+), 51 deletions(-) [+]
line wrap: on
line diff
--- a/src/block.rs	Sun Jan 29 14:28:51 2017 +0100
+++ b/src/block.rs	Sun Jan 29 15:26:56 2017 +0100
@@ -26,6 +26,7 @@
 /// A RESTART is a fixed u32 pointing to the beginning of an ENTRY.
 ///
 /// N_RESTARTS contains the number of restarts.
+#[derive(Clone)]
 pub struct Block {
     block: Rc<BlockContents>,
     opt: Options,
--- a/src/cache.rs	Sun Jan 29 14:28:51 2017 +0100
+++ b/src/cache.rs	Sun Jan 29 15:26:56 2017 +0100
@@ -1,8 +1,6 @@
 use std::collections::HashMap;
 use std::mem::{swap, replace};
 
-use integer_encoding::FixedIntWriter;
-
 // No clone, no copy! That asserts that an LRUHandle exists only once.
 type LRUHandle<T> = *mut LRUNode<T>;
 
@@ -16,6 +14,7 @@
     head: LRUNode<T>,
     count: usize,
 }
+
 /// This is likely unstable; more investigation is needed into correct behavior!
 impl<T> LRUList<T> {
     fn new() -> LRUList<T> {
@@ -152,18 +151,9 @@
 }
 
 pub type CacheKey = Vec<u8>;
-pub struct CacheID(u64);
+pub type CacheID = u64;
 type CacheEntry<T> = (T, LRUHandle<CacheKey>);
 
-impl CacheID {
-    // Serialize a Cache ID to a byte string.
-    pub fn serialize(&self) -> Vec<u8> {
-        let mut v = vec![0; 8];
-        let _ = v.write_fixedint(self.0);
-        v
-    }
-}
-
 /// Implementation of `ShardedLRUCache`.
 /// Based on a HashMap; the elements are linked in order to support the LRU ordering.
 pub struct Cache<T> {
@@ -187,9 +177,10 @@
         }
     }
 
+    /// Returns a per-cache ID that can be used to partition the cache among several users.
     pub fn new_cache_id(&mut self) -> CacheID {
         self.id += 1;
-        return CacheID(self.id);
+        return self.id;
     }
 
     /// How many the cache currently contains
--- a/src/options.rs	Sun Jan 29 14:28:51 2017 +0100
+++ b/src/options.rs	Sun Jan 29 15:26:56 2017 +0100
@@ -1,4 +1,4 @@
-use block::Block;
+use table_reader::TableBlock;
 use cache::Cache;
 use cmp::{Cmp, DefaultCmp};
 use types::SequenceNumber;
@@ -38,7 +38,7 @@
     // pub logger: Logger,
     pub write_buffer_size: usize,
     pub max_open_files: usize,
-    pub block_cache: Arc<Mutex<Cache<Block>>>,
+    pub block_cache: Arc<Mutex<Cache<TableBlock>>>,
     pub block_size: usize,
     pub block_restart_interval: usize,
     pub compression_type: CompressionType,
--- a/src/table_reader.rs	Sun Jan 29 14:28:51 2017 +0100
+++ b/src/table_reader.rs	Sun Jan 29 15:26:56 2017 +0100
@@ -1,6 +1,6 @@
 use block::{Block, BlockIter};
 use blockhandle::BlockHandle;
-use cache::CacheID;
+use cache;
 use cmp::InternalKeyCmp;
 use error::{Status, StatusCode, Result};
 use filter::{BoxedFilterPolicy, InternalFilterPolicy};
@@ -14,7 +14,7 @@
 use std::io::{Read, Seek, SeekFrom};
 use std::sync::Arc;
 
-use integer_encoding::FixedInt;
+use integer_encoding::{FixedInt, FixedIntWriter};
 use crc::crc32::{self, Hasher32};
 
 /// Reads the table footer.
@@ -36,7 +36,8 @@
     Ok(buf)
 }
 
-struct TableBlock {
+#[derive(Clone)]
+pub struct TableBlock {
     block: Block,
     checksum: u32,
     compression: CompressionType,
@@ -79,7 +80,7 @@
 pub struct Table<R: Read + Seek> {
     file: R,
     file_size: usize,
-    cache_id: CacheID,
+    cache_id: cache::CacheID,
 
     opt: Options,
 
@@ -118,9 +119,7 @@
                 filter_block_reader = Some(FilterBlockReader::new_owned(fp, buf));
             }
         }
-
         metaindexiter.reset();
-
         let cache_id = opt.block_cache.lock().unwrap().new_cache_id();
 
         Ok(Table {
@@ -143,14 +142,34 @@
         Ok(t)
     }
 
+    fn block_cache_handle(&self, block_off: usize) -> cache::CacheKey {
+        let mut dst = Vec::with_capacity(2 * 8);
+        dst.write_fixedint(self.cache_id).expect("error writing to vec");
+        dst.write_fixedint(block_off as u64).expect("error writing to vec");
+        dst
+    }
+
+    /// Return the block at `location`: from the options' block cache if it is already there,
+    /// otherwise read from the table file, verified, and inserted into that cache.
     fn read_block(&mut self, location: &BlockHandle) -> Result<TableBlock> {
+        let cachekey = self.block_cache_handle(location.offset());
+        if let Ok(ref mut block_cache) = self.opt.block_cache.lock() {
+            if let Some(block) = block_cache.get(&cachekey) {
+                return Ok(block.clone());
+            }
+        }
+
         let b = try!(TableBlock::read_block(self.opt.clone(), &mut self.file, location));
 
         if !b.verify() {
-            Err(Status::new(StatusCode::InvalidData, "Data block failed verification"))
-        } else {
-            Ok(b)
+            return Err(Status::new(StatusCode::InvalidData, "Data block failed verification"));
         }
+        if let Ok(ref mut block_cache) = self.opt.block_cache.lock() {
+            // Cache a clone; the contents are behind an Rc, so the data is not copied.
+            block_cache.insert(&cachekey, b.clone());
+        }
+
+        Ok(b)
     }
 
     /// Returns the offset of the block that contains `key`.
@@ -194,7 +213,7 @@
             None
         }
 
-        // Future impl:
+        // Future impl: TODO
         //
         // let mut index_block = self.indexblock.iter();
         //
@@ -445,38 +464,28 @@
     }
 
     #[test]
-    fn test_table_reader_checksum() {
-        let (mut src, size) = build_table();
-        println!("{}", size);
+    fn test_table_cache_use() {
+        let (src, size) = build_table();
+        let mut opt = Options::default();
+        opt.block_size = 32;
 
-        src[10] += 1;
-
-        let mut table = Table::new_raw(Options::default(),
+        let mut table = Table::new_raw(opt.clone(),
                                        Cursor::new(&src as &[u8]),
                                        size,
                                        BloomPolicy::new(4))
             .unwrap();
-
-        assert!(table.filters.is_some());
-        assert_eq!(table.filters.as_ref().unwrap().num(), 1);
-
-        {
-            let iter = table.iter();
-            // first block is skipped
-            assert_eq!(iter.count(), 4);
-        }
+        let mut iter = table.iter();
 
-        {
-            let iter = table.iter();
-
-            for (k, _) in iter {
-                if k == build_data()[5].0.as_bytes() {
-                    return;
-                }
-            }
-
-            panic!("Should have hit 5th record in table!");
-        }
+        // index/metaindex blocks are not cached. That'd be a waste of memory.
+        assert_eq!(opt.block_cache.lock().unwrap().count(), 0);
+        iter.next();
+        assert_eq!(opt.block_cache.lock().unwrap().count(), 1);
+        // This may fail if block parameters or data change. In that case, adapt it.
+        iter.next();
+        iter.next();
+        iter.next();
+        iter.next();
+        assert_eq!(opt.block_cache.lock().unwrap().count(), 2);
     }
 
     #[test]
@@ -687,4 +696,40 @@
             }
         }
     }
+
+    #[test]
+    fn test_table_reader_checksum() {
+        let (mut src, size) = build_table();
+        println!("{}", size);
+
+        src[10] += 1;
+
+        let mut table = Table::new_raw(Options::default(),
+                                       Cursor::new(&src as &[u8]),
+                                       size,
+                                       BloomPolicy::new(4))
+            .unwrap();
+
+        assert!(table.filters.is_some());
+        assert_eq!(table.filters.as_ref().unwrap().num(), 1);
+
+        {
+            let iter = table.iter();
+            // first block is skipped
+            assert_eq!(iter.count(), 4);
+        }
+
+        {
+            let iter = table.iter();
+
+            for (k, _) in iter {
+                if k == build_data()[5].0.as_bytes() {
+                    return;
+                }
+            }
+
+            panic!("Should have hit 5th record in table!");
+        }
+    }
+
 }