changeset 273:c0381df8ed4f

db_impl: Implement compaction logic.
author Lewin Bormann <lbo@spheniscida.de>
date Thu, 21 Sep 2017 16:05:04 +0200
parents 8b70eadcd6d2
children 5d65cde43e38
files src/db_impl.rs
diffstat 1 files changed, 257 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/src/db_impl.rs	Thu Sep 21 16:04:44 2017 +0200
+++ b/src/db_impl.rs	Thu Sep 21 16:05:04 2017 +0200
@@ -9,16 +9,18 @@
 use filter::{BoxedFilterPolicy, InternalFilterPolicy};
 use infolog::Logger;
 use log::LogWriter;
-use key_types::parse_internal_key;
+use key_types::{parse_internal_key, ValueType};
 use memtable::MemTable;
 use options::Options;
 use table_builder::TableBuilder;
 use table_cache::{table_file_name, TableCache};
-use types::{share, FileMetaData, FileNum, LdbIterator, NUM_LEVELS, Shared};
+use types::{parse_file_name, share, FileMetaData, FileNum, FileType, LdbIterator,
+            MAX_SEQUENCE_NUMBER, NUM_LEVELS, SequenceNumber, Shared};
 use version_edit::VersionEdit;
-use version_set::VersionSet;
+use version_set::{Compaction, VersionSet};
 use version::Version;
 
+use std::cmp::Ordering;
 use std::io::{self, Write};
 use std::mem;
 use std::path::Path;
@@ -111,8 +113,8 @@
         self.start_compaction();
     }
 
-    /// compaction dispatches the different kinds of compactions depending on the current state of
-    /// the database.
+    /// start_compaction dispatches the different kinds of compactions depending on the current
+    /// state of the database.
     fn start_compaction(&mut self) {
         // TODO (maybe): Support manual compactions.
         if self.imm.is_some() {
@@ -121,7 +123,41 @@
             }
             return;
         }
-        // TODO: rest
+
+        let compaction = self.vset.pick_compaction();
+        if compaction.is_none() {
+            return;
+        }
+        let mut compaction = compaction.unwrap();
+
+        if compaction.is_trivial_move() {
+            assert_eq!(1, compaction.num_inputs(0));
+            let f = compaction.input(0, 0);
+            let num = f.num;
+            let size = f.size;
+            let level = compaction.level();
+
+            compaction.edit().delete_file(level, num);
+            compaction.edit().add_file(level + 1, f);
+
+            if let Err(e) = self.vset.log_and_apply(compaction.into_edit()) {
+                log!(self.opt.log, "trivial move failed: {}", e);
+            } else {
+                log!(self.opt.log,
+                     "Moved num={} bytes={} from L{} to L{}",
+                     num,
+                     size,
+                     level,
+                     level + 1);
+                log!(self.opt.log, "Summary: {}", self.vset.current_summary());
+            }
+        } else {
+            let state = CompactionState::new(compaction);
+            if let Err(e) = self.do_compaction_work(state) {
+                log!(self.opt.log, "Compaction work failed: {}", e);
+            }
+            self.delete_obsolete_files().is_ok();
+        }
     }
 
     fn compact_memtable(&mut self) -> Result<()> {
@@ -180,12 +216,226 @@
         Ok(())
     }
 
