Mercurial > lbo > hg > localmr
view src/reduce.rs @ 69:ddd39d8ba118
Add some tests for the grouping iterator
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 06 Feb 2016 17:08:16 +0000 |
parents | 9d7c1468f1b2 |
children | 2a58864abf08 |
line wrap: on
line source
//! Implements the Reduce phase. //! use mapreducer::MapReducer; use parameters::MRParameters; use formats::util::MRSinkGenerator; use record_types::{Record, MultiRecord, REmitter}; use shard_merge::ShardMergeIterator; struct ReducePartition<MR: MapReducer, InputIt: Iterator<Item = Record>, SinkGen: MRSinkGenerator> { mr: MR, params: MRParameters, // Maybe we want to genericize this to an Iterator<Item=Read> or so? This defers opening // the files to the reduce shard itself. srcfiles: Vec<InputIt>, dstfilegen: SinkGen, } impl<MR: MapReducer, InputIt: Iterator<Item=Record>, SinkGen: MRSinkGenerator> ReducePartition<MR, InputIt, SinkGen> { /// Create a new Reduce partition for the given MR; source and destination I/O. /// mr is the map/reduce functions. /// params is generic MR parameters as well as some applying directly to this reduce partition. /// srcfiles is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's /// outputs. /// dstfiles is a SinkGen (as known from the mapping phase) that is used to create the output /// file (there is one output file per reduce partition, currently). pub fn new(mr: MR, params: MRParameters, srcfiles: Vec<InputIt>, dstfiles: SinkGen) -> ReducePartition<MR, InputIt, SinkGen> { ReducePartition { mr: mr, params: params, srcfiles: srcfiles, dstfilegen: dstfiles } } /// Run the Reduce partition. pub fn _run(mut self) { let sorted_input = self.open_sorted_input(); // reduce input and write results. } /// Create an iterator that merges all input sources. Leaves self.srcfiles empty. fn open_sorted_input(&mut self) -> ShardMergeIterator<Record> { let mut inputs = Vec::new(); inputs.append(&mut self.srcfiles); let mut it = inputs.into_iter(); ShardMergeIterator::build(&mut it) } } use std::iter::Peekable; /// Iterator adapter: Converts an Iterator<Item=Record> into an Iterator<Item=MultiRecord> by /// grouping subsequent records with identical key. /// The original iterator must yield records in sorted order (or at least in an order where /// identical items are adjacent). pub struct RecordsToMultiRecords<It: Iterator<Item = Record>> { it: Peekable<It>, settings: MRParameters, } impl<It: Iterator<Item = Record>> RecordsToMultiRecords<It> { fn new(it: It, settings: MRParameters) -> RecordsToMultiRecords<It> { RecordsToMultiRecords { it: it.peekable(), settings: settings, } } } impl<It: Iterator<Item = Record>> Iterator for RecordsToMultiRecords<It> { type Item = MultiRecord; fn next(&mut self) -> Option<Self::Item> { use std::ascii::AsciiExt; let mut collection = Vec::with_capacity(self.settings.reduce_group_prealloc_size); let key: String; match self.it.next() { None => return None, Some(r) => { if self.settings.reduce_group_insensitive { key = r.key[..].to_ascii_lowercase(); } else { key = r.key } collection.push(r.value) } } loop { match self.it.peek() { None => break, Some(r) => { if !self.settings.reduce_group_insensitive && r.key != key { break; } else if self.settings.reduce_group_insensitive && r.key[..].to_ascii_lowercase() != key { break; } } } collection.push(self.it.next().unwrap().value); } return Some(MultiRecord::new(key, collection)); } } #[cfg(test)] mod tests { use super::*; use parameters::MRParameters; use record_types::*; use std::vec; #[test] fn test_grouping_iterator() { let records = vec![ mk_rcrd("aaa", "def"), mk_rcrd("abb", "111"), mk_rcrd("Abb", "112"), mk_rcrd("abbb", "113"), mk_rcrd("abc", "xyz"), mk_rcrd("xyz", "___"), mk_rcrd("xyz", "__foo"), mk_rcrd("xyz", "---")]; let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> = RecordsToMultiRecords::new(records.into_iter(), MRParameters::new().set_reduce_group_opts(2, true)); let lengths = vec![1, 2, 1, 1, 3]; let mut i = 0; for multirec in group_it { assert_eq!(multirec.into_iter().count(), lengths[i]); i += 1; } } #[test] fn test_grouping_iterator_sensitive() { let records = vec![ mk_rcrd("aaa", "def"), mk_rcrd("abb", "111"), mk_rcrd("Abb", "112"), mk_rcrd("abbb", "113"), mk_rcrd("abc", "xyz"), mk_rcrd("xyz", "___"), mk_rcrd("xyz", "__foo"), mk_rcrd("xyz", "---")]; let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> = RecordsToMultiRecords::new(records.into_iter(), MRParameters::new().set_reduce_group_opts(2, false)); let lengths = vec![1, 1, 1, 1, 1, 3]; let mut i = 0; for multirec in group_it { assert_eq!(multirec.into_iter().count(), lengths[i]); i += 1; } } }