changeset 306:e36fd18488c1

db_impl: Propagate errors better and clean up failed transactions.
author Lewin Bormann <lbo@spheniscida.de>
date Wed, 27 Sep 2017 20:39:09 +0200
parents 08c87265d313
children 443bf10ac7a5
files src/db_impl.rs
diffstat 1 files changed, 53 insertions(+), 30 deletions(-) [+]
line wrap: on
line diff
--- a/src/db_impl.rs	Wed Sep 27 20:37:21 2017 +0200
+++ b/src/db_impl.rs	Wed Sep 27 20:39:09 2017 +0200
@@ -109,8 +109,7 @@
         }
 
         db.delete_obsolete_files()?;
-        db.maybe_do_compaction();
-
+        db.maybe_do_compaction()?;
         Ok(db)
     }
 
@@ -361,7 +360,6 @@
             log.flush()?;
         }
         self.vset.last_seq += entries;
-
         Ok(())
     }
 
@@ -383,7 +381,9 @@
     fn record_read_sample<'a>(&mut self, k: InternalKey<'a>) {
         let current = self.vset.current();
         if current.borrow_mut().record_read_sample(k) {
-            self.maybe_do_compaction();
+            if let Err(e) = self.maybe_do_compaction() {
+                log!(self.opt.log, "record_read_sample: compaction failed: {}", e);
+            }
         }
     }
 }
@@ -405,14 +405,14 @@
     /// if it's the case.
     fn make_room_for_write(&mut self) -> Result<()> {
         if self.mem.approx_mem_usage() < self.opt.write_buffer_size {
-            return Ok(());
+            Ok(())
         } else {
             // Create new memtable.
             let logn = self.vset.new_file_number();
             let logf = self.opt.env.open_writable_file(Path::new(&log_file_name(&self.name, logn)));
             if logf.is_err() {
                 self.vset.reuse_file_number(logn);
-                logf?;
+                Err(logf.err().unwrap())
             } else {
                 self.log = Some(LogWriter::new(logf.unwrap()));
                 self.log_num = Some(logn);
@@ -420,35 +420,30 @@
                 let mut imm = MemTable::new(self.opt.cmp.clone());
                 mem::swap(&mut imm, &mut self.mem);
                 self.imm = Some(imm);
-                self.maybe_do_compaction();
+                self.maybe_do_compaction()
             }
-
-            return Ok(());
         }
     }
 
     /// maybe_do_compaction starts a blocking compaction if it makes sense.
-    fn maybe_do_compaction(&mut self) {
+    fn maybe_do_compaction(&mut self) -> Result<()> {
         if self.imm.is_none() && !self.vset.needs_compaction() {
-            return;
+            return Ok(());
         }
-        self.start_compaction();
+        self.start_compaction()
     }
 
     /// start_compaction dispatches the different kinds of compactions depending on the current
     /// state of the database.
-    fn start_compaction(&mut self) {
+    fn start_compaction(&mut self) -> Result<()> {
         // TODO (maybe): Support manual compactions.
         if self.imm.is_some() {
-            if let Err(e) = self.compact_memtable() {
-                log!(self.opt.log, "Error while compacting memtable: {}", e);
-            }
-            return;
+            return self.compact_memtable();
         }
 
         let compaction = self.vset.pick_compaction();
         if compaction.is_none() {
-            return;
+            return Ok(());
         }
         let mut compaction = compaction.unwrap();
 
@@ -464,6 +459,7 @@
 
             if let Err(e) = self.vset.log_and_apply(compaction.into_edit()) {
                 log!(self.opt.log, "trivial move failed: {}", e);
+                Err(e)
             } else {
                 log!(self.opt.log,
                      "Moved num={} bytes={} from L{} to L{}",
@@ -472,13 +468,20 @@
                      level,
                      level + 1);
                 log!(self.opt.log, "Summary: {}", self.vset.current_summary());
+                Ok(())
             }
         } else {
-            let state = CompactionState::new(compaction);
-            if let Err(e) = self.do_compaction_work(state) {
+            let mut state = CompactionState::new(compaction);
+            if let Err(e) = self.do_compaction_work(&mut state) {
+                state.cleanup(&self.opt.env, &self.name);
                 log!(self.opt.log, "Compaction work failed: {}", e);
             }
-            self.delete_obsolete_files().is_ok();
+            self.install_compaction_results(state)?;
+            log!(self.opt.log,
+                 "Compaction finished: {}",
+                 self.vset.current_summary());
+
+            self.delete_obsolete_files()
         }
     }
 
@@ -544,7 +547,7 @@
         Ok(())
     }
 
-    fn do_compaction_work(&mut self, mut cs: CompactionState) -> Result<()> {
+    fn do_compaction_work(&mut self, cs: &mut CompactionState) -> Result<()> {
         let start_ts = self.opt.env.micros();
         log!(self.opt.log,
              "Compacting {} files at L{} and {} files at L{}",
@@ -575,7 +578,7 @@
             // case.
             assert!(input.current(&mut key, &mut val));
             if cs.compaction.should_stop_before(&key) && cs.builder.is_none() {
-                self.finish_compaction_output(&mut cs, key.clone())?;
+                self.finish_compaction_output(cs, key.clone())?;
             }
             let (ktyp, seq, ukey) = parse_internal_key(&key);
             if seq == 0 {
@@ -618,14 +621,14 @@
             cs.builder.as_mut().unwrap().add(&key, &val)?;
             // NOTE: Adjust max file size based on level.
             if cs.builder.as_ref().unwrap().size_estimate() > self.opt.max_file_size {
-                self.finish_compaction_output(&mut cs, key.clone())?;
+                self.finish_compaction_output(cs, key.clone())?;
             }
 
             input.advance();
         }
 
         if cs.builder.is_some() {
-            self.finish_compaction_output(&mut cs, key)?;
+            self.finish_compaction_output(cs, key)?;
         }
 
         let mut stats = CompactionStats::default();
@@ -639,11 +642,6 @@
             stats.written += output.size;
         }
         self.cstats[cs.compaction.level()].add(stats);
-        self.install_compaction_results(cs)?;
-        log!(self.opt.log,
-             "Compaction finished: {}",
-             self.vset.current_summary());
-
         Ok(())
     }
 
@@ -727,6 +725,14 @@
         let len = self.outputs.len();
         &mut self.outputs[len - 1]
     }
+
+    /// cleanup cleans up after an aborted compaction.
+    fn cleanup(&mut self, env: &Box<Env>, name: &str) {
+        for o in self.outputs.drain(..) {
+            let name = table_file_name(name, o.num);
+            env.delete(Path::new(&name)).is_ok();
+        }
+    }
 }
 
 #[derive(Debug, Default)]
@@ -1074,4 +1080,21 @@
         assert_eq!(1, v.borrow().files[2].len());
         assert_eq!(3, v.borrow().files[3].len());
     }
+
+    #[test]
+    fn test_db_impl_compaction_state_cleanup() {
+        let env: Box<Env> = Box::new(MemEnv::new());
+        let name = "db";
+
+        let stuff = "abcdefghijkl".as_bytes();
+        env.open_writable_file(Path::new("db/000001.ldb")).unwrap().write_all(stuff).unwrap();
+        let mut fmd = FileMetaData::default();
+        fmd.num = 1;
+
+        let mut cs = CompactionState::new(Compaction::new(&options::for_test(), 2, None));
+        cs.outputs = vec![fmd];
+        cs.cleanup(&env, name);
+
+        assert!(!env.exists(Path::new("db/000001.ldb")).unwrap());
+    }
 }