changeset 331:82d9390989c7

db_iter: Add initial implementation. No passing tests yet.
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 01 Oct 2017 20:39:00 +0200
parents cb3090f42af3
children 75217e781ddd
files src/db_iter.rs src/lib.rs
diffstat 2 files changed, 273 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/db_iter.rs	Sun Oct 01 20:39:00 2017 +0200
@@ -0,0 +1,272 @@
+
+use cmp::Cmp;
+use key_types::{parse_internal_key, truncate_to_userkey, InternalKey, LookupKey, ValueType};
+use merging_iter::MergingIter;
+use snapshot::Snapshot;
+use types::{Direction, LdbIterator, Shared};
+use version_set::VersionSet;
+
+use std::cmp::Ordering;
+use std::rc::Rc;
+
+use rand;
+
+const READ_BYTES_PERIOD: isize = 1048576;
+
+pub struct DBIterator {
+    // A user comparator.
+    cmp: Rc<Box<Cmp>>,
+    vset: Shared<VersionSet>,
+    iter: MergingIter,
+    // By holding onto a snapshot, we make sure that the iterator iterates over the state at the
+    // point of its creation.
+    ss: Snapshot,
+    dir: Direction,
+    byte_count: isize,
+
+    valid: bool,
+    // temporarily stored user key.
+    savedkey: Vec<u8>,
+    // buffer for reading internal keys
+    buf: Vec<u8>,
+    savedval: Vec<u8>,
+}
+
+impl DBIterator {
+    pub fn new(cmp: Rc<Box<Cmp>>,
+               vset: Shared<VersionSet>,
+               iter: MergingIter,
+               ss: Snapshot)
+               -> DBIterator {
+        DBIterator {
+            cmp: cmp,
+            vset: vset,
+            iter: iter,
+            ss: ss,
+            dir: Direction::Forward,
+            byte_count: random_period(),
+
+            valid: false,
+            savedkey: vec![],
+            buf: vec![],
+            savedval: vec![],
+        }
+    }
+
+    /// record_read_sample records a read sample using the current contents of self.buf, which
+    /// should be an InternalKey.
+    fn record_read_sample<'a>(&mut self) {
+        if self.byte_count < 0 {
+            let vset = self.vset.borrow().current();
+            vset.borrow_mut().record_read_sample(&self.buf);
+            self.byte_count += random_period();
+        }
+    }
+
+    /// find_next_user_entry skips to the next user entry after the one saved in self.savedkey.
+    fn find_next_user_entry(&mut self, mut skipping: bool) -> bool {
+        assert!(self.iter.valid());
+        assert!(self.dir == Direction::Forward);
+
+        while self.iter.valid() {
+            self.iter.current(&mut self.buf, &mut self.savedval);
+            self.record_read_sample();
+            let (typ, seq, ukey) = parse_internal_key(&self.buf);
+
+            // Skip keys with a sequence number after our snapshot.
+            if seq <= self.ss.sequence() {
+                if typ == ValueType::TypeDeletion {
+                    // Mark current (deleted) key to be skipped.
+                    self.savedkey.clear();
+                    self.savedkey.extend_from_slice(ukey);
+                    skipping = true;
+                } else if typ == ValueType::TypeValue {
+                    if skipping && self.cmp.cmp(ukey, &self.savedkey) <= Ordering::Equal {
+                        // Entry hidden, because it's smaller than the key to be skipped.
+                    } else {
+                        self.valid = true;
+                        self.savedkey.clear();
+                        return true;
+                    }
+                }
+            }
+            self.iter.advance();
+        }
+        self.savedkey.clear();
+        self.valid = false;
+        false
+    }
+
+    /// find_prev_user_entry finds the next smaller entry before self.savedkey.
+    fn find_prev_user_entry(&mut self) -> bool {
+        assert!(self.dir == Direction::Reverse);
+        let mut value_type = ValueType::TypeDeletion;
+        while self.iter.valid() {
+            self.iter.current(&mut self.buf, &mut self.savedval);
+            self.record_read_sample();
+            let (typ, seq, ukey) = parse_internal_key(&self.buf);
+
+            if seq > 0 && seq <= self.ss.sequence() {
+                if value_type != ValueType::TypeDeletion &&
+                   self.cmp.cmp(ukey, &self.savedkey) == Ordering::Less {
+                    // We found a non-deleted entry for a previous key (in the previous iteration)
+                    break;
+                }
+                value_type = typ;
+                if value_type == ValueType::TypeDeletion {
+                    self.savedkey.clear();
+                    self.savedval.clear();
+                } else {
+                    self.savedkey.clear();
+                    self.savedkey.extend_from_slice(&ukey);
+                }
+            }
+            self.iter.prev();
+        }
+
+        if value_type == ValueType::TypeDeletion {
+            self.valid = false;
+            self.savedkey.clear();
+            self.savedval.clear();
+            self.dir = Direction::Forward;
+        } else {
+            self.valid = true;
+        }
+        true
+    }
+}
+
+impl LdbIterator for DBIterator {
+    fn advance(&mut self) -> bool {
+        if !self.valid() {
+            self.seek_to_first();
+            if !self.valid() {
+                return false;
+            }
+        }
+
+        if self.dir == Direction::Reverse {
+            self.dir = Direction::Forward;
+            if !self.iter.valid() {
+                self.iter.seek_to_first();
+            } else {
+                self.iter.advance();
+            }
+            if !self.iter.valid() {
+                self.valid = false;
+                self.savedkey.clear();
+                return false;
+            }
+        } else {
+            // Save current user key.
+            assert!(self.iter.current(&mut self.buf, &mut self.savedval));
+            let ukey = parse_internal_key(&self.buf).2;
+            self.savedkey.clear();
+            self.savedkey.extend_from_slice(ukey);
+        }
+        self.find_next_user_entry(// skipping=
+                                  true)
+    }
+    fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool {
+        if !self.valid() {
+            return false;
+        }
+        if self.dir == Direction::Forward {
+            self.iter.current(key, val);
+            truncate_to_userkey(key);
+            true
+        } else {
+            key.clear();
+            key.extend_from_slice(&self.savedkey);
+            val.clear();
+            val.extend_from_slice(&self.savedval);
+            true
+        }
+    }
+    fn prev(&mut self) -> bool {
+        if !self.valid() {
+            return false;
+        }
+
+        let mut newsavedkey = vec![];
+
+        if self.dir == Direction::Forward {
+            // scan backwards until we hit a different key; then use the normal scanning procedure.
+            self.iter.current(&mut self.buf, &mut self.savedval);
+            self.savedkey.clear();
+            self.savedkey.extend_from_slice(parse_internal_key(&self.buf).2);
+            loop {
+                self.iter.prev();
+                if !self.iter.valid() {
+                    self.valid = false;
+                    self.savedkey.clear();
+                    self.savedval.clear();
+                    return false;
+                }
+                // Scan until we hit the next-smaller key.
+                self.iter.current(&mut self.buf, &mut self.savedval);
+                newsavedkey.clear();
+                newsavedkey.extend_from_slice(parse_internal_key(&self.buf).2);
+                if self.cmp.cmp(&newsavedkey, &self.savedkey) == Ordering::Less {
+                    break;
+                }
+            }
+            self.dir = Direction::Reverse;
+        }
+        self.find_prev_user_entry()
+    }
+    fn valid(&self) -> bool {
+        self.valid
+    }
+    fn seek(&mut self, to: &[u8]) {
+        self.dir = Direction::Forward;
+        self.savedkey.clear();
+        self.savedval.clear();
+        self.buf.clear();
+        self.buf.extend_from_slice(LookupKey::new(to, self.ss.sequence()).internal_key());
+        self.iter.seek(&self.savedkey);
+        if self.iter.valid() {
+            self.find_next_user_entry(// skipping=
+                                      false);
+        } else {
+            self.valid = false;
+        }
+    }
+    fn seek_to_first(&mut self) {
+        self.dir = Direction::Forward;
+        self.savedval.clear();
+        self.iter.seek_to_first();
+        if self.iter.valid() {
+            self.find_next_user_entry(// skipping=
+                                      false);
+        } else {
+            self.valid = false;
+        }
+    }
+    fn reset(&mut self) {
+        self.iter.reset();
+        self.valid = false;
+        self.savedkey.clear();
+        self.savedval.clear();
+        self.buf.clear();
+    }
+}
+
+fn random_period() -> isize {
+    rand::random::<isize>() % 2 * READ_BYTES_PERIOD
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use db_impl::testutil::*;
+
+    // not yet passing
+    //#[test]
+    fn db_iter_basic_test() {
+        let mut db = build_db();
+        let mut iter = db.new_iter().unwrap();
+
+        assert!(iter.advance());
+    }
+}
--- a/src/lib.rs	Sun Oct 01 20:38:38 2017 +0200
+++ b/src/lib.rs	Sun Oct 01 20:39:00 2017 +0200
@@ -41,3 +41,4 @@
 mod write_batch;
 
 mod db_impl;
+mod db_iter;