Mercurial > lbo > hg > localmr
changeset 37:bda56504a12c
Add ShardMergeIterator type.
This type is used to merge the shards that are the input to the reduce phase. See https://drive.google.com/open?id=1grB87a0w9fQ2k7i04N3VJvYlw2BldxWNcHublW_ygJs
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Mon, 01 Feb 2016 20:35:45 +0000 |
parents | e80e00619c81 |
children | 5fc9828af476 |
files | src/lib.rs src/shard_merge.rs |
diffstat | 2 files changed, 142 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/src/lib.rs Sun Jan 31 19:57:17 2016 +0000 +++ b/src/lib.rs Mon Feb 01 20:35:45 2016 +0000 @@ -7,6 +7,7 @@ pub mod map; pub mod mapreducer; pub mod parameters; +pub mod shard_merge; #[test]
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/shard_merge.rs Mon Feb 01 20:35:45 2016 +0000 @@ -0,0 +1,141 @@ +//! Implements a merge tree to merge an arbitrary number of sorted map outputs. See +//! https://drive.google.com/open?id=1grB87a0w9fQ2k7i04N3VJvYlw2BldxWNcHublW_ygJs. +//! Genericized in order to build arbitrary merge trees. + +use std::cmp::PartialOrd; +use std::iter; + +/// See module description. +/// This type uses dynamic instead of static dispatch because it realizes an arbitrary structure +/// and can therefore not work with a single type signature. +pub struct ShardMergeIterator<'a, T: PartialOrd> { + left: Box<Iterator<Item=T> + 'a>, + right: Box<Iterator<Item=T> + 'a>, + + left_peeked: Option<T>, + right_peeked: Option<T>, +} + +impl<'a, T: PartialOrd + Clone> Iterator for ShardMergeIterator<'a, T> { + type Item = T; + fn next(&mut self) -> Option<Self::Item> { + // fill up + match (self.left_peeked.clone(), self.right_peeked.clone()) { + (None, None) => { self.left_peeked = self.left.next(); self.right_peeked = self.right.next() }, + (Some(_), None) => self.right_peeked = self.right.next(), + (None, Some(_)) => self.left_peeked = self.left.next(), + (Some(_), Some(_)) => () + } + + // Consume peeked values + match (self.left_peeked.clone(), self.right_peeked.clone()) { + (None, None) => { return None }, + (l @ Some(_), None) => { self.left_peeked = None; return l }, + (None, r @ Some(_)) => { self.right_peeked = None; return r }, + (l @ Some(_), r @ Some(_)) => { + if l <= r { + self.left_peeked = None; + return l + } else { + self.right_peeked = None; + return r + } + } + } + } +} + +impl<'a, T: PartialOrd + Clone> ShardMergeIterator<'a, T> { + fn default() -> ShardMergeIterator<'a, T> where T: 'a { + ShardMergeIterator { + left: Box::new(iter::empty()), + right: Box::new(iter::empty()), + left_peeked: None, + right_peeked: None, + } + } + /// Takes multiple iterators of type It and generates one ShardedMergeIterator.. + /// (yes, iterator over a collection of iterators). + pub fn build<It: Iterator<Item=T>, ItIt: Iterator<Item=It>>(sources: &mut ItIt) -> ShardMergeIterator<'a, T> + where T: 'a, It: 'a { + let mut merged: Vec<ShardMergeIterator<T>> = Vec::new(); + + // Initial merging: Merge pairs of input iterators together. + loop { + let src1: It; + match sources.next() { + None => break, + Some(src) => src1 = src, + } + match sources.next() { + None => merged.push(ShardMergeIterator { left: Box::new(src1), right: Box::new(iter::empty()), .. ShardMergeIterator::default() }), + Some(src) => merged.push(ShardMergeIterator { left: Box::new(src1), right: Box::new(src), .. ShardMergeIterator::default() }), + } + } + + // Recursively build the merge tree from the leaves. + ShardMergeIterator::merge(merged) + } + + /// Merge multiple ShardMergeIterators, recursively (meaning it will result in a more or less + /// balanced merge sort tree). + fn merge(mut its: Vec<ShardMergeIterator<'a, T>>) -> ShardMergeIterator<'a, T> where T: 'a { + if its.len() == 0 { + ShardMergeIterator::default() + } else if its.len() == 1 { + ShardMergeIterator { left: Box::new(its.remove(0)), .. ShardMergeIterator::default() } + } else if its.len() == 2 { + let it1 = its.remove(0); + let it2 = its.remove(0); + ShardMergeIterator { left: Box::new(it1), right: Box::new(it2), ..ShardMergeIterator::default() } + } else { + // its is left part, right is right part + let split_at = its.len() / 2; + let right = its.split_off(split_at); + println!("{} -- {}", its.len(), right.len()); + ShardMergeIterator { left: Box::new(ShardMergeIterator::merge(its)), right: Box::new(ShardMergeIterator::merge(right)), .. ShardMergeIterator::default() } + } + } +} + +#[cfg(test)] +mod tests { + use std::vec; + use shard_merge::ShardMergeIterator; + + fn get_collection_1() -> vec::IntoIter<i32> { + vec![1, 4, 5, 5, 6, 9, 11, 15, 15, 17, 18, 20].into_iter() + } + fn get_collection_2() -> vec::IntoIter<i32> { + vec![2, 2, 2, 3, 4, 5, 7, 8, 9, 10, 45, 46, 47].into_iter() + } + fn get_collection_3() -> vec::IntoIter<i32> { + vec![5, 8, 9, 10, 22, 25, 30, 37, 41, 46, 71].into_iter() + } + fn get_collection_4() -> vec::IntoIter<i32> { + vec![111, 112, 113, 155].into_iter() + } + fn get_collection_5() -> vec::IntoIter<i32> { + vec![13, 45, 98, 105, 145].into_iter() + } + fn get_collection_6() -> vec::IntoIter<i32> { + vec![14, 67, 99, 111, 222, 566, 643].into_iter() + } + + #[test] + fn test_merge_iterator() { + let it = ShardMergeIterator::build(&mut vec![get_collection_1(), get_collection_2(), get_collection_3(), + get_collection_4(), get_collection_5(), get_collection_6()].into_iter()); + let mut cmp = 0; + let mut cnt = 0; + + for i in it { + assert!(i >= cmp); + cmp = i; + cnt += 1; + } + + assert_eq!(cnt, get_collection_1().len()+get_collection_2().len()+get_collection_3().len()+ + get_collection_4().len()+get_collection_5().len()+get_collection_6().len()); + } +}