Mercurial > lbo > hg > leveldb-rs
changeset 225:4b044ef081e0
version_set: Implement Compaction and parts of VersionSet.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 09 Sep 2017 19:27:25 +0200 |
parents | c6790beee801 |
children | 199a993cfe94 |
files | src/version_set.rs |
diffstat | 1 files changed, 207 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/src/version_set.rs Sat Sep 09 19:26:59 2017 +0200 +++ b/src/version_set.rs Sat Sep 09 19:27:25 2017 +0200 @@ -1,2 +1,208 @@ -pub const NUM_LEVELS: usize = 7; +use cmp::{Cmp, InternalKeyCmp}; +use key_types::{parse_internal_key, InternalKey, UserKey}; +use log::LogWriter; +use options::Options; +use table_cache::TableCache; +use types::{share, NUM_LEVELS, FileNum, Shared}; +use version::{FileMetaHandle, Version}; +use version_edit::VersionEdit; + +use std::cmp::Ordering; +use std::collections::HashSet; +use std::io::Write; +use std::rc::Rc; + +struct Compaction { + level: usize, + max_file_size: usize, + input_version: Option<Shared<Version>>, + level_ixs: [usize; NUM_LEVELS], + cmp: Rc<Box<Cmp>>, + + // "parent" inputs from level and level+1. + inputs: [Vec<FileMetaHandle>; 2], + grandparent_ix: usize, + // remaining inputs from level+2..NUM_LEVELS + grandparents: Option<Vec<FileMetaHandle>>, + overlapped_bytes: usize, + seen_key: bool, + pub edit: VersionEdit, +} + +impl Compaction { + // Note: opt.cmp should be the user-supplied or default comparator (not an InternalKeyCmp). + fn new(opt: &Options, level: usize) -> Compaction { + Compaction { + level: level, + max_file_size: opt.max_file_size, + input_version: None, + level_ixs: Default::default(), + cmp: opt.cmp.clone(), + + inputs: Default::default(), + grandparent_ix: 0, + grandparents: Default::default(), + overlapped_bytes: 0, + seen_key: false, + edit: VersionEdit::new(), + } + } + + /// add_input_deletions marks the current input files as deleted in the inner VersionEdit. + fn add_input_deletions(&mut self) { + for parent in 0..2 { + for f in &self.inputs[parent] { + self.edit.delete_file(self.level + parent, f.borrow().num); + } + } + } + + fn input(&self, parent: usize, i: usize) -> FileMetaHandle { + assert!(parent < 2); + assert!(i < self.inputs[parent].len()); + + self.inputs[parent][i].clone() + } + + /// is_base_level_for checks whether the given key may exist in levels higher than this + /// compaction's level plus 2. I.e., whether the levels for this compaction are the last ones + /// to contain the key. + fn is_base_level_for<'a>(&mut self, k: UserKey<'a>) -> bool { + if let Some(ref inp_version) = self.input_version { + for level in self.level + 2..NUM_LEVELS { + let files = &inp_version.borrow().files[level]; + while self.level_ixs[level] < files.len() { + let f = files[self.level_ixs[level]].borrow(); + if self.cmp.cmp(k, parse_internal_key(&f.largest).2) <= Ordering::Equal { + if self.cmp.cmp(k, parse_internal_key(&f.smallest).2) >= Ordering::Equal { + // key is in this file's range, so this is not the base level. + return false; + } + break; + } + self.level_ixs[level] += 1; + } + } + true + } else { + unimplemented!() + } + } + + fn num_inputs(&self, parent: usize) -> usize { + assert!(parent < 2); + self.inputs[parent].len() + } + + fn is_trivial_move(&self) -> bool { + let inputs_size = self.grandparents + .as_ref() + .unwrap_or(&vec![]) + .iter() + .fold(0, |a, f| a + f.borrow().size); + self.num_inputs(0) == 1 && self.num_inputs(1) == 0 && inputs_size < 10 * self.max_file_size + } + + fn should_stop_before<'a>(&mut self, k: InternalKey<'a>) -> bool { + assert!(self.grandparents.is_some()); + let grandparents = self.grandparents.as_ref().unwrap(); + let icmp = InternalKeyCmp(self.cmp.clone()); + while self.grandparent_ix < grandparents.len() && + icmp.cmp(k, &grandparents[self.grandparent_ix].borrow().largest) == Ordering::Greater { + if self.seen_key { + self.overlapped_bytes += grandparents[self.grandparent_ix].borrow().size; + } + self.grandparent_ix += 1; + } + self.seen_key = true; + + if self.overlapped_bytes > 10 * self.max_file_size { + self.overlapped_bytes = 0; + true + } else { + false + } + } +} + +/// VersionSet managed the various versions that are live within a database. A single version +/// contains references to the files on disk as they were at a certain point. +pub struct VersionSet { + dbname: String, + opt: Options, + cmp: InternalKeyCmp, + cache: Shared<TableCache>, + + next_file_num: u64, + manifest_num: u64, + last_seq: u64, + log_num: u64, + prev_log_num: u64, + + log: Option<LogWriter<Box<Write>>>, + versions: Vec<Shared<Version>>, + current: Option<Shared<Version>>, +} + +impl VersionSet { + // Note: opt.cmp should not contain an InternalKeyCmp at this point, but instead the default or + // user-supplied one. + pub fn new(db: String, opt: Options, cache: Shared<TableCache>) -> VersionSet { + VersionSet { + dbname: db, + cmp: InternalKeyCmp(opt.cmp.clone()), + opt: opt, + cache: cache, + + next_file_num: 2, + manifest_num: 2, + last_seq: 0, + log_num: 0, + prev_log_num: 0, + + log: None, + versions: vec![], + current: None, + } + } + + fn live_files(&self) -> Vec<FileNum> { + let mut files = HashSet::new(); + for version in &self.versions { + for level in 0..NUM_LEVELS { + for file in &version.borrow().files[level] { + files.insert(file.borrow().num); + } + } + } + files.into_iter().collect() + } + + fn add_version(&mut self, v: Version) { + let sv = share(v); + self.current = Some(sv.clone()); + self.versions.push(sv); + } + + fn approximate_offset<'a>(&self, v: Shared<Version>, key: InternalKey<'a>) -> usize { + let mut offset = 0; + for level in 0..NUM_LEVELS { + for f in &v.borrow().files[level] { + if self.opt.cmp.cmp(&f.borrow().largest, key) <= Ordering::Equal { + offset += f.borrow().size; + } else if self.opt.cmp.cmp(&f.borrow().smallest, key) == Ordering::Greater { + // In higher levels, files are sorted; we don't need to search further. + if level > 0 { + break; + } + } else { + if let Ok(tbl) = self.cache.borrow_mut().get_table(f.borrow().num) { + offset += tbl.approx_offset_of(key); + } + } + } + } + offset + } +}