Mercurial > lbo > hg > localmr
changeset 71:2a58864abf08
Implement actual reducing
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 06 Feb 2016 18:37:22 +0000 |
parents | a9f8ff5da6e8 |
children | 98b6d1e37f6a |
files | src/reduce.rs |
diffstat | 1 files changed, 35 insertions(+), 20 deletions(-) [+] |
line wrap: on
line diff
--- a/src/reduce.rs Sat Feb 06 18:37:00 2016 +0000
+++ b/src/reduce.rs Sat Feb 06 18:37:22 2016 +0000
@@ -30,19 +30,38 @@
 
     /// Run the Reduce partition.
     pub fn _run(mut self) {
-        let sorted_input = self.open_sorted_input();
-        // reduce input and write results.
-    }
-
-    /// Create an iterator that merges all input sources. Leaves self.srcfiles empty.
-    fn open_sorted_input(&mut self) -> ShardMergeIterator<Record> {
         let mut inputs = Vec::new();
         inputs.append(&mut self.srcfiles);
         let mut it = inputs.into_iter();
 
-        ShardMergeIterator::build(&mut it)
+        let params = self.params.clone();
+
+        self.reduce(RecordsToMultiRecords::new(ShardMergeIterator::build(&mut it), params))
+    }
+
+    fn get_output_name(&self) -> String {
+        use std::fmt;
+        let mut name = String::new();
+        name.push_str(&self.params.reduce_output_shard_prefix[..]);
+        name.push_str(&fmt::format(format_args!("{}", self.params.shard_id))[..]);
+        name
     }
 
+    fn reduce<RecIt: Iterator<Item=Record>>(mut self, inp: RecordsToMultiRecords<RecIt>) {
+        use std::io::Write;
+
+        let name = self.get_output_name();
+        let mut outp = self.dstfilegen.new_output(&name);
+
+        for multirec in inp {
+            let mut emitter = REmitter::new();
+            self.mr.reduce(&mut emitter, multirec);
+
+            for result in emitter._get().into_iter() {
+                outp.write(result.as_bytes());
+            }
+        }
+    }
 }
 
 use std::iter::Peekable;
@@ -106,9 +125,8 @@
     use record_types::*;
     use std::vec;
 
-    #[test]
-    fn test_grouping_iterator() {
-        let records = vec![
+    fn get_records() -> Vec<Record>{
+        vec![
             mk_rcrd("aaa", "def"),
             mk_rcrd("abb", "111"),
             mk_rcrd("Abb", "112"),
@@ -116,7 +134,12 @@
             mk_rcrd("abc", "xyz"),
             mk_rcrd("xyz", "___"),
             mk_rcrd("xyz", "__foo"),
-            mk_rcrd("xyz", "---")];
+            mk_rcrd("xyz", "---")]
+    }
+
+    #[test]
+    fn test_grouping_iterator() {
+        let records = get_records();
         let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> =
             RecordsToMultiRecords::new(records.into_iter(), MRParameters::new().set_reduce_group_opts(2, true));
         let lengths = vec![1, 2, 1, 1, 3];
@@ -130,15 +153,7 @@
 
     #[test]
     fn test_grouping_iterator_sensitive() {
-        let records = vec![
-            mk_rcrd("aaa", "def"),
-            mk_rcrd("abb", "111"),
-            mk_rcrd("Abb", "112"),
-            mk_rcrd("abbb", "113"),
-            mk_rcrd("abc", "xyz"),
-            mk_rcrd("xyz", "___"),
-            mk_rcrd("xyz", "__foo"),
-            mk_rcrd("xyz", "---")];
+        let records = get_records();
         let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> =
             RecordsToMultiRecords::new(records.into_iter(), MRParameters::new().set_reduce_group_opts(2, false));
         let lengths = vec![1, 1, 1, 1, 1, 3];