Mercurial > lbo > hg > leveldb-rs
view src/block.rs @ 40:30947e6a9cee
Remove C : Comparator type params from all types
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Tue, 14 Jun 2016 06:24:38 +0000 |
parents | 41695091f5b1 |
children |
line wrap: on
line source
#![allow(dead_code)] use std::cmp::Ordering; use types::{LdbIterator, Options}; use types::{Comparator, StandardComparator}; use integer_encoding::FixedInt; use integer_encoding::VarInt; pub type BlockContents = Vec<u8>; /// A block is a list of ENTRIES followed by a list of RESTARTS, terminated by a fixed u32 /// N_RESTARTS. /// /// An ENTRY consists of three varints, SHARED, NON_SHARED, VALSIZE, a KEY and a VALUE. /// /// SHARED denotes how many bytes the entry's key shares with the previous one. /// /// NON_SHARED is the size of the key minus SHARED. /// /// VALSIZE is the size of the value. /// /// KEY and VALUE are byte strings; the length of KEY is NON_SHARED. /// /// A RESTART is a fixed u32 pointing to the beginning of an ENTRY. /// /// N_RESTARTS contains the number of restarts. pub struct Block { data: BlockContents, restarts_off: usize, } impl Block { pub fn new(contents: BlockContents) -> Block { assert!(contents.len() > 4); let restarts = u32::decode_fixed(&contents[contents.len() - 4..]); let restart_offset = contents.len() - 4 - 4 * restarts as usize; Block { data: contents, restarts_off: restart_offset, } } fn number_restarts(&self) -> usize { ((self.data.len() - self.restarts_off) / 4) - 1 } fn get_restart_point(&self, ix: usize) -> usize { let restart = self.restarts_off + 4 * ix; u32::decode_fixed(&self.data[restart..restart + 4]) as usize } pub fn iter<'a>(&'a self, cmp: Box<Comparator>) -> BlockIter<'a> { BlockIter { block: self, cmp: cmp, current_restart_ix: 0, offset: 0, key: Vec::new(), val_offset: 0, current_entry_offset: 0, } } } pub struct BlockIter<'a> { block: &'a Block, cmp: Box<Comparator>, // start of next entry offset: usize, // start of current entry current_entry_offset: usize, // tracks the last restart we encountered current_restart_ix: usize, // We assemble the key from two parts usually, so we keep the current full key here. key: Vec<u8>, val_offset: usize, } impl<'a> BlockIter<'a> { // Returns SHARED, NON_SHARED and VALSIZE from the current position. Advances self.offset. fn parse_entry(&mut self) -> (usize, usize, usize) { let mut i = 0; let (shared, sharedlen) = usize::decode_var(&self.block.data[self.offset..]); i += sharedlen; let (non_shared, non_sharedlen) = usize::decode_var(&self.block.data[self.offset + i..]); i += non_sharedlen; let (valsize, valsizelen) = usize::decode_var(&self.block.data[self.offset + i..]); i += valsizelen; self.offset += i; (shared, non_shared, valsize) } /// offset is assumed to be at the beginning of the non-shared key part. /// offset is not advanced. fn assemble_key(&mut self, shared: usize, non_shared: usize) { self.key.resize(shared, 0); self.key.extend_from_slice(&self.block.data[self.offset..self.offset + non_shared]); } fn reset(&mut self) { self.offset = 0; self.current_restart_ix = 0; self.key.clear(); self.val_offset = 0; } } impl<'a> Iterator for BlockIter<'a> { type Item = (Vec<u8>, &'a [u8]); fn next(&mut self) -> Option<Self::Item> { self.current_entry_offset = self.offset; if self.current_entry_offset >= self.block.restarts_off { return None; } let (shared, non_shared, valsize) = self.parse_entry(); self.assemble_key(shared, non_shared); self.val_offset = self.offset + non_shared; self.offset = self.val_offset + valsize; let num_restarts = self.block.number_restarts(); while self.current_restart_ix + 1 < num_restarts && self.block.get_restart_point(self.current_restart_ix + 1) < self.current_entry_offset { self.current_restart_ix += 1; } Some((self.key.clone(), &self.block.data[self.val_offset..self.val_offset + valsize])) } } impl<'a> LdbIterator<'a> for BlockIter<'a> { fn prev(&mut self) -> Option<Self::Item> { // as in the original implementation -- seek to last restart point, then look for key let current_offset = self.current_entry_offset; // At the beginning, can't go further back if current_offset == 0 { self.reset(); return None; } while self.block.get_restart_point(self.current_restart_ix) >= current_offset { self.current_restart_ix -= 1; } self.offset = self.block.get_restart_point(self.current_restart_ix); assert!(self.offset < current_offset); let mut result; loop { result = self.next(); println!("next! {:?}", (self.current_entry_offset, self.offset)); if self.offset >= current_offset { break; } } result } fn seek(&mut self, to: &[u8]) { self.reset(); let mut left = 0; let mut right = self.block.number_restarts() - 1; // Do a binary search over the restart points. while left < right { let middle = (left + right + 1) / 2; self.offset = self.block.get_restart_point(middle); // advances self.offset let (shared, non_shared, _) = self.parse_entry(); // At a restart, the shared part is supposed to be 0. assert_eq!(shared, 0); let cmp = self.cmp.cmp(to, &self.block.data[self.offset..self.offset + non_shared]); if cmp == Ordering::Less { right = middle - 1; } else { left = middle; } } assert_eq!(left, right); self.current_restart_ix = left; self.offset = self.block.get_restart_point(left); // Linear search from here on while let Some((k, _)) = self.next() { if self.cmp.cmp(k.as_slice(), to) >= Ordering::Equal { return; } } } fn valid(&self) -> bool { !self.key.is_empty() && self.val_offset > 0 && self.val_offset < self.block.restarts_off } fn current(&self) -> Self::Item { assert!(self.valid()); (self.key.clone(), &self.block.data[self.val_offset..self.offset]) } } pub struct BlockBuilder { opt: Options, buffer: Vec<u8>, restarts: Vec<u32>, last_key: Vec<u8>, counter: usize, } impl BlockBuilder { fn new(o: Options) -> BlockBuilder { let mut restarts = vec![0]; restarts.reserve(1023); BlockBuilder { buffer: Vec::with_capacity(o.block_size), opt: o, restarts: restarts, last_key: Vec::new(), counter: 0, } } pub fn reset(&mut self) { self.buffer.clear(); self.restarts.clear(); self.last_key.clear(); self.counter = 0; } pub fn add(&mut self, key: &[u8], val: &[u8]) { assert!(self.counter <= self.opt.block_restart_interval); assert!(self.buffer.is_empty() || self.opt.cmp.cmp(self.last_key.as_slice(), key) == Ordering::Less); let mut shared = 0; if self.counter < self.opt.block_restart_interval { let smallest = if self.last_key.len() < key.len() { self.last_key.len() } else { key.len() }; while shared < smallest && self.last_key[shared] == key[shared] { shared += 1; } } else { self.restarts.push(self.buffer.len() as u32); self.last_key.resize(0, 0); self.counter = 0; } let non_shared = key.len() - shared; let mut buf = [0 as u8; 4]; let mut sz = shared.encode_var(&mut buf[..]); self.buffer.extend_from_slice(&buf[0..sz]); sz = non_shared.encode_var(&mut buf[..]); self.buffer.extend_from_slice(&buf[0..sz]); sz = val.len().encode_var(&mut buf[0..sz]); self.buffer.extend_from_slice(&buf[0..sz]); self.buffer.extend_from_slice(&key[shared..]); self.buffer.extend_from_slice(val); // Update key self.last_key.resize(shared, 0); self.last_key.extend_from_slice(&key[shared..]); // assert_eq!(&self.last_key[..], key); self.counter += 1; } pub fn finish(mut self) -> BlockContents { // 1. Append RESTARTS let mut buf = [0 as u8; 4]; self.buffer.reserve(self.restarts.len() * 4 + 4); for r in self.restarts.iter() { r.encode_fixed(&mut buf[..]); self.buffer.extend_from_slice(&buf[..]); } // 2. Append N_RESTARTS (self.restarts.len() as u32).encode_fixed(&mut buf[..]); self.buffer.extend_from_slice(&buf[..]); // done self.buffer } } #[cfg(test)] mod tests { use super::*; use types::*; fn get_data() -> Vec<(&'static [u8], &'static [u8])> { vec![("key1".as_bytes(), "value1".as_bytes()), ("loooooooooooooooooooooooooooooooooongerkey1".as_bytes(), "shrtvl1".as_bytes()), ("medium length key 1".as_bytes(), "some value 2".as_bytes()), ("prefix_key1".as_bytes(), "value".as_bytes()), ("prefix_key2".as_bytes(), "value".as_bytes()), ("prefix_key3".as_bytes(), "value".as_bytes())] } #[test] fn test_block_builder() { let mut o = Options::default(); o.block_restart_interval = 3; let mut builder = BlockBuilder::new(o); for &(k, v) in get_data().iter() { builder.add(k, v); assert!(builder.counter <= 3); } let block = builder.finish(); assert_eq!(block.len(), 149); } #[test] fn test_build_iterate() { let data = get_data(); let mut builder = BlockBuilder::new(Options::default()); for &(k, v) in data.iter() { builder.add(k, v); } let block_contents = builder.finish(); let block = Block::new(block_contents); let block_iter = block.iter(); let mut i = 0; for (k, v) in block_iter { assert_eq!(&k[..], data[i].0); assert_eq!(v, data[i].1); i += 1; } assert_eq!(i, data.len()); } #[test] fn test_iterate_reverse() { let mut o = Options::default(); o.block_restart_interval = 3; let data = get_data(); let mut builder = BlockBuilder::new(o); for &(k, v) in data.iter() { builder.add(k, v); } let block_contents = builder.finish(); let block = Block::new(block_contents); let mut block_iter = block.iter(); assert!(!block_iter.valid()); assert_eq!(block_iter.next(), Some(("key1".as_bytes().to_vec(), "value1".as_bytes()))); assert!(block_iter.valid()); block_iter.next(); assert!(block_iter.valid()); block_iter.prev(); assert!(block_iter.valid()); assert_eq!(block_iter.current(), ("key1".as_bytes().to_vec(), "value1".as_bytes())); block_iter.prev(); assert!(!block_iter.valid()); } #[test] fn test_seek() { let mut o = Options::default(); o.block_restart_interval = 3; let data = get_data(); let mut builder = BlockBuilder::new(o); for &(k, v) in data.iter() { builder.add(k, v); } let block_contents = builder.finish(); let block = Block::new(block_contents); let mut iter = block.iter(); iter.seek(&"prefix_key2".as_bytes()); assert!(iter.valid()); assert_eq!(iter.current(), ("prefix_key2".as_bytes().to_vec(), "value".as_bytes())); iter.seek(&"key1".as_bytes()); assert!(iter.valid()); assert_eq!(iter.current(), ("key1".as_bytes().to_vec(), "value1".as_bytes())); } }