Mercurial > lbo > hg > leveldb-rs
changeset 306:e36fd18488c1
db_impl: Propagate errors better and clean up failed transactions.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Wed, 27 Sep 2017 20:39:09 +0200 |
parents | 08c87265d313 |
children | 443bf10ac7a5 |
files | src/db_impl.rs |
diffstat | 1 files changed, 53 insertions(+), 30 deletions(-) [+] |
line wrap: on
line diff
--- a/src/db_impl.rs Wed Sep 27 20:37:21 2017 +0200 +++ b/src/db_impl.rs Wed Sep 27 20:39:09 2017 +0200 @@ -109,8 +109,7 @@ } db.delete_obsolete_files()?; - db.maybe_do_compaction(); - + db.maybe_do_compaction()?; Ok(db) } @@ -361,7 +360,6 @@ log.flush()?; } self.vset.last_seq += entries; - Ok(()) } @@ -383,7 +381,9 @@ fn record_read_sample<'a>(&mut self, k: InternalKey<'a>) { let current = self.vset.current(); if current.borrow_mut().record_read_sample(k) { - self.maybe_do_compaction(); + if let Err(e) = self.maybe_do_compaction() { + log!(self.opt.log, "record_read_sample: compaction failed: {}", e); + } } } } @@ -405,14 +405,14 @@ /// if it's the case. fn make_room_for_write(&mut self) -> Result<()> { if self.mem.approx_mem_usage() < self.opt.write_buffer_size { - return Ok(()); + Ok(()) } else { // Create new memtable. let logn = self.vset.new_file_number(); let logf = self.opt.env.open_writable_file(Path::new(&log_file_name(&self.name, logn))); if logf.is_err() { self.vset.reuse_file_number(logn); - logf?; + Err(logf.err().unwrap()) } else { self.log = Some(LogWriter::new(logf.unwrap())); self.log_num = Some(logn); @@ -420,35 +420,30 @@ let mut imm = MemTable::new(self.opt.cmp.clone()); mem::swap(&mut imm, &mut self.mem); self.imm = Some(imm); - self.maybe_do_compaction(); + self.maybe_do_compaction() } - - return Ok(()); } } /// maybe_do_compaction starts a blocking compaction if it makes sense. - fn maybe_do_compaction(&mut self) { + fn maybe_do_compaction(&mut self) -> Result<()> { if self.imm.is_none() && !self.vset.needs_compaction() { - return; + return Ok(()); } - self.start_compaction(); + self.start_compaction() } /// start_compaction dispatches the different kinds of compactions depending on the current /// state of the database. - fn start_compaction(&mut self) { + fn start_compaction(&mut self) -> Result<()> { // TODO (maybe): Support manual compactions. if self.imm.is_some() { - if let Err(e) = self.compact_memtable() { - log!(self.opt.log, "Error while compacting memtable: {}", e); - } - return; + return self.compact_memtable(); } let compaction = self.vset.pick_compaction(); if compaction.is_none() { - return; + return Ok(()); } let mut compaction = compaction.unwrap(); @@ -464,6 +459,7 @@ if let Err(e) = self.vset.log_and_apply(compaction.into_edit()) { log!(self.opt.log, "trivial move failed: {}", e); + Err(e) } else { log!(self.opt.log, "Moved num={} bytes={} from L{} to L{}", @@ -472,13 +468,20 @@ level, level + 1); log!(self.opt.log, "Summary: {}", self.vset.current_summary()); + Ok(()) } } else { - let state = CompactionState::new(compaction); - if let Err(e) = self.do_compaction_work(state) { + let mut state = CompactionState::new(compaction); + if let Err(e) = self.do_compaction_work(&mut state) { + state.cleanup(&self.opt.env, &self.name); log!(self.opt.log, "Compaction work failed: {}", e); } - self.delete_obsolete_files().is_ok(); + self.install_compaction_results(state)?; + log!(self.opt.log, + "Compaction finished: {}", + self.vset.current_summary()); + + self.delete_obsolete_files() } } @@ -544,7 +547,7 @@ Ok(()) } - fn do_compaction_work(&mut self, mut cs: CompactionState) -> Result<()> { + fn do_compaction_work(&mut self, cs: &mut CompactionState) -> Result<()> { let start_ts = self.opt.env.micros(); log!(self.opt.log, "Compacting {} files at L{} and {} files at L{}", @@ -575,7 +578,7 @@ // case. assert!(input.current(&mut key, &mut val)); if cs.compaction.should_stop_before(&key) && cs.builder.is_none() { - self.finish_compaction_output(&mut cs, key.clone())?; + self.finish_compaction_output(cs, key.clone())?; } let (ktyp, seq, ukey) = parse_internal_key(&key); if seq == 0 { @@ -618,14 +621,14 @@ cs.builder.as_mut().unwrap().add(&key, &val)?; // NOTE: Adjust max file size based on level. if cs.builder.as_ref().unwrap().size_estimate() > self.opt.max_file_size { - self.finish_compaction_output(&mut cs, key.clone())?; + self.finish_compaction_output(cs, key.clone())?; } input.advance(); } if cs.builder.is_some() { - self.finish_compaction_output(&mut cs, key)?; + self.finish_compaction_output(cs, key)?; } let mut stats = CompactionStats::default(); @@ -639,11 +642,6 @@ stats.written += output.size; } self.cstats[cs.compaction.level()].add(stats); - self.install_compaction_results(cs)?; - log!(self.opt.log, - "Compaction finished: {}", - self.vset.current_summary()); - Ok(()) } @@ -727,6 +725,14 @@ let len = self.outputs.len(); &mut self.outputs[len - 1] } + + /// cleanup cleans up after an aborted compaction. + fn cleanup(&mut self, env: &Box<Env>, name: &str) { + for o in self.outputs.drain(..) { + let name = table_file_name(name, o.num); + env.delete(Path::new(&name)).is_ok(); + } + } } #[derive(Debug, Default)] @@ -1074,4 +1080,21 @@ assert_eq!(1, v.borrow().files[2].len()); assert_eq!(3, v.borrow().files[3].len()); } + + #[test] + fn test_db_impl_compaction_state_cleanup() { + let env: Box<Env> = Box::new(MemEnv::new()); + let name = "db"; + + let stuff = "abcdefghijkl".as_bytes(); + env.open_writable_file(Path::new("db/000001.ldb")).unwrap().write_all(stuff).unwrap(); + let mut fmd = FileMetaData::default(); + fmd.num = 1; + + let mut cs = CompactionState::new(Compaction::new(&options::for_test(), 2, None)); + cs.outputs = vec![fmd]; + cs.cleanup(&env, name); + + assert!(!env.exists(Path::new("db/000001.ldb")).unwrap()); + } }