changeset 156:94d50060a94a

Introduce RandomAccessFile as abstraction for Table reader. And TableCache, later.
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 09 Jul 2017 17:06:20 +0200
parents 6e84fedbe441
children de83256f4423
files src/disk_env.rs src/env.rs src/error.rs src/table_reader.rs
diffstat 4 files changed, 73 insertions(+), 42 deletions(-) [+]
line wrap: on
line diff
--- a/src/disk_env.rs	Sun Jul 09 10:53:26 2017 +0200
+++ b/src/disk_env.rs	Sun Jul 09 17:06:20 2017 +0200
@@ -1,5 +1,4 @@
-use env::Env;
-use env::Logger;
+use env::{Env, Logger, RandomAccessFile};
 use error::{from_io_result, Status, StatusCode, Result};
 
 use std::collections::HashSet;
@@ -42,8 +41,8 @@
     fn open_sequential_file(&self, p: &Path) -> Result<Self::SequentialReader> {
         from_io_result(fs::OpenOptions::new().read(true).open(p))
     }
-    fn open_random_access_file(&self, p: &Path) -> Result<Self::RandomReader> {
-        from_io_result(fs::OpenOptions::new().read(true).open(p))
+    fn open_random_access_file(&self, p: &Path) -> Result<RandomAccessFile<Self::RandomReader>> {
+        from_io_result(fs::OpenOptions::new().read(true).open(p)).map(|f| RandomAccessFile::new(f))
     }
     fn open_writable_file(&self, p: &Path) -> Result<Self::Writer> {
         from_io_result(fs::OpenOptions::new().create(true).write(true).append(false).open(p))
--- a/src/env.rs	Sun Jul 09 10:53:26 2017 +0200
+++ b/src/env.rs	Sun Jul 09 17:06:20 2017 +0200
@@ -1,10 +1,32 @@
 //! An `env` is an abstraction layer that allows the database to run both on different platforms as
 //! well as persisting data on disk or in memory.
 
-use error::Result;
+use error::{self, Result};
+
+use std::io::{self, Read, Write, Seek};
+use std::path::Path;
+use std::sync::{Arc, Mutex};
+
+/// RandomAccessFile wraps a type implementing read and seek to enable atomic random reads
+#[derive(Clone)]
+pub struct RandomAccessFile<F: Read + Seek> {
+    f: Arc<Mutex<F>>,
+}
 
-use std::io::{Read, Write, Seek};
-use std::path::Path;
+impl<F: Read + Seek> RandomAccessFile<F> {
+    pub fn new(f: F) -> RandomAccessFile<F> {
+        RandomAccessFile { f: Arc::new(Mutex::new(f)) }
+    }
+
+    pub fn read_at(&self, off: usize, len: usize) -> Result<Vec<u8>> {
+        let mut f = try!(error::from_lock_result(self.f.lock()));
+        try!(error::from_io_result(f.seek(io::SeekFrom::Start(off as u64))));
+
+        let mut buf = Vec::new();
+        buf.resize(len, 0);
+        error::from_io_result(f.read_exact(&mut buf)).map(|_| buf)
+    }
+}
 
 pub trait Env {
     type SequentialReader: Read;
@@ -13,7 +35,7 @@
     type FileLock;
 
     fn open_sequential_file(&self, &Path) -> Result<Self::SequentialReader>;
-    fn open_random_access_file(&self, &Path) -> Result<Self::RandomReader>;
+    fn open_random_access_file(&self, &Path) -> Result<RandomAccessFile<Self::RandomReader>>;
     fn open_writable_file(&self, &Path) -> Result<Self::Writer>;
     fn open_appendable_file(&self, &Path) -> Result<Self::Writer>;
 
--- a/src/error.rs	Sun Jul 09 10:53:26 2017 +0200
+++ b/src/error.rs	Sun Jul 09 17:06:20 2017 +0200
@@ -14,6 +14,7 @@
     IOError,
     InvalidArgument,
     InvalidData,
+    LockError,
     NotFound,
     NotSupported,
     PermissionDenied,
@@ -85,3 +86,12 @@
         Err(e) => Err(Status::from(e)),
     }
 }
+
+use std::sync;
+
+pub fn from_lock_result<T>(e: sync::LockResult<T>) -> Result<T> {
+    match e {
+        Ok(r) => result::Result::Ok(r),
+        Err(_) => result::Result::Err(Status::new(StatusCode::LockError, "lock is poisoned")),
+    }
+}
--- a/src/table_reader.rs	Sun Jul 09 10:53:26 2017 +0200
+++ b/src/table_reader.rs	Sun Jul 09 17:06:20 2017 +0200
@@ -2,6 +2,7 @@
 use blockhandle::BlockHandle;
 use cache;
 use cmp::InternalKeyCmp;
+use env::RandomAccessFile;
 use error::{Status, StatusCode, Result};
 use filter::{BoxedFilterPolicy, InternalFilterPolicy};
 use filter_block::FilterBlockReader;
@@ -11,29 +12,21 @@
 use types::LdbIterator;
 
 use std::cmp::Ordering;
-use std::io::{Read, Seek, SeekFrom};
+use std::io::{Read, Seek};
 use std::sync::Arc;
 
 use integer_encoding::{FixedInt, FixedIntWriter};
 use crc::crc32::{self, Hasher32};
 
 /// Reads the table footer.
-fn read_footer<R: Read + Seek>(f: &mut R, size: usize) -> Result<Footer> {
-    try!(f.seek(SeekFrom::Start((size - table_builder::FULL_FOOTER_LENGTH) as u64)));
-    let mut buf = [0; table_builder::FULL_FOOTER_LENGTH];
-    try!(f.read_exact(&mut buf));
+fn read_footer<F: Read + Seek>(f: &RandomAccessFile<F>, size: usize) -> Result<Footer> {
+    let buf = try!(f.read_at(size - table_builder::FULL_FOOTER_LENGTH,
+                             table_builder::FULL_FOOTER_LENGTH));
     Ok(Footer::decode(&buf))
 }
 
-fn read_bytes<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<Vec<u8>> {
-    try!(f.seek(SeekFrom::Start(location.offset() as u64)));
-
-    let mut buf = Vec::new();
-    buf.resize(location.size(), 0);
-
-    try!(f.read_exact(&mut buf[0..location.size()]));
-
-    Ok(buf)
+fn read_bytes<F: Read + Seek>(f: &RandomAccessFile<F>, location: &BlockHandle) -> Result<Vec<u8>> {
+    f.read_at(location.offset(), location.size())
 }
 
 #[derive(Clone)]
@@ -46,7 +39,7 @@
 impl TableBlock {
     /// Reads a block at location.
     fn read_block<R: Read + Seek>(opt: Options,
-                                  f: &mut R,
+                                  f: &RandomAccessFile<R>,
                                   location: &BlockHandle)
                                   -> Result<TableBlock> {
         // The block is denoted by offset and length in BlockHandle. A block in an encoded
@@ -79,7 +72,7 @@
 
 #[derive(Clone)]
 pub struct Table<R: Read + Seek> {
-    file: R,
+    file: RandomAccessFile<R>,
     file_size: usize,
     cache_id: cache::CacheID,
 
@@ -92,12 +85,15 @@
 
 impl<R: Read + Seek> Table<R> {
     /// Creates a new table reader operating on unformatted keys (i.e., UserKey).
-    fn new_raw(opt: Options, mut file: R, size: usize, fp: BoxedFilterPolicy) -> Result<Table<R>> {
-        let footer = try!(read_footer(&mut file, size));
+    fn new_raw(opt: Options,
+               file: RandomAccessFile<R>,
+               size: usize,
+               fp: BoxedFilterPolicy)
+               -> Result<Table<R>> {
+        let footer = try!(read_footer(&file, size));
 
-        let indexblock = try!(TableBlock::read_block(opt.clone(), &mut file, &footer.index));
-        let metaindexblock =
-            try!(TableBlock::read_block(opt.clone(), &mut file, &footer.meta_index));
+        let indexblock = try!(TableBlock::read_block(opt.clone(), &file, &footer.index));
+        let metaindexblock = try!(TableBlock::read_block(opt.clone(), &file, &footer.meta_index));
 
         if !indexblock.verify() || !metaindexblock.verify() {
             return Err(Status::new(StatusCode::InvalidData,
@@ -116,7 +112,7 @@
             let filter_block_location = BlockHandle::decode(&val).0;
 
             if filter_block_location.size() > 0 {
-                let buf = try!(read_bytes(&mut file, &filter_block_location));
+                let buf = try!(read_bytes(&file, &filter_block_location));
                 filter_block_reader = Some(FilterBlockReader::new_owned(fp, buf));
             }
         }
@@ -139,7 +135,10 @@
     /// (InternalFilterPolicy) are used.
     pub fn new(mut opt: Options, file: R, size: usize, fp: BoxedFilterPolicy) -> Result<Table<R>> {
         opt.cmp = Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
-        let t = try!(Table::new_raw(opt, file, size, InternalFilterPolicy::new(fp)));
+        let t = try!(Table::new_raw(opt,
+                                    RandomAccessFile::new(file),
+                                    size,
+                                    InternalFilterPolicy::new(fp)));
         Ok(t)
     }
 
@@ -474,16 +473,17 @@
         (d, size)
     }
 
+    fn wrap_buffer<'a>(src: &'a [u8]) -> RandomAccessFile<Cursor<&'a [u8]>> {
+        RandomAccessFile::new(Cursor::new(src))
+    }
+
     #[test]
     fn test_table_cache_use() {
         let (src, size) = build_table();
         let mut opt = Options::default();
         opt.block_size = 32;
 
-        let mut table = Table::new_raw(opt.clone(),
-                                       Cursor::new(&src as &[u8]),
-                                       size,
-                                       BloomPolicy::new(4))
+        let mut table = Table::new_raw(opt.clone(), wrap_buffer(&src), size, BloomPolicy::new(4))
             .unwrap();
         let mut iter = table.iter();
 
@@ -505,7 +505,7 @@
         let data = build_data();
 
         let mut table = Table::new_raw(Options::default(),
-                                       Cursor::new(&src as &[u8]),
+                                       wrap_buffer(&src),
                                        size,
                                        BloomPolicy::new(4))
             .unwrap();
@@ -541,7 +541,7 @@
         let (src, size) = build_table();
 
         let mut table = Table::new_raw(Options::default(),
-                                       Cursor::new(&src as &[u8]),
+                                       wrap_buffer(&src),
                                        size,
                                        BloomPolicy::new(4))
             .unwrap();
@@ -565,7 +565,7 @@
         let (src, size) = build_table();
 
         let mut table = Table::new_raw(Options::default(),
-                                       Cursor::new(&src as &[u8]),
+                                       wrap_buffer(&src),
                                        size,
                                        BloomPolicy::new(4))
             .unwrap();
@@ -599,7 +599,7 @@
         let data = build_data();
 
         let mut table = Table::new_raw(Options::default(),
-                                       Cursor::new(&src as &[u8]),
+                                       wrap_buffer(&src),
                                        size,
                                        BloomPolicy::new(4))
             .unwrap();
@@ -636,7 +636,7 @@
         let (src, size) = build_table();
 
         let mut table = Table::new_raw(Options::default(),
-                                       Cursor::new(&src as &[u8]),
+                                       wrap_buffer(&src),
                                        size,
                                        BloomPolicy::new(4))
             .unwrap();
@@ -657,7 +657,7 @@
         let (src, size) = build_table();
 
         let mut table = Table::new_raw(Options::default(),
-                                       Cursor::new(&src as &[u8]),
+                                       wrap_buffer(&src),
                                        size,
                                        BloomPolicy::new(4))
             .unwrap();
@@ -726,7 +726,7 @@
         src[10] += 1;
 
         let mut table = Table::new_raw(Options::default(),
-                                       Cursor::new(&src as &[u8]),
+                                       wrap_buffer(&src),
                                        size,
                                        BloomPolicy::new(4))
             .unwrap();