Mercurial > lbo > hg > localmr
changeset 81:36ca534fa940
Cosmetics in reduce.rs
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 07 Feb 2016 09:52:50 +0000 |
parents | fe31ac3158c3 |
children | 0557a5c0d28d |
files | src/reduce.rs |
diffstat | 1 files changed, 17 insertions(+), 13 deletions(-) [+] |
line wrap: on
line diff
--- a/src/reduce.rs Sun Feb 07 09:51:18 2016 +0000 +++ b/src/reduce.rs Sun Feb 07 09:52:50 2016 +0000 @@ -9,12 +9,15 @@ use record_types::{Record, MultiRecord, REmitter}; use shard_merge::ShardMergeIterator; -struct ReducePartition<MR: MapReducer, InputIt: Iterator<Item = Record>, SinkGen: MRSinkGenerator> { +pub struct ReducePartition<MR: MapReducer, + InputIt: Iterator<Item = Record>, + SinkGen: MRSinkGenerator> +{ mr: MR, params: MRParameters, // Maybe we want to genericize this to an Iterator<Item=Read> or so? This defers opening // the files to the reduce shard itself. - srcfiles: Vec<InputIt>, + srcs: Vec<InputIt>, dstfilegen: SinkGen, } @@ -22,18 +25,18 @@ /// Create a new Reduce partition for the given MR; source and destination I/O. /// mr is the map/reduce functions. /// params is generic MR parameters as well as some applying directly to this reduce partition. -/// srcfiles is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's +/// srcs is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's /// outputs. /// dstfiles is a SinkGen (as known from the mapping phase) that is used to create the output /// file (there is one output file per reduce partition, currently). - pub fn new(mr: MR, params: MRParameters, srcfiles: Vec<InputIt>, dstfiles: SinkGen) -> ReducePartition<MR, InputIt, SinkGen> { - ReducePartition { mr: mr, params: params, srcfiles: srcfiles, dstfilegen: dstfiles } + pub fn new(mr: MR, params: MRParameters, srcs: Vec<InputIt>, dstfiles: SinkGen) -> ReducePartition<MR, InputIt, SinkGen> { + ReducePartition { mr: mr, params: params, srcs: srcs, dstfilegen: dstfiles } } /// Run the Reduce partition. pub fn _run(mut self) { let mut inputs = Vec::new(); - inputs.append(&mut self.srcfiles); + inputs.append(&mut self.srcs); let mut it = inputs.into_iter(); let params = self.params.clone(); @@ -72,14 +75,14 @@ /// identical items are adjacent). pub struct RecordsToMultiRecords<It: Iterator<Item = Record>> { it: Peekable<It>, - settings: MRParameters, + params: MRParameters, } impl<It: Iterator<Item = Record>> RecordsToMultiRecords<It> { - fn new(it: It, settings: MRParameters) -> RecordsToMultiRecords<It> { + fn new(it: It, params: MRParameters) -> RecordsToMultiRecords<It> { RecordsToMultiRecords { it: it.peekable(), - settings: settings, + params: params, } } } @@ -88,12 +91,12 @@ type Item = MultiRecord; fn next(&mut self) -> Option<Self::Item> { use std::ascii::AsciiExt; - let mut collection = Vec::with_capacity(self.settings.reduce_group_prealloc_size); + let mut collection = Vec::with_capacity(self.params.reduce_group_prealloc_size); let key: String; match self.it.next() { None => return None, Some(r) => { - if self.settings.reduce_group_insensitive { + if self.params.reduce_group_insensitive { key = r.key[..].to_ascii_lowercase(); } else { key = r.key @@ -105,9 +108,9 @@ match self.it.peek() { None => break, Some(r) => { - if !self.settings.reduce_group_insensitive && r.key != key { + if !self.params.reduce_group_insensitive && r.key != key { break; - } else if self.settings.reduce_group_insensitive && + } else if self.params.reduce_group_insensitive && r.key[..].to_ascii_lowercase() != key { break; } @@ -124,6 +127,7 @@ use super::*; use parameters::MRParameters; use record_types::*; + use std::vec; fn get_records() -> Vec<Record> {