Mercurial > lbo > hg > localmr
changeset 69:ddd39d8ba118
Add some tests for the grouping iterator
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 06 Feb 2016 17:08:16 +0000 |
parents | edbfc8c3ff56 |
children | a9f8ff5da6e8 |
files | src/reduce.rs |
diffstat | 1 files changed, 77 insertions(+), 22 deletions(-) [+] |
line wrap: on
line diff
--- a/src/reduce.rs Sat Feb 06 17:07:59 2016 +0000 +++ b/src/reduce.rs Sat Feb 06 17:08:16 2016 +0000 @@ -17,24 +17,24 @@ } impl<MR: MapReducer, InputIt: Iterator<Item=Record>, SinkGen: MRSinkGenerator> ReducePartition<MR, InputIt, SinkGen> { -/// Create a new Reduce partition for the given MR; source and destination I/O. -/// mr is the map/reduce functions. -/// params is generic MR parameters as well as some applying directly to this reduce partition. -/// srcfiles is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's -/// outputs. -/// dstfiles is a SinkGen (as known from the mapping phase) that is used to create the output -/// file (there is one output file per reduce partition, currently). + /// Create a new Reduce partition for the given MR; source and destination I/O. + /// mr is the map/reduce functions. + /// params is generic MR parameters as well as some applying directly to this reduce partition. + /// srcfiles is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's + /// outputs. + /// dstfiles is a SinkGen (as known from the mapping phase) that is used to create the output + /// file (there is one output file per reduce partition, currently). pub fn new(mr: MR, params: MRParameters, srcfiles: Vec<InputIt>, dstfiles: SinkGen) -> ReducePartition<MR, InputIt, SinkGen> { ReducePartition { mr: mr, params: params, srcfiles: srcfiles, dstfilegen: dstfiles } } -/// Run the Reduce partition. + /// Run the Reduce partition. pub fn _run(mut self) { - let mut sorted_input = self.open_sorted_input(); -// reduce input and write results. + let sorted_input = self.open_sorted_input(); + // reduce input and write results. } -/// Create an iterator that merges all input sources. Leaves self.srcfiles empty. + /// Create an iterator that merges all input sources. Leaves self.srcfiles empty. fn open_sorted_input(&mut self) -> ShardMergeIterator<Record> { let mut inputs = Vec::new(); inputs.append(&mut self.srcfiles); @@ -51,17 +51,16 @@ /// grouping subsequent records with identical key. /// The original iterator must yield records in sorted order (or at least in an order where /// identical items are adjacent). -struct RecordsToMultiRecords<It: Iterator<Item = Record>> { +pub struct RecordsToMultiRecords<It: Iterator<Item = Record>> { it: Peekable<It>, - /// Efficiency knob: How big groups of records are expected to be. Default is 1. - expected_group_size: usize, + settings: MRParameters, } impl<It: Iterator<Item = Record>> RecordsToMultiRecords<It> { - fn new(it: It, egs: usize) -> RecordsToMultiRecords<It> { + fn new(it: It, settings: MRParameters) -> RecordsToMultiRecords<It> { RecordsToMultiRecords { it: it.peekable(), - expected_group_size: egs, + settings: settings, } } } @@ -69,29 +68,85 @@ impl<It: Iterator<Item = Record>> Iterator for RecordsToMultiRecords<It> { type Item = MultiRecord; fn next(&mut self) -> Option<Self::Item> { - let mut collection = Vec::with_capacity(self.expected_group_size); + use std::ascii::AsciiExt; + let mut collection = Vec::with_capacity(self.settings.reduce_group_prealloc_size); let key: String; - match self.it.next() { None => return None, Some(r) => { - key = r.key; + if self.settings.reduce_group_insensitive { + key = r.key[..].to_ascii_lowercase(); + } else { + key = r.key + } collection.push(r.value) } } - loop { match self.it.peek() { None => break, Some(r) => { - if r.key != key { + if !self.settings.reduce_group_insensitive && r.key != key { + break; + } else if self.settings.reduce_group_insensitive && r.key[..].to_ascii_lowercase() != key { break; } } } collection.push(self.it.next().unwrap().value); } - return Some(MultiRecord::new(key, collection)); } } + +#[cfg(test)] +mod tests { + use super::*; + use parameters::MRParameters; + use record_types::*; + use std::vec; + + #[test] + fn test_grouping_iterator() { + let records = vec![ + mk_rcrd("aaa", "def"), + mk_rcrd("abb", "111"), + mk_rcrd("Abb", "112"), + mk_rcrd("abbb", "113"), + mk_rcrd("abc", "xyz"), + mk_rcrd("xyz", "___"), + mk_rcrd("xyz", "__foo"), + mk_rcrd("xyz", "---")]; + let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> = RecordsToMultiRecords::new(records.into_iter(), MRParameters::new().set_reduce_group_opts(2, true)); + + let lengths = vec![1, 2, 1, 1, 3]; + let mut i = 0; + + for multirec in group_it { + assert_eq!(multirec.into_iter().count(), lengths[i]); + i += 1; + } + } + + #[test] + fn test_grouping_iterator_sensitive() { + let records = vec![ + mk_rcrd("aaa", "def"), + mk_rcrd("abb", "111"), + mk_rcrd("Abb", "112"), + mk_rcrd("abbb", "113"), + mk_rcrd("abc", "xyz"), + mk_rcrd("xyz", "___"), + mk_rcrd("xyz", "__foo"), + mk_rcrd("xyz", "---")]; + let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> = RecordsToMultiRecords::new(records.into_iter(), MRParameters::new().set_reduce_group_opts(2, false)); + + let lengths = vec![1, 1, 1, 1, 1, 3]; + let mut i = 0; + + for multirec in group_it { + assert_eq!(multirec.into_iter().count(), lengths[i]); + i += 1; + } + } +}