Mercurial > lbo > hg > leveldb-rs
changeset 266:fefc1414185e
db_impl: Add initial implementations of some DB methods.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Tue, 19 Sep 2017 20:20:29 +0200 |
parents | 6851fe8d6ed6 |
children | 38e6d3133b46 |
files | src/db_impl.rs src/lib.rs |
diffstat | 2 files changed, 373 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/db_impl.rs Tue Sep 19 20:20:29 2017 +0200 @@ -0,0 +1,371 @@ +//! db_impl contains the implementation of the database interface and high-level compaction and +//! maintenance logic. + +#![allow(unused_attributes)] + +use cmp::InternalKeyCmp; +use env::{Env, FileLock}; +use error::{err, StatusCode, Result}; +use filter::{BoxedFilterPolicy, InternalFilterPolicy}; +use infolog::Logger; +use log::LogWriter; +use key_types::parse_internal_key; +use memtable::MemTable; +use options::Options; +use table_builder::TableBuilder; +use table_cache::{table_file_name, TableCache}; +use types::{share, FileMetaData, FileNum, LdbIterator, NUM_LEVELS, Shared}; +use version_edit::VersionEdit; +use version_set::VersionSet; +use version::Version; + +use std::io::{self, Write}; +use std::mem; +use std::path::Path; + +/// DB contains the actual database implemenation. As opposed to the original, this implementation +/// is not concurrent (yet). +pub struct DB { + name: String, + lock: Option<FileLock>, + + cmp: InternalKeyCmp, + fpol: InternalFilterPolicy<BoxedFilterPolicy>, + opt: Options, + + mem: MemTable, + imm: Option<MemTable>, + + log: Option<LogWriter<Box<Write>>>, + log_num: Option<FileNum>, + cache: Shared<TableCache>, + vset: VersionSet, + + cstats: [CompactionStats; NUM_LEVELS], +} + +impl DB { + fn new(name: &str, mut opt: Options) -> DB { + let cache = share(TableCache::new(&name, opt.clone(), opt.max_open_files - 10)); + let vset = VersionSet::new(&name, opt.clone(), cache.clone()); + + let log = open_info_log(opt.env.as_ref().as_ref(), &name); + opt.log = share(log); + + DB { + name: name.to_string(), + lock: None, + cmp: InternalKeyCmp(opt.cmp.clone()), + fpol: InternalFilterPolicy::new(opt.filter_policy.clone()), + + mem: MemTable::new(opt.cmp.clone()), + imm: None, + + opt: opt, + + log: None, + log_num: None, + cache: cache, + vset: vset, + + cstats: Default::default(), + } + } + + fn add_stats(&mut self, level: usize, cs: CompactionStats) { + assert!(level < NUM_LEVELS); + self.cstats[level].add(cs); + } + + /// make_room_for_write checks if the memtable has become too large, and triggers a compaction + /// if it's the case. + fn make_room_for_write(&mut self) -> Result<()> { + if self.mem.approx_mem_usage() < self.opt.write_buffer_size { + return Ok(()); + } else { + // Create new memtable. + let logn = self.vset.new_file_number(); + let logf = self.opt.env.open_writable_file(Path::new(&log_file_name(&self.name, logn))); + if logf.is_err() { + self.vset.reuse_file_number(logn); + logf?; + } else { + self.log = Some(LogWriter::new(logf.unwrap())); + self.log_num = Some(logn); + + let mut imm = MemTable::new(self.opt.cmp.clone()); + mem::swap(&mut imm, &mut self.mem); + self.imm = Some(imm); + self.maybe_do_compaction(); + } + + return Ok(()); + } + } + + /// maybe_do_compaction starts a blocking compaction if it makes sense. + fn maybe_do_compaction(&mut self) { + if self.imm.is_none() && !self.vset.needs_compaction() { + return; + } + self.start_compaction(); + } + + /// compaction dispatches the different kinds of compactions depending on the current state of + /// the database. + fn start_compaction(&mut self) { + // TODO (maybe): Support manual compactions. + if self.imm.is_some() { + if let Err(e) = self.compact_memtable() { + log!(self.opt.log, "Error while compacting memtable: {}", e); + } + return; + } + // TODO: rest + } + + fn compact_memtable(&mut self) -> Result<()> { + assert!(self.imm.is_some()); + let mut ve = VersionEdit::new(); + let base = self.vset.current(); + + let imm = self.imm.take().unwrap(); + if let Err(e) = self.write_l0_table(&imm, &mut ve, Some(&base.borrow())) { + self.imm = Some(imm); + return Err(e); + } + ve.set_log_num(self.log_num.unwrap_or(0)); + self.vset.log_and_apply(ve)?; + if let Err(e) = self.delete_obsolete_files() { + log!(self.opt.log, "Error deleting obsolete files: {}", e); + } + Ok(()) + } + + /// write_l0_table writes the given memtable to a table file. + fn write_l0_table(&mut self, + memt: &MemTable, + ve: &mut VersionEdit, + base: Option<&Version>) + -> Result<()> { + let start_ts = self.opt.env.micros(); + let num = self.vset.new_file_number(); + log!(self.opt.log, "Start write of L0 table {:06}", num); + let fmd = build_table(&self.name, &self.opt, memt.iter(), num)?; + log!(self.opt.log, "L0 table {:06} has {} bytes", num, fmd.size); + + let cache_result = self.cache.borrow_mut().get_table(num); + if let Err(e) = cache_result { + log!(self.opt.log, + "L0 table {:06} not returned by cache: {}", + num, + e); + self.opt.env.delete(Path::new(&table_file_name(&self.name, num))).is_ok(); + return Err(e); + } + + let mut stats = CompactionStats::default(); + stats.micros = self.opt.env.micros() - start_ts; + stats.written = fmd.size; + + let mut level = 0; + if let Some(b) = base { + level = b.pick_memtable_output_level(parse_internal_key(&fmd.smallest).2, + parse_internal_key(&fmd.largest).2); + } + + self.add_stats(level, stats); + ve.add_file(level, fmd); + + Ok(()) + } + + fn delete_obsolete_files(&mut self) -> Result<()> { + // TODO: implement + Ok(()) + } +} + +#[derive(Debug, Default)] +struct CompactionStats { + micros: u64, + read: usize, + written: usize, +} + +impl CompactionStats { + fn add(&mut self, cs: CompactionStats) { + self.micros += cs.micros; + self.read += cs.read; + self.written += cs.written; + } +} + +pub fn build_table<I: LdbIterator>(dbname: &str, + opt: &Options, + mut from: I, + num: FileNum) + -> Result<FileMetaData> { + from.reset(); + let filename = table_file_name(dbname, num); + let mut md = FileMetaData::default(); + + let (mut kbuf, mut vbuf) = (vec![], vec![]); + let mut firstkey = None; + // lastkey is what remains in kbuf. + + // Clean up file if write fails at any point. + // + // TODO: Replace with catch {} when available. + let r = (|| -> Result<()> { + let f = opt.env.open_writable_file(Path::new(&filename))?; + let mut builder = TableBuilder::new(opt.clone(), f); + while from.advance() { + assert!(from.current(&mut kbuf, &mut vbuf)); + if firstkey.is_none() { + firstkey = Some(kbuf.clone()); + } + builder.add(&kbuf, &vbuf)?; + } + builder.finish()?; + Ok(()) + })(); + + if let Err(e) = r { + opt.env.delete(Path::new(&filename)).is_ok(); + return Err(e); + } + + md.num = num; + md.size = opt.env.size_of(Path::new(&filename))?; + md.smallest = firstkey.unwrap(); + md.largest = kbuf; + Ok(md) +} + +fn log_file_name(db: &str, num: FileNum) -> String { + format!("{}/{:06}.log", db, num) +} + +/// open_info_log opens an info log file in the given database. It transparently returns a +/// /dev/null logger in case the open fails. +fn open_info_log<E: Env + ?Sized>(env: &E, db: &str) -> Logger { + let logfilename = format!("{}/LOG", db); + let oldlogfilename = format!("{}/LOG.old", db); + env.mkdir(Path::new(db)).is_ok(); + if let Ok(e) = env.exists(Path::new(&logfilename)) { + if e { + env.rename(Path::new(&logfilename), Path::new(&oldlogfilename)).is_ok(); + } + } + if let Ok(w) = env.open_writable_file(Path::new(&logfilename)) { + Logger(w) + } else { + Logger(Box::new(io::sink())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use options; + use key_types::LookupKey; + use mem_env::MemEnv; + use test_util::LdbIteratorIter; + use types::ValueType; + + #[test] + fn test_db_impl_open_info_log() { + let e = MemEnv::new(); + { + let l = share(open_info_log(&e, "abc")); + assert!(e.exists(Path::new("abc/LOG")).unwrap()); + log!(l, "hello {}", "world"); + assert_eq!(12, e.size_of(Path::new("abc/LOG")).unwrap()); + } + { + let l = share(open_info_log(&e, "abc")); + assert!(e.exists(Path::new("abc/LOG.old")).unwrap()); + assert!(e.exists(Path::new("abc/LOG")).unwrap()); + assert_eq!(12, e.size_of(Path::new("abc/LOG.old")).unwrap()); + assert_eq!(0, e.size_of(Path::new("abc/LOG")).unwrap()); + log!(l, "something else"); + log!(l, "and another {}", 1); + + let mut s = String::new(); + let mut r = e.open_sequential_file(Path::new("abc/LOG")).unwrap(); + r.read_to_string(&mut s).unwrap(); + assert_eq!("something else\nand another 1\n", &s); + } + } + + fn build_memtable() -> MemTable { + let mut mt = MemTable::new(options::for_test().cmp); + let mut i = 1; + for k in ["abc", "def", "ghi", "jkl", "mno", "aabc", "test123"].iter() { + mt.add(i, + ValueType::TypeValue, + k.as_bytes(), + "looooongval".as_bytes()); + i += 1; + } + mt + } + + #[test] + fn test_db_impl_build_table() { + let mut opt = options::for_test(); + opt.block_size = 128; + let mt = build_memtable(); + + let f = build_table("db", &opt, mt.iter(), 123).unwrap(); + let path = Path::new("db/000123.ldb"); + + assert_eq!(LookupKey::new("aabc".as_bytes(), 6).internal_key(), + f.smallest.as_slice()); + assert_eq!(LookupKey::new("test123".as_bytes(), 7).internal_key(), + f.largest.as_slice()); + assert_eq!(379, f.size); + assert_eq!(123, f.num); + assert!(opt.env.exists(path).unwrap()); + + { + // Read table back in. + let mut tc = TableCache::new("db", opt.clone(), 100); + let tbl = tc.get_table(123).unwrap(); + assert_eq!(mt.len(), LdbIteratorIter::wrap(&mut tbl.iter()).count()); + } + + { + // Corrupt table; make sure it doesn't load fully. + let mut buf = vec![]; + opt.env.open_sequential_file(path).unwrap().read_to_end(&mut buf).unwrap(); + buf[150] += 1; + opt.env.open_writable_file(path).unwrap().write_all(&buf).unwrap(); + + let mut tc = TableCache::new("db", opt.clone(), 100); + let tbl = tc.get_table(123).unwrap(); + // The last two entries are skipped due to the corruption above. + assert_eq!(5, + LdbIteratorIter::wrap(&mut tbl.iter()).map(|v| println!("{:?}", v)).count()); + } + } + + #[test] + fn test_db_impl_make_room_for_write() { + let mut opt = options::for_test(); + opt.write_buffer_size = 25; + let mut db = DB::new("db", opt); + + // Fill up memtable. + db.mem = build_memtable(); + + // Trigger memtable compaction. + db.make_room_for_write().unwrap(); + assert_eq!(0, db.mem.len()); + assert!(db.opt.env.exists(Path::new("db/000002.log")).unwrap()); + assert!(db.opt.env.exists(Path::new("db/000003.ldb")).unwrap()); + assert_eq!(351, db.opt.env.size_of(Path::new("db/000003.ldb")).unwrap()); + } +}