Mercurial > lbo > hg > leveldb-rs
changeset 325:f39d34cf05e5
db_impl: Make VersionSet shared
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 01 Oct 2017 20:36:48 +0200 |
parents | 4637c87d8b47 |
children | 540851b54189 |
files | src/db_impl.rs |
diffstat | 1 files changed, 66 insertions(+), 64 deletions(-) [+] |
line wrap: on
line diff
--- a/src/db_impl.rs Sun Oct 01 14:17:49 2017 +0000 +++ b/src/db_impl.rs Sun Oct 01 20:36:48 2017 +0200 @@ -3,6 +3,8 @@ #![allow(unused_attributes)] +use db_iter::DBIterator; + use cmp::{Cmp, InternalKeyCmp}; use env::{Env, FileLock}; use error::{err, Status, StatusCode, Result}; @@ -11,6 +13,7 @@ use log::{LogReader, LogWriter}; use key_types::{parse_internal_key, InternalKey, LookupKey, ValueType}; use memtable::MemTable; +use merging_iter::MergingIter; use options::Options; use snapshot::{Snapshot, SnapshotList}; use table_builder::TableBuilder; @@ -25,7 +28,7 @@ use std::cmp::Ordering; use std::io::{self, Write}; use std::mem; -use std::ops::Drop; +use std::ops::{DerefMut, Drop}; use std::path::Path; use std::rc::Rc; @@ -35,7 +38,7 @@ name: String, lock: Option<FileLock>, - cmp: InternalKeyCmp, + internal_cmp: Rc<Box<Cmp>>, fpol: InternalFilterPolicy<BoxedFilterPolicy>, opt: Options, @@ -45,7 +48,7 @@ log: Option<LogWriter<Box<Write>>>, log_num: Option<FileNum>, cache: Shared<TableCache>, - vset: VersionSet, + vset: Shared<VersionSet>, snaps: SnapshotList, cstats: [CompactionStats; NUM_LEVELS], @@ -67,7 +70,7 @@ DB { name: name.to_string(), lock: None, - cmp: InternalKeyCmp(opt.cmp.clone()), + internal_cmp: Rc::new(Box::new(InternalKeyCmp(opt.cmp.clone()))), fpol: InternalFilterPolicy::new(opt.filter_policy.clone()), mem: MemTable::new(opt.cmp.clone()), @@ -78,13 +81,17 @@ log: None, log_num: None, cache: cache, - vset: vset, + vset: share(vset), snaps: SnapshotList::new(), cstats: Default::default(), } } + fn current(&self) -> Shared<Version> { + self.vset.borrow().current() + } + /// Opens or creates* a new or existing database. /// /// *depending on the options set (create_if_missing, error_if_exists). @@ -95,7 +102,7 @@ // Create log file if an old one is not being reused. if db.log.is_none() { - let lognum = db.vset.new_file_number(); + let lognum = db.vset.borrow_mut().new_file_number(); let logfile = db.opt.env.open_writable_file(Path::new(&log_file_name(&db.name, lognum)))?; ve.set_log_num(lognum); @@ -105,7 +112,7 @@ if save_manifest { ve.set_log_num(db.log_num.unwrap_or(0)); - db.vset.log_and_apply(ve)?; + db.vset.borrow_mut().log_and_apply(ve)?; } db.delete_obsolete_files()?; @@ -151,18 +158,18 @@ // If save_manifest is true, the existing manifest is reused and we should log_and_apply() // later. - let mut save_manifest = self.vset.recover()?; + let mut save_manifest = self.vset.borrow_mut().recover()?; // Recover from all log files not in the descriptor. let mut max_seq = 0; let filenames = self.opt.env.children(Path::new(&self.name))?; - let mut expected = self.vset.live_files(); + let mut expected = self.vset.borrow().live_files(); let mut log_files = vec![]; for file in &filenames { if let Ok((num, typ)) = parse_file_name(&file) { expected.remove(&num); - if typ == FileType::Log && num >= self.vset.log_num { + if typ == FileType::Log && num >= self.vset.borrow().log_num { log_files.push(num); } } @@ -182,11 +189,11 @@ if max_seq_ > max_seq { max_seq = max_seq_; } - self.vset.mark_file_number_used(log_files[i]); + self.vset.borrow_mut().mark_file_number_used(log_files[i]); } - if self.vset.last_seq < max_seq { - self.vset.last_seq = max_seq; + if self.vset.borrow().last_seq < max_seq { + self.vset.borrow_mut().last_seq = max_seq; } Ok(save_manifest) @@ -264,19 +271,19 @@ /// delete_obsolete_files removes files that are no longer needed from the file system. fn delete_obsolete_files(&mut self) -> Result<()> { - let files = self.vset.live_files(); + let files = self.vset.borrow().live_files(); let filenames = self.opt.env.children(Path::new(&self.name))?; for name in filenames { if let Ok((num, typ)) = parse_file_name(&name) { log!(self.opt.log, "{} {:?}", num, typ); match typ { FileType::Log => { - if num >= self.vset.log_num { + if num >= self.vset.borrow().log_num { continue; } } FileType::Descriptor => { - if num >= self.vset.manifest_num { + if num >= self.vset.borrow().manifest_num { continue; } } @@ -352,14 +359,14 @@ assert!(self.log.is_some()); let entries = batch.count() as u64; let log = self.log.as_mut().unwrap(); - let next = self.vset.last_seq + 1; + let next = self.vset.borrow().last_seq + 1; batch.insert_into_memtable(next, &mut self.mem); log.add_record(&batch.encode(next))?; if sync { log.flush()?; } - self.vset.last_seq += entries; + self.vset.borrow_mut().last_seq += entries; Ok(()) } @@ -374,7 +381,7 @@ // READ // fn get_internal(&mut self, seq: SequenceNumber, key: &[u8]) -> Result<Option<Vec<u8>>> { - let current = self.vset.current(); + let current = self.current(); let mut current = current.borrow_mut(); let lkey = LookupKey::new(key, seq); @@ -401,17 +408,12 @@ /// get_at reads the value for a given key at or before snapshot. It returns Ok(None) if the /// entry wasn't found, and Err(_) if an error occurred. pub fn get_at(&mut self, snapshot: &Snapshot, key: &[u8]) -> Result<Option<Vec<u8>>> { - if let Some(seq) = self.snaps.sequence_at(snapshot) { - self.get_internal(seq, key) - } else { - err(StatusCode::InvalidArgument, - "get_at: snapshot does not exist") - } + self.get_internal(snapshot.sequence(), key) } /// get is a simplified version of get_at(), translating errors to None. pub fn get(&mut self, key: &[u8]) -> Option<Vec<u8>> { - let seq = self.vset.last_seq; + let seq = self.vset.borrow().last_seq; if let Ok(v) = self.get_internal(seq, key) { v } else { @@ -429,7 +431,7 @@ /// Trigger a compaction based on where this key is located in the different levels. fn record_read_sample<'a>(&mut self, k: InternalKey<'a>) { - let current = self.vset.current(); + let current = self.current(); if current.borrow_mut().record_read_sample(k) { if let Err(e) = self.maybe_do_compaction() { log!(self.opt.log, "record_read_sample: compaction failed: {}", e); @@ -439,15 +441,6 @@ } impl DB { - // SNAPSHOTS // - - /// Returns a snapshot at the current state. The snapshot is released automatically on Drop. - pub fn get_snapshot(&mut self) -> Snapshot { - self.snaps.new_snapshot(self.vset.last_seq) - } -} - -impl DB { // COMPACTIONS // /// make_room_for_write checks if the memtable has become too large, and triggers a compaction /// if it's the case. @@ -456,10 +449,10 @@ Ok(()) } else { // Create new memtable. - let logn = self.vset.new_file_number(); + let logn = self.vset.borrow_mut().new_file_number(); let logf = self.opt.env.open_writable_file(Path::new(&log_file_name(&self.name, logn))); if logf.is_err() { - self.vset.reuse_file_number(logn); + self.vset.borrow_mut().reuse_file_number(logn); Err(logf.err().unwrap()) } else { self.log = Some(LogWriter::new(logf.unwrap())); @@ -475,7 +468,7 @@ /// maybe_do_compaction starts a blocking compaction if it makes sense. fn maybe_do_compaction(&mut self) -> Result<()> { - if self.imm.is_none() && !self.vset.needs_compaction() { + if self.imm.is_none() && !self.vset.borrow().needs_compaction() { return Ok(()); } self.start_compaction() @@ -489,7 +482,8 @@ return self.compact_memtable(); } - let compaction = self.vset.pick_compaction(); + + let compaction = self.vset.borrow_mut().pick_compaction(); if compaction.is_none() { return Ok(()); } @@ -505,7 +499,8 @@ compaction.edit().delete_file(level, num); compaction.edit().add_file(level + 1, f); - if let Err(e) = self.vset.log_and_apply(compaction.into_edit()) { + let r = self.vset.borrow_mut().log_and_apply(compaction.into_edit()); + if let Err(e) = r { log!(self.opt.log, "trivial move failed: {}", e); Err(e) } else { @@ -515,7 +510,9 @@ size, level, level + 1); - log!(self.opt.log, "Summary: {}", self.vset.current_summary()); + log!(self.opt.log, + "Summary: {}", + self.vset.borrow().current_summary()); Ok(()) } } else { @@ -527,7 +524,7 @@ self.install_compaction_results(state)?; log!(self.opt.log, "Compaction finished: {}", - self.vset.current_summary()); + self.vset.borrow().current_summary()); self.delete_obsolete_files() } @@ -535,8 +532,9 @@ fn compact_memtable(&mut self) -> Result<()> { assert!(self.imm.is_some()); + let mut ve = VersionEdit::new(); - let base = self.vset.current(); + let base = self.current(); let imm = self.imm.take().unwrap(); if let Err(e) = self.write_l0_table(&imm, &mut ve, Some(&base.borrow())) { @@ -544,7 +542,7 @@ return Err(e); } ve.set_log_num(self.log_num.unwrap_or(0)); - self.vset.log_and_apply(ve)?; + self.vset.borrow_mut().log_and_apply(ve)?; if let Err(e) = self.delete_obsolete_files() { log!(self.opt.log, "Error deleting obsolete files: {}", e); } @@ -558,14 +556,14 @@ base: Option<&Version>) -> Result<()> { let start_ts = self.opt.env.micros(); - let num = self.vset.new_file_number(); + let num = self.vset.borrow_mut().new_file_number(); log!(self.opt.log, "Start write of L0 table {:06}", num); let fmd = build_table(&self.name, &self.opt, memt.iter(), num)?; log!(self.opt.log, "L0 table {:06} has {} bytes", num, fmd.size); // Wrote empty table. if fmd.size == 0 { - self.vset.reuse_file_number(num); + self.vset.borrow_mut().reuse_file_number(num); return Ok(()); } @@ -603,16 +601,16 @@ cs.compaction.level(), cs.compaction.num_inputs(1), cs.compaction.level() + 1); - assert!(self.vset.num_level_files(cs.compaction.level()) > 0); + assert!(self.vset.borrow().num_level_files(cs.compaction.level()) > 0); assert!(cs.builder.is_none()); cs.smallest_seq = if self.snaps.empty() { - self.vset.last_seq + self.vset.borrow().last_seq } else { self.snaps.oldest() }; - let mut input = self.vset.make_input_iterator(&cs.compaction); + let mut input = self.vset.borrow().make_input_iterator(&cs.compaction); input.seek_to_first(); let (mut key, mut val) = (vec![], vec![]); @@ -654,7 +652,7 @@ } if cs.builder.is_none() { - let fnum = self.vset.new_file_number(); + let fnum = self.vset.borrow_mut().new_file_number(); let mut fmd = FileMetaData::default(); fmd.num = fnum; @@ -714,8 +712,10 @@ cs.current_output().size = bytes; if entries > 0 { - // Verify that table can be used. - if let Err(e) = self.cache.borrow_mut().get_table(output_num) { + // Verify that table can be used. (Separating get_table() because borrowing in an if + // let expression is dangerous). + let r = self.cache.borrow_mut().get_table(output_num); + if let Err(e) = r { log!(self.opt.log, "New table can't be read: {}", e); return Err(e); } @@ -741,7 +741,7 @@ for output in &cs.outputs { cs.compaction.edit().add_file(level + 1, output.clone()); } - self.vset.log_and_apply(cs.compaction.into_edit()) + self.vset.borrow_mut().log_and_apply(cs.compaction.into_edit()) } } @@ -979,7 +979,7 @@ assert!(env.exists(Path::new("db/000003.ldb")).unwrap()); assert!(env.exists(Path::new("db/000004.log")).unwrap()); // Check that entry exists and is correct. Phew, long call chain! - let current = db.vset.current(); + let current = db.current(); log!(opt.log, "files: {:?}", current.borrow().files); assert_eq!("def".as_bytes(), current.borrow_mut() @@ -1130,9 +1130,11 @@ fn test_db_impl_get_from_table() { let mut db = build_db(); - assert_eq!(26, db.vset.last_seq); + assert_eq!(26, db.vset.borrow().last_seq); + let old_ss = db.get_snapshot(); db.put("xyz".as_bytes(), "123".as_bytes()).unwrap(); + assert!(db.get_at(&old_ss, "xyz".as_bytes()).unwrap().is_none()); // memtable get assert_eq!("123".as_bytes(), @@ -1187,7 +1189,7 @@ assert!(db.opt.env.exists(Path::new("db/000004.ldb")).unwrap()); { - let v = db.vset.current(); + let v = db.current(); let mut v = v.borrow_mut(); v.file_to_compact = Some(v.files[2][0].clone()); v.file_to_compact_lvl = 2; @@ -1196,7 +1198,7 @@ db.maybe_do_compaction().unwrap(); { - let v = db.vset.current(); + let v = db.current(); let v = v.borrow_mut(); assert_eq!(1, v.files[3].len()); } @@ -1231,8 +1233,8 @@ v.compaction_level = Some(1); let mut db = DB::new("db", opt.clone()); - db.vset.add_version(v); - db.vset.next_file_num = 10; + db.vset.borrow_mut().add_version(v); + db.vset.borrow_mut().next_file_num = 10; db.start_compaction().unwrap(); @@ -1240,7 +1242,7 @@ assert!(opt.env.exists(Path::new("db/000010.ldb")).unwrap()); assert_eq!(375, opt.env.size_of(Path::new("db/000010.ldb")).unwrap()); - let v = db.vset.current(); + let v = db.current(); assert_eq!(0, v.borrow().files[1].len()); assert_eq!(2, v.borrow().files[2].len()); } @@ -1254,8 +1256,8 @@ v.file_to_compact_lvl = 2; let mut db = DB::new("db", opt.clone()); - db.vset.add_version(v); - db.vset.next_file_num = 10; + db.vset.borrow_mut().add_version(v); + db.vset.borrow_mut().next_file_num = 10; db.start_compaction().unwrap(); @@ -1263,7 +1265,7 @@ assert!(!opt.env.exists(Path::new("db/000010.ldb")).unwrap()); assert_eq!(218, opt.env.size_of(Path::new("db/000006.ldb")).unwrap()); - let v = db.vset.current(); + let v = db.current(); assert_eq!(1, v.borrow().files[2].len()); assert_eq!(3, v.borrow().files[3].len()); }