Mercurial > lbo > hg > leveldb-rs
changeset 190:50642ad716ea
everything: Update all iterators to implement the new LdbIterator.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 27 Aug 2017 19:23:20 +0200 |
parents | b731f7b7c1f3 |
children | 0786015427be |
files | src/block.rs src/memtable.rs src/merging_iter.rs src/skipmap.rs src/table_cache.rs src/table_reader.rs src/test_util.rs src/types.rs |
diffstat | 8 files changed, 373 insertions(+), 279 deletions(-) [+] |
line wrap: on
line diff
--- a/src/block.rs Thu Aug 24 20:09:47 2017 +0200 +++ b/src/block.rs Sun Aug 27 19:23:20 2017 +0200 @@ -143,7 +143,7 @@ i += valsizelen; self.val_offset = self.offset + i + non_shared; - self.offset = self.offset + i + non_shared + valsize; + self.offset = self.val_offset + valsize; (shared, non_shared, valsize, i) } @@ -168,26 +168,24 @@ self.reset(); } - while let Some((_, _)) = self.next() { + while self.advance() { } } } -impl Iterator for BlockIter { - type Item = (Vec<u8>, Vec<u8>); - - fn next(&mut self) -> Option<Self::Item> { +impl LdbIterator for BlockIter { + fn advance(&mut self) -> bool { if self.offset >= self.restarts_off { self.offset = self.restarts_off; // current_entry_offset is left at the offset of the last entry - return None; + return false; } else { self.current_entry_offset = self.offset; } let current_off = self.current_entry_offset; - let (shared, non_shared, valsize, entry_head_len) = self.parse_entry_and_advance(); + let (shared, non_shared, _valsize, entry_head_len) = self.parse_entry_and_advance(); self.assemble_key(current_off + entry_head_len, shared, non_shared); // Adjust current_restart_ix @@ -196,12 +194,8 @@ self.get_restart_point(self.current_restart_ix + 1) < self.current_entry_offset { self.current_restart_ix += 1; } - - Some((self.key.clone(), Vec::from(&self.block[self.val_offset..self.val_offset + valsize]))) + true } -} - -impl LdbIterator for BlockIter { fn reset(&mut self) { self.offset = 0; self.val_offset = 0; @@ -209,14 +203,14 @@ self.key.clear(); } - fn prev(&mut self) -> Option<Self::Item> { + fn prev(&mut self) -> bool { // as in the original implementation -- seek to last restart point, then look for key let orig_offset = self.current_entry_offset; // At the beginning, can't go further back if orig_offset == 0 { self.reset(); - return None; + return false; } while self.get_restart_point(self.current_restart_ix) >= orig_offset { @@ -237,8 +231,7 @@ // Stop if the next entry would be the original one (self.offset 
always points to the start // of the next entry) loop { - result = self.next(); - + result = self.advance(); if self.offset >= orig_offset { break; } @@ -288,9 +281,13 @@ fn current(&self) -> Option<Self::Item> { if self.valid() { - Some((self.key.clone(), Vec::from(&self.block[self.val_offset..self.offset]))) + key.clear(); + val.clear(); + key.extend_from_slice(&self.key); + val.extend_from_slice(&self.block[self.val_offset..self.offset]); + true } else { - None + false } } } @@ -404,7 +401,8 @@ mod tests { use super::*; use options::*; - use types::LdbIterator; + use test_util::LdbIteratorIter; + use types::{current_key_val, LdbIterator}; fn get_data() -> Vec<(&'static [u8], &'static [u8])> { vec![("key1".as_bytes(), "value1".as_bytes()), @@ -444,7 +442,7 @@ let block = Block::new(Options::default(), blockc); - for _ in block.iter() { + for _ in LdbIteratorIter::wrap(&mut block.iter()) { panic!("expected 0 iterations"); } } @@ -459,12 +457,12 @@ } let block_contents = builder.finish(); - let block = Block::new(Options::default(), block_contents).iter(); + let mut block = Block::new(Options::default(), block_contents).iter(); let mut i = 0; assert!(!block.valid()); - for (k, v) in block { + for (k, v) in LdbIteratorIter::wrap(&mut block) { assert_eq!(&k[..], data[i].0); assert_eq!(v, data[i].1); i += 1; @@ -494,7 +492,7 @@ assert!(block.valid()); block.prev(); assert!(block.valid()); - assert_eq!(block.current(), + assert_eq!(current_key_val(&block), Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec()))); block.prev(); assert!(!block.valid()); @@ -506,7 +504,7 @@ block.prev(); assert!(block.valid()); - assert_eq!(block.current(), + assert_eq!(current_key_val(&block), Some(("prefix_key2".as_bytes().to_vec(), "value".as_bytes().to_vec()))); } @@ -528,27 +526,27 @@ block.seek(&"prefix_key2".as_bytes()); assert!(block.valid()); - assert_eq!(block.current(), + assert_eq!(current_key_val(&block), Some(("prefix_key2".as_bytes().to_vec(), 
"value".as_bytes().to_vec()))); block.seek(&"prefix_key0".as_bytes()); assert!(block.valid()); - assert_eq!(block.current(), + assert_eq!(current_key_val(&block), Some(("prefix_key1".as_bytes().to_vec(), "value".as_bytes().to_vec()))); block.seek(&"key1".as_bytes()); assert!(block.valid()); - assert_eq!(block.current(), + assert_eq!(current_key_val(&block), Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec()))); block.seek(&"prefix_key3".as_bytes()); assert!(block.valid()); - assert_eq!(block.current(), + assert_eq!(current_key_val(&block), Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec()))); block.seek(&"prefix_key8".as_bytes()); assert!(block.valid()); - assert_eq!(block.current(), + assert_eq!(current_key_val(&block), Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec()))); } @@ -573,7 +571,7 @@ block.seek_to_last(); assert!(block.valid()); - assert_eq!(block.current(), + assert_eq!(current_key_val(&block), Some(("prefix_key3".as_bytes().to_vec(), "value".as_bytes().to_vec()))); } }
--- a/src/memtable.rs Thu Aug 24 20:09:47 2017 +0200 +++ b/src/memtable.rs Sun Aug 27 19:23:20 2017 +0200 @@ -1,13 +1,15 @@ -use key_types::{LookupKey, UserKey, InternalKey, MemtableKey}; +use key_types::{LookupKey, UserKey}; use cmp::MemtableKeyCmp; use error::{Status, StatusCode, Result}; use key_types::{parse_memtable_key, build_memtable_key}; -use types::{ValueType, SequenceNumber, LdbIterator}; +use types::{current_key_val, LdbIterator, SequenceNumber, ValueType}; use skipmap::{SkipMap, SkipMapIter}; use options::Options; use std::sync::Arc; +use integer_encoding::FixedInt; + /// Provides Insert/Get/Iterate, based on the SkipMap implementation. /// MemTable uses MemtableKeys internally, that is, it stores key and value in the [Skipmap] key. pub struct MemTable { @@ -45,10 +47,9 @@ let mut iter = self.map.iter(); iter.seek(key.memtable_key()); - if let Some(e) = iter.current() { - let foundkey: MemtableKey = e.0; + if let Some((foundkey, _)) = current_key_val(&iter) { // let (lkeylen, lkeyoff, _, _, _) = parse_memtable_key(key.memtable_key()); - let (fkeylen, fkeyoff, tag, vallen, valoff) = parse_memtable_key(foundkey); + let (fkeylen, fkeyoff, tag, vallen, valoff) = parse_memtable_key(&foundkey); // Compare user key -- if equal, proceed // We only care about user key equality here @@ -73,64 +74,72 @@ pub struct MemtableIterator<'a> { _tbl: &'a MemTable, - skipmapiter: SkipMapIter<'a>, + skipmapiter: SkipMapIter, } -impl<'a> Iterator for MemtableIterator<'a> { - type Item = (InternalKey<'a>, &'a [u8]); - - fn next(&mut self) -> Option<Self::Item> { +impl<'a> LdbIterator for MemtableIterator<'a> { + fn advance(&mut self) -> bool { + // Make sure this is actually needed. 
+ let (mut key, mut val) = (vec![], vec![]); loop { - if let Some((foundkey, _)) = self.skipmapiter.next() { - let (keylen, keyoff, tag, vallen, valoff) = parse_memtable_key(foundkey); + if !self.skipmapiter.advance() { + return false; + } + if self.skipmapiter.current(&mut key, &mut val) { + let (_, _, tag, _, _) = parse_memtable_key(&key); if tag & 0xff == ValueType::TypeValue as u64 { - return Some((&foundkey[keyoff..keyoff + keylen], - &foundkey[valoff..valoff + vallen])); + return true; } else { continue; } } else { - return None; + return false; } } } -} - -impl<'a> LdbIterator for MemtableIterator<'a> { fn reset(&mut self) { self.skipmapiter.reset(); } - fn prev(&mut self) -> Option<Self::Item> { + fn prev(&mut self) -> bool { + // Make sure this is actually needed (skipping deleted values?). + let (mut key, mut val) = (vec![], vec![]); loop { - if let Some((foundkey, _)) = self.skipmapiter.prev() { - let (keylen, keyoff, tag, vallen, valoff) = parse_memtable_key(foundkey); + if !self.skipmapiter.prev() { + return false; + } + if self.skipmapiter.current(&mut key, &mut val) { + let (_, _, tag, _, _) = parse_memtable_key(&key); if tag & 0xff == ValueType::TypeValue as u64 { - return Some((&foundkey[keyoff..keyoff + keylen], - &foundkey[valoff..valoff + vallen])); + return true; } else { continue; } } else { - return None; + return false; } } } fn valid(&self) -> bool { self.skipmapiter.valid() } - fn current(&self) -> Option<Self::Item> { + fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool { if !self.valid() { - return None; + return false; } - if let Some((foundkey, _)) = self.skipmapiter.current() { - let (keylen, keyoff, tag, vallen, valoff) = parse_memtable_key(foundkey); + if self.skipmapiter.current(key, val) { + let (keylen, keyoff, tag, vallen, valoff) = parse_memtable_key(&key); if tag & 0xff == ValueType::TypeValue as u64 { - return Some((&foundkey[keyoff..keyoff + keylen + 8], - &foundkey[valoff..valoff + vallen])); + val.clear(); + 
val.extend_from_slice(&key[valoff..valoff + vallen]); + // zero-allocation truncation. + shift_left(key, keyoff); + // Truncate key to key+tag. + key.truncate(keylen + u64::required_space()); + return true; } else { panic!("should not happen"); } @@ -143,14 +152,35 @@ } } +/// shift_left moves s[mid..] to s[0..s.len()-mid]. The new size is s.len()-mid. +fn shift_left(s: &mut Vec<u8>, mid: usize) { + for i in mid..s.len() { + s.swap(i, i - mid); + } + let newlen = s.len() - mid; + s.truncate(newlen); +} + #[cfg(test)] #[allow(unused_variables)] mod tests { use super::*; use key_types::*; + use test_util::LdbIteratorIter; use types::*; use options::Options; + #[test] + fn test_shift_left() { + let mut v = vec![1, 2, 3, 4, 5]; + shift_left(&mut v, 1); + assert_eq!(v, vec![2, 3, 4, 5]); + + let mut v = vec![1, 2, 3, 4, 5]; + shift_left(&mut v, 4); + assert_eq!(v, vec![5]); + } + fn get_memtable() -> MemTable { let mut mt = MemTable::new(Options::default()); let entries = vec![(115, "abc", "122"), @@ -233,7 +263,7 @@ assert!(!iter.valid()); iter.next(); assert!(iter.valid()); - assert_eq!(iter.current().unwrap().0, + assert_eq!(current_key_val(&iter).unwrap().0, vec![97, 98, 99, 1, 120, 0, 0, 0, 0, 0, 0].as_slice()); iter.reset(); assert!(!iter.valid()); @@ -242,7 +272,7 @@ #[test] fn test_memtable_iterator_fwd_seek() { let mt = get_memtable(); - let iter = mt.iter(); + let mut iter = mt.iter(); let expected = vec!["123".as_bytes(), /* i.e., the abc entry with * higher sequence number comes first */ @@ -252,7 +282,7 @@ "126".as_bytes()]; let mut i = 0; - for (k, v) in iter { + for (k, v) in LdbIteratorIter::wrap(&mut iter) { assert_eq!(v, expected[i]); i += 1; } @@ -266,27 +296,27 @@ // Bigger sequence number comes first iter.next(); assert!(iter.valid()); - assert_eq!(iter.current().unwrap().0, + assert_eq!(current_key_val(&iter).unwrap().0, vec![97, 98, 99, 1, 120, 0, 0, 0, 0, 0, 0].as_slice()); iter.next(); assert!(iter.valid()); - 
assert_eq!(iter.current().unwrap().0, + assert_eq!(current_key_val(&iter).unwrap().0, vec![97, 98, 99, 1, 115, 0, 0, 0, 0, 0, 0].as_slice()); iter.next(); assert!(iter.valid()); - assert_eq!(iter.current().unwrap().0, + assert_eq!(current_key_val(&iter).unwrap().0, vec![97, 98, 100, 1, 121, 0, 0, 0, 0, 0, 0].as_slice()); iter.prev(); assert!(iter.valid()); - assert_eq!(iter.current().unwrap().0, + assert_eq!(current_key_val(&iter).unwrap().0, vec![97, 98, 99, 1, 115, 0, 0, 0, 0, 0, 0].as_slice()); iter.prev(); assert!(iter.valid()); - assert_eq!(iter.current().unwrap().0, + assert_eq!(current_key_val(&iter).unwrap().0, vec![97, 98, 99, 1, 120, 0, 0, 0, 0, 0, 0].as_slice()); iter.prev();
--- a/src/merging_iter.rs Thu Aug 24 20:09:47 2017 +0200 +++ b/src/merging_iter.rs Sun Aug 27 19:23:20 2017 +0200 @@ -1,6 +1,6 @@ use cmp::Cmp; use options::Options; -use types::LdbIterator; +use types::{current_key_val, LdbIterator}; use std::cmp::Ordering; use std::sync::Arc; @@ -23,18 +23,16 @@ Rvrs, } -pub struct MergingIter<'a, 'b: 'a> { - iters: Vec<&'a mut LdbIterator<Item = (&'b [u8], &'b [u8])>>, +pub struct MergingIter { + iters: Vec<Box<LdbIterator>>, current: Option<usize>, direction: Direction, cmp: Arc<Box<Cmp>>, } -impl<'a, 'b: 'a> MergingIter<'a, 'b> { +impl MergingIter { /// Construct a new merging iterator. - pub fn new(opt: Options, - iters: Vec<&'a mut LdbIterator<Item = (&'b [u8], &'b [u8])>>) - -> MergingIter<'a, 'b> { + pub fn new(opt: Options, iters: Vec<Box<LdbIterator>>) -> MergingIter { let mi = MergingIter { iters: iters, current: None, @@ -47,7 +45,7 @@ fn init(&mut self) { for i in 0..self.iters.len() { self.iters[i].reset(); - self.iters[i].next(); + self.iters[i].advance(); assert!(self.iters[i].valid()); } self.find_smallest(); @@ -57,17 +55,18 @@ /// call was next() or prev(). This basically sets all iterators to one /// entry after (Fwd) or one entry before (Rvrs) the current() entry. 
fn update_direction(&mut self, d: Direction) { - if let Some((key, _)) = self.current() { + if let Some((key, _)) = current_key_val(self) { if let Some(current) = self.current { match d { Direction::Fwd if self.direction == Direction::Rvrs => { self.direction = Direction::Fwd; for i in 0..self.iters.len() { if i != current { - self.iters[i].seek(key); - if let Some((current_key, _)) = self.iters[i].current() { - if self.cmp.cmp(current_key, key) == Ordering::Equal { - self.iters[i].next(); + self.iters[i].seek(&key); + if let Some((current_key, _)) = current_key_val(self.iters[i] + .as_ref()) { + if self.cmp.cmp(&current_key, &key) == Ordering::Equal { + self.iters[i].advance(); } } } @@ -77,7 +76,7 @@ self.direction = Direction::Rvrs; for i in 0..self.iters.len() { if i != current { - self.iters[i].seek(key); + self.iters[i].seek(&key); self.iters[i].prev(); } } @@ -109,9 +108,9 @@ let mut next_ix = 0; for i in 1..self.iters.len() { - if let Some(current) = self.iters[i].current() { - if let Some(smallest) = self.iters[next_ix].current() { - if self.cmp.cmp(current.0, smallest.0) == ord { + if let Some((current, _)) = current_key_val(self.iters[i].as_ref()) { + if let Some((smallest, _)) = current_key_val(self.iters[next_ix].as_ref()) { + if self.cmp.cmp(&current, &smallest) == ord { next_ix = i; } } else { @@ -127,13 +126,11 @@ } } -impl<'a, 'b: 'a> Iterator for MergingIter<'a, 'b> { - type Item = (&'b [u8], &'b [u8]); - - fn next(&mut self) -> Option<Self::Item> { +impl LdbIterator for MergingIter { + fn advance(&mut self) -> bool { if let Some(current) = self.current { self.update_direction(Direction::Fwd); - if let None = self.iters[current].next() { + if !self.iters[current].advance() { // Take this iterator out of rotation; this will return None // for every call to current() and thus it will be ignored // from here on. 
@@ -143,14 +140,16 @@ } else { self.init(); } - - self.iters[self.current.unwrap()].current() + self.valid() } -} - -impl<'a, 'b: 'a> LdbIterator for MergingIter<'a, 'b> { fn valid(&self) -> bool { - return self.current.is_some() && self.iters.iter().any(|it| it.valid()); + if let Some(ix) = self.current { + // TODO: second clause is unnecessary, because first asserts that at least one iterator + // is valid. + self.iters[ix].valid() && self.iters.iter().any(|it| it.valid()) + } else { + false + } } fn seek(&mut self, key: &[u8]) { for i in 0..self.iters.len() { @@ -163,25 +162,25 @@ self.iters[i].reset(); } } - fn current(&self) -> Option<Self::Item> { + fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool { if let Some(ix) = self.current { - self.iters[ix].current() + self.iters[ix].current(key, val) } else { - None + false } } - fn prev(&mut self) -> Option<Self::Item> { + fn prev(&mut self) -> bool { if let Some(current) = self.current { - if let Some((_, _)) = self.current() { + if self.iters[current].valid() { self.update_direction(Direction::Rvrs); self.iters[current].prev(); self.find_largest(); - self.current() + true } else { - None + false } } else { - None + false } } } @@ -191,17 +190,18 @@ use super::*; use options::Options; - use test_util::TestLdbIter; - use types::LdbIterator; + use test_util::{LdbIteratorIter, TestLdbIter}; + use types::{current_key_val, LdbIterator}; use skipmap::tests; #[test] fn test_merging_one() { let skm = tests::make_skipmap(); - let mut iter = skm.iter(); + let iter = skm.iter(); let mut iter2 = skm.iter(); - let mut miter = MergingIter::new(Options::default(), vec![&mut iter]); + // TODO - use a non-lifetimed iterator. Or rewrite f'ing MergingIter. 
+ let mut miter = MergingIter::new(Options::default(), vec![Box::new(iter)]); loop { if let Some((k, v)) = miter.next() { @@ -220,10 +220,10 @@ #[test] fn test_merging_two() { let skm = tests::make_skipmap(); - let mut iter = skm.iter(); - let mut iter2 = skm.iter(); + let iter = skm.iter(); + let iter2 = skm.iter(); - let mut miter = MergingIter::new(Options::default(), vec![&mut iter, &mut iter2]); + let mut miter = MergingIter::new(Options::default(), vec![Box::new(iter), Box::new(iter2)]); loop { if let Some((k, v)) = miter.next() { @@ -242,17 +242,18 @@ #[test] fn test_merging_fwd_bckwd() { let skm = tests::make_skipmap(); - let mut iter = skm.iter(); - let mut iter2 = skm.iter(); + let iter = skm.iter(); + let iter2 = skm.iter(); - let mut miter = MergingIter::new(Options::default(), vec![&mut iter, &mut iter2]); + let mut miter = MergingIter::new(Options::default(), vec![Box::new(iter), Box::new(iter2)]); let first = miter.next(); miter.next(); let third = miter.next(); assert!(first != third); - let second = miter.prev(); + assert!(miter.prev()); + let second = current_key_val(&miter); assert_eq!(first, second); } @@ -264,14 +265,14 @@ fn test_merging_real() { let val = "def".as_bytes(); - let mut it1 = TestLdbIter::new(vec![(b("aba"), val), (b("abc"), val), (b("abe"), val)]); - let mut it2 = TestLdbIter::new(vec![(b("abb"), val), (b("abd"), val)]); + let it1 = TestLdbIter::new(vec![(&b("aba"), val), (&b("abc"), val), (&b("abe"), val)]); + let it2 = TestLdbIter::new(vec![(&b("abb"), val), (&b("abd"), val)]); let expected = vec![b("aba"), b("abb"), b("abc"), b("abd"), b("abe")]; - let iter = MergingIter::new(Options::default(), vec![&mut it1, &mut it2]); + let mut iter = MergingIter::new(Options::default(), vec![Box::new(it1), Box::new(it2)]); let mut i = 0; - for (k, _) in iter { + for (k, _) in LdbIteratorIter::wrap(&mut iter) { assert_eq!(k, expected[i]); i += 1; } @@ -282,27 +283,30 @@ fn test_merging_seek_reset() { let val = "def".as_bytes(); - let mut 
it1 = TestLdbIter::new(vec![(b("aba"), val), (b("abc"), val), (b("abe"), val)]); - let mut it2 = TestLdbIter::new(vec![(b("abb"), val), (b("abd"), val)]); + let it1 = TestLdbIter::new(vec![(b("aba"), val), (b("abc"), val), (b("abe"), val)]); + let it2 = TestLdbIter::new(vec![(b("abb"), val), (b("abd"), val)]); - let mut iter = MergingIter::new(Options::default(), vec![&mut it1, &mut it2]); + let mut iter = MergingIter::new(Options::default(), vec![Box::new(it1), Box::new(it2)]); assert!(!iter.valid()); - iter.next(); + iter.advance(); assert!(iter.valid()); - assert!(iter.current().is_some()); + assert!(current_key_val(&iter).is_some()); iter.seek("abc".as_bytes()); - assert_eq!(iter.current(), Some((b("abc"), val))); + assert_eq!(current_key_val(&iter), + Some((b("abc").to_vec(), val.to_vec()))); iter.seek("ab0".as_bytes()); - assert_eq!(iter.current(), Some((b("aba"), val))); + assert_eq!(current_key_val(&iter), + Some((b("aba").to_vec(), val.to_vec()))); iter.seek("abx".as_bytes()); - assert_eq!(iter.current(), None); + assert_eq!(current_key_val(&iter), None); iter.reset(); assert!(!iter.valid()); iter.next(); - assert_eq!(iter.current(), Some((b("aba"), val))); + assert_eq!(current_key_val(&iter), + Some((b("aba").to_vec(), val.to_vec()))); } // oomph... TODO: fix behavior here @@ -310,10 +314,10 @@ fn test_merging_fwd_bckwd_2() { let val = "def".as_bytes(); - let mut it1 = TestLdbIter::new(vec![(b("aba"), val), (b("abc"), val), (b("abe"), val)]); - let mut it2 = TestLdbIter::new(vec![(b("abb"), val), (b("abd"), val)]); + let it1 = TestLdbIter::new(vec![(b("aba"), val), (b("abc"), val), (b("abe"), val)]); + let it2 = TestLdbIter::new(vec![(b("abb"), val), (b("abd"), val)]); - let mut iter = MergingIter::new(Options::default(), vec![&mut it1, &mut it2]); + let mut iter = MergingIter::new(Options::default(), vec![Box::new(it1), Box::new(it2)]); iter.next(); iter.next();
--- a/src/skipmap.rs Thu Aug 24 20:09:47 2017 +0200 +++ b/src/skipmap.rs Sun Aug 27 19:23:20 2017 +0200 @@ -6,6 +6,7 @@ use std::cmp::Ordering; use std::mem::{replace, size_of}; +use std::rc::Rc; use std::sync::Arc; const MAX_HEIGHT: usize = 12; @@ -24,7 +25,7 @@ /// `contains()`; in order to get full key and value for an entry, use a `SkipMapIter` instance, /// `seek()` to the key to look up (this is as fast as any lookup in a skip map), and then call /// `current()`. -pub struct SkipMap { +struct InnerSkipMap { head: Box<Node>, rand: StdRng, len: usize, @@ -33,12 +34,15 @@ opt: Options, } +pub struct SkipMap { + map: Rc<InnerSkipMap>, +} + impl SkipMap { /// Returns a SkipMap that wraps the comparator from opt inside a MemtableKeyCmp pub fn new_memtable_map(mut opt: Options) -> SkipMap { opt.cmp = Arc::new(Box::new(MemtableKeyCmp(opt.cmp.clone()))); - let skm = SkipMap::new(opt); - skm + SkipMap::new(opt) } /// Returns a SkipMap that uses the comparator from opt @@ -47,26 +51,43 @@ s.resize(MAX_HEIGHT, None); SkipMap { - head: Box::new(Node { - skips: s, - next: None, - key: Vec::new(), - value: Vec::new(), + map: Rc::new(InnerSkipMap { + head: Box::new(Node { + skips: s, + next: None, + key: Vec::new(), + value: Vec::new(), + }), + rand: StdRng::from_seed(&[0xde, 0xad, 0xbe, 0xef]), + len: 0, + approx_mem: size_of::<Self>() + MAX_HEIGHT * size_of::<Option<*mut Node>>(), + opt: opt, }), - rand: StdRng::from_seed(&[0xde, 0xad, 0xbe, 0xef]), - len: 0, - approx_mem: size_of::<Self>() + MAX_HEIGHT * size_of::<Option<*mut Node>>(), - opt: opt, } } pub fn len(&self) -> usize { - self.len + self.map.len } pub fn approx_memory(&self) -> usize { - self.approx_mem + self.map.approx_mem + } + pub fn contains(&self, key: &[u8]) -> bool { + self.map.contains(key) + } + pub fn insert(&mut self, key: Vec<u8>, val: Vec<u8>) { + Rc::get_mut(&mut self.map).unwrap().insert(key, val); } + pub fn iter(&self) -> SkipMapIter { + SkipMapIter { + map: self.map.clone(), + current: 
self.map.head.as_ref() as *const Node, + } + } +} + +impl InnerSkipMap { fn random_height(&mut self) -> usize { let mut height = 1; @@ -77,7 +98,7 @@ height } - pub fn contains(&self, key: &[u8]) -> bool { + fn contains(&self, key: &[u8]) -> bool { if let Some(n) = self.get_greater_or_equal(key) { n.key.starts_with(&key) } else { @@ -167,7 +188,7 @@ } } - pub fn insert(&mut self, key: Vec<u8>, val: Vec<u8>) { + fn insert(&mut self, key: Vec<u8>, val: Vec<u8>) { assert!(!key.is_empty()); // Keeping track of skip entries that will need to be updated @@ -238,14 +259,6 @@ // ...and then setting the previous element's next field to the new node unsafe { replace(&mut (*current).next, Some(new)) }; } - - pub fn iter<'a>(&'a self) -> SkipMapIter<'a> { - SkipMapIter { - map: self, - current: self.head.as_ref() as *const Node, - } - } - /// Runs through the skipmap and prints everything including addresses fn dbg_print(&self) { let mut current = self.head.as_ref() as *const Node; @@ -266,66 +279,71 @@ } } -pub struct SkipMapIter<'a> { - map: &'a SkipMap, +pub struct SkipMapIter { + map: Rc<InnerSkipMap>, current: *const Node, } -impl<'a> Iterator for SkipMapIter<'a> { - type Item = (&'a [u8], &'a [u8]); - - fn next(&mut self) -> Option<Self::Item> { +impl LdbIterator for SkipMapIter { + fn advance(&mut self) -> bool { // we first go to the next element, then return that -- in order to skip the head node unsafe { - (*self.current).next.as_ref().map(|next| { - self.current = next.as_ref() as *const Node; - ((*self.current).key.as_slice(), (*self.current).value.as_slice()) - }) + (*self.current) + .next + .as_ref() + .map(|next| { + self.current = next.as_ref() as *const Node; + true + }) + .unwrap_or(false) } } -} - -impl<'a> LdbIterator for SkipMapIter<'a> { fn reset(&mut self) { - let new = self.map.iter(); - self.current = new.current; + self.current = self.map.head.as_ref(); } fn seek(&mut self, key: &[u8]) { if let Some(node) = self.map.get_greater_or_equal(key) { - 
self.current = node as *const Node - } else { - self.reset(); + self.current = node as *const Node; + return; } + self.reset(); } fn valid(&self) -> bool { unsafe { !(*self.current).key.is_empty() } } - fn current(&self) -> Option<Self::Item> { + fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool { if self.valid() { - Some(unsafe { (&(*self.current).key, &(*self.current).value) }) + key.clear(); + val.clear(); + unsafe { + key.extend_from_slice(&(*self.current).key); + val.extend_from_slice(&(*self.current).value); + } + true } else { - None + false } } - fn prev(&mut self) -> Option<Self::Item> { + fn prev(&mut self) -> bool { // Going after the original implementation here; we just seek to the node before current(). - if let Some(current) = self.current() { - if let Some(prev) = self.map.get_next_smaller(current.0) { + if self.valid() { + if let Some(prev) = self.map.get_next_smaller(unsafe { &(*self.current).key }) { self.current = prev as *const Node; - if !prev.key.is_empty() { - return Some(unsafe { (&(*self.current).key, &(*self.current).value) }); + return true; } } } self.reset(); - None + false } } #[cfg(test)] pub mod tests { use super::*; + use test_util::LdbIteratorIter; + use types::current_key_val; use options::Options; pub fn make_skipmap() -> SkipMap { @@ -344,7 +362,7 @@ fn test_insert() { let skm = make_skipmap(); assert_eq!(skm.len(), 26); - skm.dbg_print(); + skm.map.dbg_print(); } #[test] @@ -371,19 +389,19 @@ #[test] fn test_find() { let skm = make_skipmap(); - assert_eq!(skm.get_greater_or_equal(&"abf".as_bytes().to_vec()).unwrap().key, + assert_eq!(skm.map.get_greater_or_equal(&"abf".as_bytes().to_vec()).unwrap().key, "abf".as_bytes().to_vec()); - assert!(skm.get_greater_or_equal(&"ab{".as_bytes().to_vec()).is_none()); - assert_eq!(skm.get_greater_or_equal(&"aaa".as_bytes().to_vec()).unwrap().key, + assert!(skm.map.get_greater_or_equal(&"ab{".as_bytes().to_vec()).is_none()); + 
assert_eq!(skm.map.get_greater_or_equal(&"aaa".as_bytes().to_vec()).unwrap().key, "aba".as_bytes().to_vec()); - assert_eq!(skm.get_greater_or_equal(&"ab".as_bytes()).unwrap().key.as_slice(), + assert_eq!(skm.map.get_greater_or_equal(&"ab".as_bytes()).unwrap().key.as_slice(), "aba".as_bytes()); - assert_eq!(skm.get_greater_or_equal(&"abc".as_bytes()).unwrap().key.as_slice(), + assert_eq!(skm.map.get_greater_or_equal(&"abc".as_bytes()).unwrap().key.as_slice(), "abc".as_bytes()); - assert!(skm.get_next_smaller(&"ab0".as_bytes()).is_none()); - assert_eq!(skm.get_next_smaller(&"abd".as_bytes()).unwrap().key.as_slice(), + assert!(skm.map.get_next_smaller(&"ab0".as_bytes()).is_none()); + assert_eq!(skm.map.get_next_smaller(&"abd".as_bytes()).unwrap().key.as_slice(), "abc".as_bytes()); - assert_eq!(skm.get_next_smaller(&"ab{".as_bytes()).unwrap().key.as_slice(), + assert_eq!(skm.map.get_next_smaller(&"ab{".as_bytes()).unwrap().key.as_slice(), "abz".as_bytes()); } @@ -392,7 +410,7 @@ let skm = SkipMap::new(Options::default()); let mut i = 0; - for (_, _) in skm.iter() { + for (_, _) in LdbIteratorIter::wrap(&mut skm.iter()) { i += 1; } @@ -422,7 +440,7 @@ let skm = make_skipmap(); let mut i = 0; - for (k, v) in skm.iter() { + for (k, v) in LdbIteratorIter::wrap(&mut skm.iter()) { assert!(!k.is_empty()); assert!(!v.is_empty()); i += 1; @@ -437,14 +455,14 @@ iter.next(); assert!(iter.valid()); - assert_eq!(iter.current().unwrap().0, "aba".as_bytes()); + assert_eq!(current_key_val(&iter).unwrap().0, "aba".as_bytes().to_vec()); iter.seek(&"abz".as_bytes().to_vec()); - assert_eq!(iter.current().unwrap(), - ("abz".as_bytes(), "def".as_bytes())); + assert_eq!(current_key_val(&iter).unwrap(), + ("abz".as_bytes().to_vec(), "def".as_bytes().to_vec())); // go back to beginning iter.seek(&"aba".as_bytes().to_vec()); - assert_eq!(iter.current().unwrap(), - ("aba".as_bytes(), "def".as_bytes())); + assert_eq!(current_key_val(&iter).unwrap(), + ("aba".as_bytes().to_vec(), 
"def".as_bytes().to_vec())); iter.seek(&"".as_bytes().to_vec()); assert!(iter.valid()); @@ -459,7 +477,9 @@ } } assert_eq!(iter.next(), None); - assert_eq!(iter.prev(), Some(("aby".as_bytes(), "def".as_bytes()))); + assert!(iter.prev()); + assert_eq!(current_key_val(&iter), + Some(("aby".as_bytes().to_vec(), "def".as_bytes().to_vec()))); } #[test] @@ -473,7 +493,7 @@ assert!(!iter.valid()); iter.seek(&"abc".as_bytes()); iter.prev(); - assert_eq!(iter.current().unwrap(), - ("abb".as_bytes(), "def".as_bytes())); + assert_eq!(current_key_val(&iter).unwrap(), + ("abb".as_bytes().to_vec(), "def".as_bytes().to_vec())); } }
--- a/src/table_cache.rs Thu Aug 24 20:09:47 2017 +0200 +++ b/src/table_cache.rs Sun Aug 27 19:23:20 2017 +0200 @@ -69,6 +69,7 @@ use cache; use mem_env::MemEnv; use table_builder::TableBuilder; + use test_util::LdbIteratorIter; #[test] fn test_table_name() { @@ -114,8 +115,10 @@ assert!(opt.env.size_of(tblpath).unwrap() > 20); let mut cache = TableCache::new(dbname, opt.clone(), 10); - assert_eq!(cache.get_table(123).unwrap().iter().count(), 4); + assert_eq!(LdbIteratorIter::wrap(&mut cache.get_table(123).unwrap().iter()).count(), + 4); // Test cached table. - assert_eq!(cache.get_table(123).unwrap().iter().count(), 4); + assert_eq!(LdbIteratorIter::wrap(&mut cache.get_table(123).unwrap().iter()).count(), + 4); } }
--- a/src/table_reader.rs Thu Aug 24 20:09:47 2017 +0200 +++ b/src/table_reader.rs Sun Aug 27 19:23:20 2017 +0200 @@ -9,7 +9,7 @@ use key_types::InternalKey; use options::{self, CompressionType, Options}; use table_builder::{self, Footer}; -use types::LdbIterator; +use types::{current_key_val, LdbIterator}; use std::cmp::Ordering; use std::sync::Arc; @@ -102,7 +102,7 @@ metaindexiter.seek(&filter_name); - if let Some((_key, val)) = metaindexiter.current() { + if let Some((_key, val)) = current_key_val(&metaindexiter) { let filter_block_location = BlockHandle::decode(&val).0; if filter_block_location.size() > 0 { @@ -173,7 +173,7 @@ iter.seek(key); - if let Some((_, val)) = iter.current() { + if let Some((_, val)) = current_key_val(&iter) { let location = BlockHandle::decode(&val).0; return location.offset(); } @@ -201,7 +201,7 @@ index_iter.seek(key); let handle; - if let Some((last_in_block, h)) = index_iter.current() { + if let Some((last_in_block, h)) = current_key_val(&index_iter) { if self.opt.cmp.cmp(key, &last_in_block) == Ordering::Less { handle = BlockHandle::decode(&h).0; } else { @@ -230,7 +230,7 @@ // Go to entry and check if it's the wanted entry. iter.seek(key); - if let Some((k, v)) = iter.current() { + if let Some((k, v)) = current_key_val(&iter) { if self.opt.cmp.cmp(key, &k) == Ordering::Equal { Some(v) } else { @@ -287,21 +287,19 @@ } } -impl Iterator for TableIterator { - type Item = (Vec<u8>, Vec<u8>); - - fn next(&mut self) -> Option<Self::Item> { +impl LdbIterator for TableIterator { + fn advance(&mut self) -> bool { // init essentially means that `current_block` is a data block (it's initially filled with // an index block as filler). 
if self.init { - if let Some((key, val)) = self.current_block.next() { - Some((key, val)) + if self.current_block.advance() { + true } else { match self.skip_to_next_entry() { - Ok(true) => self.next(), - Ok(false) => None, + Ok(true) => self.advance(), + Ok(false) => false, // try next block, this might be corruption - Err(_) => self.next(), + Err(_) => self.advance(), } } } else { @@ -309,17 +307,14 @@ Ok(true) => { // Only initialize if we got an entry self.init = true; - self.next() + self.advance() } - Ok(false) => None, + Ok(false) => false, // try next block from index, this might be corruption - Err(_) => self.next(), + Err(_) => self.advance(), } } } -} - -impl LdbIterator for TableIterator { // A call to valid() after seeking is necessary to ensure that the seek worked (e.g., no error // while reading from disk) fn seek(&mut self, to: &[u8]) { @@ -328,7 +323,7 @@ self.index_block.seek(to); - if let Some((past_block, handle)) = self.index_block.current() { + if let Some((past_block, handle)) = current_key_val(&self.index_block) { if self.table.opt.cmp.cmp(to, &past_block) <= Ordering::Equal { // ok, found right block: continue if let Ok(()) = self.load_block(&handle) { @@ -347,22 +342,26 @@ } } - fn prev(&mut self) -> Option<Self::Item> { + fn prev(&mut self) -> bool { // happy path: current block contains previous entry - if let Some(result) = self.current_block.prev() { - Some(result) + if self.current_block.prev() { + true } else { // Go back one block and look for the last entry in the previous block - if let Some((_, handle)) = self.index_block.prev() { - if self.load_block(&handle).is_ok() { - self.current_block.seek_to_last(); - self.current_block.current() + if self.index_block.prev() { + if let Some((_, handle)) = current_key_val(&self.index_block) { + if self.load_block(&handle).is_ok() { + self.current_block.seek_to_last(); + self.current_block.valid() + } else { + self.reset(); + false + } } else { - self.reset(); - None + false } } else { - 
None + false } } } @@ -372,17 +371,17 @@ self.init = false; } - // This iterator is special in that it's valid even before the first call to next(). It behaves - // correctly, though. + // This iterator is special in that it's valid even before the first call to advance(). It + // behaves correctly, though. fn valid(&self) -> bool { self.init && (self.current_block.valid() || self.index_block.valid()) } - fn current(&self) -> Option<Self::Item> { + fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool { if self.init { - self.current_block.current() + self.current_block.current(key, val) } else { - None + false } } } @@ -392,7 +391,8 @@ use filter::BloomPolicy; use options::Options; use table_builder::TableBuilder; - use types::LdbIterator; + use test_util::LdbIteratorIter; + use types::{current_key_val, LdbIterator}; use key_types::LookupKey; use super::*; @@ -515,11 +515,16 @@ // backwards count let mut j = 0; - while let Some((k, v)) = iter.prev() { - j += 1; - assert_eq!((data[data.len() - 1 - j].0.as_bytes(), - data[data.len() - 1 - j].1.as_bytes()), - (k.as_ref(), v.as_ref())); + loop { + iter.prev(); + if let Some((k, v)) = current_key_val(&iter) { + j += 1; + assert_eq!((data[data.len() - 1 - j].0.as_bytes(), + data[data.len() - 1 - j].1.as_bytes()), + (k.as_ref(), v.as_ref())); + } else { + break; + } } // expecting 7 - 1, because the last entry that the iterator stopped on is the last entry @@ -558,21 +563,21 @@ // See comment on valid() assert!(!iter.valid()); - assert!(iter.current().is_none()); - assert!(iter.prev().is_none()); + assert!(current_key_val(&iter).is_none()); + assert!(!iter.prev()); - assert!(iter.next().is_some()); - let first = iter.current(); + assert!(iter.advance()); + let first = current_key_val(&iter); assert!(iter.valid()); - assert!(iter.current().is_some()); + assert!(current_key_val(&iter).is_some()); - assert!(iter.next().is_some()); - assert!(iter.prev().is_some()); - assert!(iter.current().is_some()); + 
assert!(iter.advance()); + assert!(iter.prev()); + assert!(iter.valid()); iter.reset(); assert!(!iter.valid()); - assert!(iter.current().is_none()); + assert!(current_key_val(&iter).is_none()); assert_eq!(first, iter.next()); } @@ -593,7 +598,7 @@ loop { iter.prev(); - if let Some((k, v)) = iter.current() { + if let Some((k, v)) = current_key_val(&iter) { assert_eq!((data[i].0.as_bytes(), data[i].1.as_bytes()), (k.as_ref(), v.as_ref())); } else { @@ -619,11 +624,11 @@ iter.seek("bcd".as_bytes()); assert!(iter.valid()); - assert_eq!(iter.current(), + assert_eq!(current_key_val(&iter), Some(("bcd".as_bytes().to_vec(), "asa".as_bytes().to_vec()))); iter.seek("abc".as_bytes()); assert!(iter.valid()); - assert_eq!(iter.current(), + assert_eq!(current_key_val(&iter), Some(("abc".as_bytes().to_vec(), "def".as_bytes().to_vec()))); } @@ -634,8 +639,9 @@ let table = Table::new_raw(Options::default(), wrap_buffer(src), size).unwrap(); let table2 = table.clone(); + let mut _iter = table.iter(); // Test that all of the table's entries are reachable via get() - for (k, v) in table.iter() { + for (k, v) in LdbIteratorIter::wrap(&mut _iter) { assert_eq!(table2.get(&k), Some(v)); } @@ -665,7 +671,8 @@ let filter_reader = table.filters.clone().unwrap(); // Check that we're actually using internal keys - for (ref k, _) in table.iter() { + let mut _iter = table.iter(); + for (ref k, _) in LdbIteratorIter::wrap(&mut _iter) { assert_eq!(k.len(), 3 + 8); } @@ -698,13 +705,15 @@ assert_eq!(table.filters.as_ref().unwrap().num(), 1); { - let iter = table.iter(); + let mut _iter = table.iter(); + let iter = LdbIteratorIter::wrap(&mut _iter); // first block is skipped assert_eq!(iter.count(), 4); } { - let iter = table.iter(); + let mut _iter = table.iter(); + let iter = LdbIteratorIter::wrap(&mut _iter); for (k, _) in iter { if k == build_data()[5].0.as_bytes() {
--- a/src/test_util.rs Thu Aug 24 20:09:47 2017 +0200 +++ b/src/test_util.rs Sun Aug 27 19:23:20 2017 +0200 @@ -18,32 +18,31 @@ } } -impl<'a> Iterator for TestLdbIter<'a> { - type Item = (&'a [u8], &'a [u8]); - - fn next(&mut self) -> Option<Self::Item> { +impl<'a> LdbIterator for TestLdbIter<'a> { + fn advance(&mut self) -> bool { if self.ix == self.v.len() { - return None; + false } else if !self.init { self.init = true; - Some(self.v[self.ix]) + true } else { self.ix += 1; - Some(self.v[self.ix - 1]) + true } } -} - -impl<'a> LdbIterator for TestLdbIter<'a> { fn reset(&mut self) { self.ix = 0; self.init = false; } - fn current(&self) -> Option<Self::Item> { + fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool { if self.init && self.ix < self.v.len() { - Some(self.v[self.ix]) + key.clear(); + val.clear(); + key.extend_from_slice(self.v[self.ix].0); + val.extend_from_slice(self.v[self.ix].1); + true } else { - None + false } } fn valid(&self) -> bool { @@ -55,12 +54,43 @@ self.ix += 1; } } - fn prev(&mut self) -> Option<Self::Item> { + fn prev(&mut self) -> bool { if !self.init || self.ix == 0 { - None + false } else { self.ix -= 1; - Some(self.v[self.ix]) + true } } } + +/// LdbIteratorIter implements std::iter::Iterator for an LdbIterator. +pub struct LdbIteratorIter<'a, It: 'a> { + inner: &'a mut It, +} + +impl<'a, It: LdbIterator> LdbIteratorIter<'a, It> { + pub fn wrap(it: &'a mut It) -> LdbIteratorIter<'a, It> { + LdbIteratorIter { inner: it } + } +} + +impl<'a, It: LdbIterator> Iterator for LdbIteratorIter<'a, It> { + type Item = (Vec<u8>, Vec<u8>); + fn next(&mut self) -> Option<Self::Item> { + LdbIterator::next(self.inner) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_test_util() { + let v = vec![("abc".as_bytes(), "def".as_bytes()), ("abd".as_bytes(), "deg".as_bytes())]; + let mut iter = TestLdbIter::new(v); + assert_eq!(iter.next(), + Some((Vec::from("abc".as_bytes()), Vec::from("def".as_bytes())))); + } +}
--- a/src/types.rs Thu Aug 24 20:09:47 2017 +0200 +++ b/src/types.rs Sun Aug 27 19:23:20 2017 +0200 @@ -63,7 +63,7 @@ /// current_key_val is a helper allocating two vectors and filling them with the current key/value /// of the specified iterator. -pub fn current_key_val<It: LdbIterator>(it: &It) -> Option<(Vec<u8>, Vec<u8>)> { +pub fn current_key_val<It: LdbIterator + ?Sized>(it: &It) -> Option<(Vec<u8>, Vec<u8>)> { let (mut k, mut v) = (vec![], vec![]); if it.current(&mut k, &mut v) { Some((k, v))