Mercurial > lbo > hg > leveldb-rs
changeset 156:94d50060a94a
Introduce RandomAccessFile as abstraction for Table reader.
And TableCache, later.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 09 Jul 2017 17:06:20 +0200 |
parents | 6e84fedbe441 |
children | de83256f4423 |
files | src/disk_env.rs src/env.rs src/error.rs src/table_reader.rs |
diffstat | 4 files changed, 73 insertions(+), 42 deletions(-) [+] |
line wrap: on
line diff
--- a/src/disk_env.rs Sun Jul 09 10:53:26 2017 +0200 +++ b/src/disk_env.rs Sun Jul 09 17:06:20 2017 +0200 @@ -1,5 +1,4 @@ -use env::Env; -use env::Logger; +use env::{Env, Logger, RandomAccessFile}; use error::{from_io_result, Status, StatusCode, Result}; use std::collections::HashSet; @@ -42,8 +41,8 @@ fn open_sequential_file(&self, p: &Path) -> Result<Self::SequentialReader> { from_io_result(fs::OpenOptions::new().read(true).open(p)) } - fn open_random_access_file(&self, p: &Path) -> Result<Self::RandomReader> { - from_io_result(fs::OpenOptions::new().read(true).open(p)) + fn open_random_access_file(&self, p: &Path) -> Result<RandomAccessFile<Self::RandomReader>> { + from_io_result(fs::OpenOptions::new().read(true).open(p)).map(|f| RandomAccessFile::new(f)) } fn open_writable_file(&self, p: &Path) -> Result<Self::Writer> { from_io_result(fs::OpenOptions::new().create(true).write(true).append(false).open(p))
--- a/src/env.rs Sun Jul 09 10:53:26 2017 +0200 +++ b/src/env.rs Sun Jul 09 17:06:20 2017 +0200 @@ -1,10 +1,32 @@ //! An `env` is an abstraction layer that allows the database to run both on different platforms as //! well as persisting data on disk or in memory. -use error::Result; +use error::{self, Result}; + +use std::io::{self, Read, Write, Seek}; +use std::path::Path; +use std::sync::{Arc, Mutex}; + +/// RandomAccessFile wraps a type implementing read and seek to enable atomic random reads +#[derive(Clone)] +pub struct RandomAccessFile<F: Read + Seek> { + f: Arc<Mutex<F>>, +} -use std::io::{Read, Write, Seek}; -use std::path::Path; +impl<F: Read + Seek> RandomAccessFile<F> { + pub fn new(f: F) -> RandomAccessFile<F> { + RandomAccessFile { f: Arc::new(Mutex::new(f)) } + } + + pub fn read_at(&self, off: usize, len: usize) -> Result<Vec<u8>> { + let mut f = try!(error::from_lock_result(self.f.lock())); + try!(error::from_io_result(f.seek(io::SeekFrom::Start(off as u64)))); + + let mut buf = Vec::new(); + buf.resize(len, 0); + error::from_io_result(f.read_exact(&mut buf)).map(|_| buf) + } +} pub trait Env { type SequentialReader: Read; @@ -13,7 +35,7 @@ type FileLock; fn open_sequential_file(&self, &Path) -> Result<Self::SequentialReader>; - fn open_random_access_file(&self, &Path) -> Result<Self::RandomReader>; + fn open_random_access_file(&self, &Path) -> Result<RandomAccessFile<Self::RandomReader>>; fn open_writable_file(&self, &Path) -> Result<Self::Writer>; fn open_appendable_file(&self, &Path) -> Result<Self::Writer>;
--- a/src/error.rs Sun Jul 09 10:53:26 2017 +0200 +++ b/src/error.rs Sun Jul 09 17:06:20 2017 +0200 @@ -14,6 +14,7 @@ IOError, InvalidArgument, InvalidData, + LockError, NotFound, NotSupported, PermissionDenied, @@ -85,3 +86,12 @@ Err(e) => Err(Status::from(e)), } } + +use std::sync; + +pub fn from_lock_result<T>(e: sync::LockResult<T>) -> Result<T> { + match e { + Ok(r) => result::Result::Ok(r), + Err(_) => result::Result::Err(Status::new(StatusCode::LockError, "lock is poisoned")), + } +}
--- a/src/table_reader.rs Sun Jul 09 10:53:26 2017 +0200 +++ b/src/table_reader.rs Sun Jul 09 17:06:20 2017 +0200 @@ -2,6 +2,7 @@ use blockhandle::BlockHandle; use cache; use cmp::InternalKeyCmp; +use env::RandomAccessFile; use error::{Status, StatusCode, Result}; use filter::{BoxedFilterPolicy, InternalFilterPolicy}; use filter_block::FilterBlockReader; @@ -11,29 +12,21 @@ use types::LdbIterator; use std::cmp::Ordering; -use std::io::{Read, Seek, SeekFrom}; +use std::io::{Read, Seek}; use std::sync::Arc; use integer_encoding::{FixedInt, FixedIntWriter}; use crc::crc32::{self, Hasher32}; /// Reads the table footer. -fn read_footer<R: Read + Seek>(f: &mut R, size: usize) -> Result<Footer> { - try!(f.seek(SeekFrom::Start((size - table_builder::FULL_FOOTER_LENGTH) as u64))); - let mut buf = [0; table_builder::FULL_FOOTER_LENGTH]; - try!(f.read_exact(&mut buf)); +fn read_footer<F: Read + Seek>(f: &RandomAccessFile<F>, size: usize) -> Result<Footer> { + let buf = try!(f.read_at(size - table_builder::FULL_FOOTER_LENGTH, + table_builder::FULL_FOOTER_LENGTH)); Ok(Footer::decode(&buf)) } -fn read_bytes<R: Read + Seek>(f: &mut R, location: &BlockHandle) -> Result<Vec<u8>> { - try!(f.seek(SeekFrom::Start(location.offset() as u64))); - - let mut buf = Vec::new(); - buf.resize(location.size(), 0); - - try!(f.read_exact(&mut buf[0..location.size()])); - - Ok(buf) +fn read_bytes<F: Read + Seek>(f: &RandomAccessFile<F>, location: &BlockHandle) -> Result<Vec<u8>> { + f.read_at(location.offset(), location.size()) } #[derive(Clone)] @@ -46,7 +39,7 @@ impl TableBlock { /// Reads a block at location. fn read_block<R: Read + Seek>(opt: Options, - f: &mut R, + f: &RandomAccessFile<R>, location: &BlockHandle) -> Result<TableBlock> { // The block is denoted by offset and length in BlockHandle. A block in an encoded @@ -79,7 +72,7 @@ #[derive(Clone)] pub struct Table<R: Read + Seek> { - file: R, + file: RandomAccessFile<R>, file_size: usize, cache_id: cache::CacheID, @@ -92,12 +85,15 @@ impl<R: Read + Seek> Table<R> { /// Creates a new table reader operating on unformatted keys (i.e., UserKey). - fn new_raw(opt: Options, mut file: R, size: usize, fp: BoxedFilterPolicy) -> Result<Table<R>> { - let footer = try!(read_footer(&mut file, size)); + fn new_raw(opt: Options, + file: RandomAccessFile<R>, + size: usize, + fp: BoxedFilterPolicy) + -> Result<Table<R>> { + let footer = try!(read_footer(&file, size)); - let indexblock = try!(TableBlock::read_block(opt.clone(), &mut file, &footer.index)); - let metaindexblock = - try!(TableBlock::read_block(opt.clone(), &mut file, &footer.meta_index)); + let indexblock = try!(TableBlock::read_block(opt.clone(), &file, &footer.index)); + let metaindexblock = try!(TableBlock::read_block(opt.clone(), &file, &footer.meta_index)); if !indexblock.verify() || !metaindexblock.verify() { return Err(Status::new(StatusCode::InvalidData, @@ -116,7 +112,7 @@ let filter_block_location = BlockHandle::decode(&val).0; if filter_block_location.size() > 0 { - let buf = try!(read_bytes(&mut file, &filter_block_location)); + let buf = try!(read_bytes(&file, &filter_block_location)); filter_block_reader = Some(FilterBlockReader::new_owned(fp, buf)); } } @@ -139,7 +135,10 @@ /// (InternalFilterPolicy) are used. pub fn new(mut opt: Options, file: R, size: usize, fp: BoxedFilterPolicy) -> Result<Table<R>> { opt.cmp = Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone()))); - let t = try!(Table::new_raw(opt, file, size, InternalFilterPolicy::new(fp))); + let t = try!(Table::new_raw(opt, + RandomAccessFile::new(file), + size, + InternalFilterPolicy::new(fp))); Ok(t) } @@ -474,16 +473,17 @@ (d, size) } + fn wrap_buffer<'a>(src: &'a [u8]) -> RandomAccessFile<Cursor<&'a [u8]>> { + RandomAccessFile::new(Cursor::new(src)) + } + #[test] fn test_table_cache_use() { let (src, size) = build_table(); let mut opt = Options::default(); opt.block_size = 32; - let mut table = Table::new_raw(opt.clone(), - Cursor::new(&src as &[u8]), - size, - BloomPolicy::new(4)) + let mut table = Table::new_raw(opt.clone(), wrap_buffer(&src), size, BloomPolicy::new(4)) .unwrap(); let mut iter = table.iter(); @@ -505,7 +505,7 @@ let data = build_data(); let mut table = Table::new_raw(Options::default(), - Cursor::new(&src as &[u8]), + wrap_buffer(&src), size, BloomPolicy::new(4)) .unwrap(); @@ -541,7 +541,7 @@ let (src, size) = build_table(); let mut table = Table::new_raw(Options::default(), - Cursor::new(&src as &[u8]), + wrap_buffer(&src), size, BloomPolicy::new(4)) .unwrap(); @@ -565,7 +565,7 @@ let (src, size) = build_table(); let mut table = Table::new_raw(Options::default(), - Cursor::new(&src as &[u8]), + wrap_buffer(&src), size, BloomPolicy::new(4)) .unwrap(); @@ -599,7 +599,7 @@ let data = build_data(); let mut table = Table::new_raw(Options::default(), - Cursor::new(&src as &[u8]), + wrap_buffer(&src), size, BloomPolicy::new(4)) .unwrap(); @@ -636,7 +636,7 @@ let (src, size) = build_table(); let mut table = Table::new_raw(Options::default(), - Cursor::new(&src as &[u8]), + wrap_buffer(&src), size, BloomPolicy::new(4)) .unwrap(); @@ -657,7 +657,7 @@ let (src, size) = build_table(); let mut table = Table::new_raw(Options::default(), - Cursor::new(&src as &[u8]), + wrap_buffer(&src), size, BloomPolicy::new(4)) .unwrap(); @@ -726,7 +726,7 @@ src[10] += 1; let mut table = Table::new_raw(Options::default(), - Cursor::new(&src as &[u8]), + wrap_buffer(&src), size, BloomPolicy::new(4)) .unwrap();