+    fn do_compaction_work(&mut self, mut cs: CompactionState) -> Result<()> {
+        let start_ts = self.opt.env.micros();
+        log!(self.opt.log,
+             "Compacting {} files at L{} and {} files at L{}",
+             cs.compaction.num_inputs(0),
+             cs.compaction.level(),
+             cs.compaction.num_inputs(1),
+             cs.compaction.level() + 1);
+        assert!(self.vset.num_level_files(cs.compaction.level()) > 0);
+        assert!(cs.builder.is_none());
+
+        let mut input = self.vset.make_input_iterator(&cs.compaction);
+        input.seek_to_first();
+
+        let (mut key, mut val) = (vec![], vec![]);
+        let mut last_seq_for_key = MAX_SEQUENCE_NUMBER;
+
+        let mut have_ukey = false;
+        let mut current_ukey = vec![];
+
+        while input.valid() {
+            // TODO: Do we need to do a memtable compaction here? Probably not, in the sequential
+            // case.
+            assert!(input.current(&mut key, &mut val));
+            if cs.compaction.should_stop_before(&key) && cs.builder.is_none() {
+                self.finish_compaction_output(&mut cs, key.clone())?;
+            }
+            let (ktyp, seq, ukey) = parse_internal_key(&key);
+            if seq == 0 {
+                // Parsing failed.
+                log!(self.opt.log, "Encountered seq=0 in key: {:?}", &key);
+                last_seq_for_key = MAX_SEQUENCE_NUMBER;
+                continue;
+            }
+
+            if !have_ukey || self.opt.cmp.cmp(ukey, &current_ukey) != Ordering::Equal {
+                // First occurrence of this key.
+                current_ukey.clear();
+                current_ukey.extend_from_slice(ukey);
+                have_ukey = true;
+                last_seq_for_key = MAX_SEQUENCE_NUMBER;
+            }
+
+            // We can omit the key under the following conditions:
+            if last_seq_for_key <= cs.smallest_seq {
+                continue;
+            }
+            if ktyp == ValueType::TypeDeletion && seq <= cs.smallest_seq &&
+               cs.compaction.is_base_level_for(ukey) {
+                continue;
+            }
+
+            if cs.builder.is_none() {
+                let fnum = self.vset.new_file_number();
+                let mut fmd = FileMetaData::default();
+                fmd.num = fnum;
+
+                let fname = table_file_name(&self.name, fnum);
+                let f = self.opt.env.open_writable_file(Path::new(&fname))?;
+                cs.builder = Some(TableBuilder::new(self.opt.clone(), f));
+                cs.outputs.push(fmd);
+            }
+            if cs.builder.as_ref().unwrap().entries() == 0 {
+                cs.current_output().smallest = key.clone();
+            }
+            cs.builder.as_mut().unwrap().add(&key, &val)?;
+            // NOTE: Adjust max file size based on level.
+            if cs.builder.as_ref().unwrap().size_estimate() > self.opt.max_file_size {
+                self.finish_compaction_output(&mut cs, key.clone())?;
+            }
+
+            input.advance();
+        }
+
+        if cs.builder.is_some() {
+            self.finish_compaction_output(&mut cs, key)?;
+        }
+
+        let mut stats = CompactionStats::default();
+        stats.micros = self.opt.env.micros() - start_ts;
+        for parent in 0..2 {
+            for inp in 0..cs.compaction.num_inputs(parent) {
+                stats.read += cs.compaction.input(parent, inp).size;
+            }
+        }
+        for output in &cs.outputs {
+            stats.written += output.size;
+        }
+        self.cstats[cs.compaction.level()].add(stats);
+        self.install_compaction_results(cs)?;
+        log!(self.opt.log,
+             "Compaction finished with {}",
+             self.vset.current_summary());
+
+        Ok(())
+    }
+
+    fn finish_compaction_output(&mut self,
+                                cs: &mut CompactionState,
+                                largest: Vec<u8>)
+                                -> Result<()> {
+        assert!(cs.builder.is_some());
+        let output_num = cs.current_output().num;
+        assert!(output_num > 0);
+
+        // The original checks if the input iterator has an OK status. For this, we'd need to
+        // extend the LdbIterator interface though -- let's see if we can without for now.
+        // (it's not good for corruptions, in any case)
+        let b = cs.builder.take().unwrap();
+        let entries = b.entries();
+        let bytes = b.finish()?;
+        cs.total_bytes += bytes;
+
+        cs.current_output().largest = largest;
+        cs.current_output().size = bytes;
+
+        if entries > 0 {
+            // Verify that table can be used.
+            if let Err(e) = self.cache.borrow_mut().get_table(output_num) {
+                log!(self.opt.log, "New table can't be read: {}", e);
+                return Err(e);
+            }
+            log!(self.opt.log,
+                 "New table num={}: keys={} size={}",
+                 output_num,
+                 entries,
+                 bytes);
+        }
+        Ok(())
+    }
+
+    fn install_compaction_results(&mut self, mut cs: CompactionState) -> Result<()> {
+        log!(self.opt.log,
+             "Compacted {} L{} files + {} L{} files => {}B",
+             cs.compaction.num_inputs(0),
+             cs.compaction.level(),
+             cs.compaction.num_inputs(1),
+             cs.compaction.level() + 1,
+             cs.total_bytes);
+        cs.compaction.add_input_deletions();
+        let level = cs.compaction.level();
+        for output in &cs.outputs {
+            cs.compaction.edit().add_file(level + 1, output.clone());
+        }
+        self.vset.log_and_apply(cs.compaction.into_edit())
+    }
+
     fn delete_obsolete_files(&mut self) -> Result<()> {
-        // TODO: implement
+        let files = self.vset.live_files();
+        let filenames = self.opt.env.children(Path::new(&self.name))?;
+
+        for name in filenames {
+            if let Ok((num, typ)) = parse_file_name(&name) {
+                match typ {
+                    FileType::Log => {
+                        if num >= self.vset.log_num {
+                            continue;
+                        }
+                    }
+                    FileType::Descriptor => {
+                        if num >= self.vset.manifest_num {
+                            continue;
+                        }
+                    }
+                    FileType::Table => {
+                        if files.contains(&num) {
+                            continue;
+                        }
+                    }
+                    // NOTE: In this non-concurrent implementation, we likely never find temp
+                    // files.
+                    FileType::Temp => {
+                        if files.contains(&num) {
+                            continue;
+                        }
+                    }
+                    FileType::Current | FileType::DBLock | FileType::InfoLog => continue,
+                }
+
+                // If we're here, delete this file.
+                if typ == FileType::Table {
+                    self.cache.borrow_mut().evict(num).is_ok();
+                }
+                log!(self.opt.log, "Deleting file type={:?} num={}", typ, num);
+                if let Err(e) = self.opt
+                    .env
+                    .delete(Path::new(&format!("{}/{}", &self.name, &name))) {
+                    log!(self.opt.log, "Deleting file num={} failed: {}", num, e);
+                }
+            }
+        }
         Ok(())
     }
 }
 
+struct CompactionState {
+    compaction: Compaction,
+    smallest_seq: SequenceNumber,
+    outputs: Vec<FileMetaData>,
+    builder: Option<TableBuilder<Box<Write>>>,
+    total_bytes: usize,
+}
+
+impl CompactionState {
+    fn new(c: Compaction) -> CompactionState {
+        CompactionState {
+            compaction: c,
+            smallest_seq: 0,
+            outputs: vec![],
+            builder: None,
+            total_bytes: 0,
+        }
+    }
+
+    fn current_output(&mut self) -> &mut FileMetaData {
+        let len = self.outputs.len();
+        &mut self.outputs[len - 1]
+    }
+}
+
 #[derive(Debug, Default)]
 struct CompactionStats {
     micros: u64,
@@ -273,7 +523,6 @@
     use key_types::LookupKey;
     use mem_env::MemEnv;
     use test_util::LdbIteratorIter;
-    use types::ValueType;
 
     #[test]
     fn test_db_impl_open_info_log() {