Mercurial > lbo > hg > leveldb-rs
changeset 273:c0381df8ed4f
db_impl: Implement compaction logic.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Thu, 21 Sep 2017 16:05:04 +0200 |
parents | 8b70eadcd6d2 |
children | 5d65cde43e38 |
files | src/db_impl.rs |
diffstat | 1 files changed, 257 insertions(+), 8 deletions(-) [+] |
line wrap: on
line diff
--- a/src/db_impl.rs Thu Sep 21 16:04:44 2017 +0200 +++ b/src/db_impl.rs Thu Sep 21 16:05:04 2017 +0200 @@ -9,16 +9,18 @@ use filter::{BoxedFilterPolicy, InternalFilterPolicy}; use infolog::Logger; use log::LogWriter; -use key_types::parse_internal_key; +use key_types::{parse_internal_key, ValueType}; use memtable::MemTable; use options::Options; use table_builder::TableBuilder; use table_cache::{table_file_name, TableCache}; -use types::{share, FileMetaData, FileNum, LdbIterator, NUM_LEVELS, Shared}; +use types::{parse_file_name, share, FileMetaData, FileNum, FileType, LdbIterator, + MAX_SEQUENCE_NUMBER, NUM_LEVELS, SequenceNumber, Shared}; use version_edit::VersionEdit; -use version_set::VersionSet; +use version_set::{Compaction, VersionSet}; use version::Version; +use std::cmp::Ordering; use std::io::{self, Write}; use std::mem; use std::path::Path; @@ -111,8 +113,8 @@ self.start_compaction(); } - /// compaction dispatches the different kinds of compactions depending on the current state of - /// the database. + /// start_compaction dispatches the different kinds of compactions depending on the current + /// state of the database. fn start_compaction(&mut self) { // TODO (maybe): Support manual compactions. 
if self.imm.is_some() { @@ -121,7 +123,41 @@ } return; } - // TODO: rest + + let compaction = self.vset.pick_compaction(); + if compaction.is_none() { + return; + } + let mut compaction = compaction.unwrap(); + + if compaction.is_trivial_move() { + assert_eq!(1, compaction.num_inputs(0)); + let f = compaction.input(0, 0); + let num = f.num; + let size = f.size; + let level = compaction.level(); + + compaction.edit().delete_file(level, num); + compaction.edit().add_file(level + 1, f); + + if let Err(e) = self.vset.log_and_apply(compaction.into_edit()) { + log!(self.opt.log, "trivial move failed: {}", e); + } else { + log!(self.opt.log, + "Moved num={} bytes={} from L{} to L{}", + num, + size, + level, + level + 1); + log!(self.opt.log, "Summary: {}", self.vset.current_summary()); + } + } else { + let state = CompactionState::new(compaction); + if let Err(e) = self.do_compaction_work(state) { + log!(self.opt.log, "Compaction work failed: {}", e); + } + self.delete_obsolete_files().is_ok(); + } } fn compact_memtable(&mut self) -> Result<()> { @@ -180,12 +216,226 @@ Ok(()) } + fn do_compaction_work(&mut self, mut cs: CompactionState) -> Result<()> { + let start_ts = self.opt.env.micros(); + log!(self.opt.log, + "Compacting {} files at L{} and {} files at L{}", + cs.compaction.num_inputs(0), + cs.compaction.level(), + cs.compaction.num_inputs(1), + cs.compaction.level() + 1); + assert!(self.vset.num_level_files(cs.compaction.level()) > 0); + assert!(cs.builder.is_none()); + + let mut input = self.vset.make_input_iterator(&cs.compaction); + input.seek_to_first(); + + let (mut key, mut val) = (vec![], vec![]); + let mut last_seq_for_key = MAX_SEQUENCE_NUMBER; + + let mut have_ukey = false; + let mut current_ukey = vec![]; + + while input.valid() { + // TODO: Do we need to do a memtable compaction here? Probably not, in the sequential + // case. 
+ assert!(input.current(&mut key, &mut val)); + if cs.compaction.should_stop_before(&key) && cs.builder.is_none() { + self.finish_compaction_output(&mut cs, key.clone())?; + } + let (ktyp, seq, ukey) = parse_internal_key(&key); + if seq == 0 { + // Parsing failed. + log!(self.opt.log, "Encountered seq=0 in key: {:?}", &key); + last_seq_for_key = MAX_SEQUENCE_NUMBER; + continue; + } + + if !have_ukey || self.opt.cmp.cmp(ukey, &current_ukey) != Ordering::Equal { + // First occurrence of this key. + current_ukey.clear(); + current_ukey.extend_from_slice(ukey); + have_ukey = true; + last_seq_for_key = MAX_SEQUENCE_NUMBER; + } + + // We can omit the key under the following conditions: + if last_seq_for_key <= cs.smallest_seq { + continue; + } + if ktyp == ValueType::TypeDeletion && seq <= cs.smallest_seq && + cs.compaction.is_base_level_for(ukey) { + continue; + } + + if cs.builder.is_none() { + let fnum = self.vset.new_file_number(); + let mut fmd = FileMetaData::default(); + fmd.num = fnum; + + let fname = table_file_name(&self.name, fnum); + let f = self.opt.env.open_writable_file(Path::new(&fname))?; + cs.builder = Some(TableBuilder::new(self.opt.clone(), f)); + cs.outputs.push(fmd); + } + if cs.builder.as_ref().unwrap().entries() == 0 { + cs.current_output().smallest = key.clone(); + } + cs.builder.as_mut().unwrap().add(&key, &val)?; + // NOTE: Adjust max file size based on level. 
+ if cs.builder.as_ref().unwrap().size_estimate() > self.opt.max_file_size { + self.finish_compaction_output(&mut cs, key.clone())?; + } + + input.advance(); + } + + if cs.builder.is_some() { + self.finish_compaction_output(&mut cs, key)?; + } + + let mut stats = CompactionStats::default(); + stats.micros = self.opt.env.micros() - start_ts; + for parent in 0..2 { + for inp in 0..cs.compaction.num_inputs(parent) { + stats.read += cs.compaction.input(parent, inp).size; + } + } + for output in &cs.outputs { + stats.written += output.size; + } + self.cstats[cs.compaction.level()].add(stats); + self.install_compaction_results(cs)?; + log!(self.opt.log, + "Compaction finished with {}", + self.vset.current_summary()); + + Ok(()) + } + + fn finish_compaction_output(&mut self, + cs: &mut CompactionState, + largest: Vec<u8>) + -> Result<()> { + assert!(cs.builder.is_some()); + let output_num = cs.current_output().num; + assert!(output_num > 0); + + // The original checks if the input iterator has an OK status. For this, we'd need to + // extend the LdbIterator interface though -- let's see if we can without for now. + // (it's not good for corruptions, in any case) + let b = cs.builder.take().unwrap(); + let entries = b.entries(); + let bytes = b.finish()?; + cs.total_bytes += bytes; + + cs.current_output().largest = largest; + cs.current_output().size = bytes; + + if entries > 0 { + // Verify that table can be used. 
+ if let Err(e) = self.cache.borrow_mut().get_table(output_num) { + log!(self.opt.log, "New table can't be read: {}", e); + return Err(e); + } + log!(self.opt.log, + "New table num={}: keys={} size={}", + output_num, + entries, + bytes); + } + Ok(()) + } + + fn install_compaction_results(&mut self, mut cs: CompactionState) -> Result<()> { + log!(self.opt.log, + "Compacted {} L{} files + {} L{} files => {}B", + cs.compaction.num_inputs(0), + cs.compaction.level(), + cs.compaction.num_inputs(1), + cs.compaction.level() + 1, + cs.total_bytes); + cs.compaction.add_input_deletions(); + let level = cs.compaction.level(); + for output in &cs.outputs { + cs.compaction.edit().add_file(level + 1, output.clone()); + } + self.vset.log_and_apply(cs.compaction.into_edit()) + } + fn delete_obsolete_files(&mut self) -> Result<()> { - // TODO: implement + let files = self.vset.live_files(); + let filenames = self.opt.env.children(Path::new(&self.name))?; + + for name in filenames { + if let Ok((num, typ)) = parse_file_name(&name) { + match typ { + FileType::Log => { + if num >= self.vset.log_num { + continue; + } + } + FileType::Descriptor => { + if num >= self.vset.manifest_num { + continue; + } + } + FileType::Table => { + if files.contains(&num) { + continue; + } + } + // NOTE: In this non-concurrent implementation, we likely never find temp + // files. + FileType::Temp => { + if files.contains(&num) { + continue; + } + } + FileType::Current | FileType::DBLock | FileType::InfoLog => continue, + } + + // If we're here, delete this file. 
+ if typ == FileType::Table { + self.cache.borrow_mut().evict(num).is_ok(); + } + log!(self.opt.log, "Deleting file type={:?} num={}", typ, num); + if let Err(e) = self.opt + .env + .delete(Path::new(&format!("{}/{}", &self.name, &name))) { + log!(self.opt.log, "Deleting file num={} failed: {}", num, e); + } + } + } Ok(()) } } +struct CompactionState { + compaction: Compaction, + smallest_seq: SequenceNumber, + outputs: Vec<FileMetaData>, + builder: Option<TableBuilder<Box<Write>>>, + total_bytes: usize, +} + +impl CompactionState { + fn new(c: Compaction) -> CompactionState { + CompactionState { + compaction: c, + smallest_seq: 0, + outputs: vec![], + builder: None, + total_bytes: 0, + } + } + + fn current_output(&mut self) -> &mut FileMetaData { + let len = self.outputs.len(); + &mut self.outputs[len - 1] + } +} + #[derive(Debug, Default)] struct CompactionStats { micros: u64, @@ -273,7 +523,6 @@ use key_types::LookupKey; use mem_env::MemEnv; use test_util::LdbIteratorIter; - use types::ValueType; #[test] fn test_db_impl_open_info_log() {