Mercurial > lbo > hg > localmr
changeset 88:f35dc4bc24eb
rust fmt
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 07 Feb 2016 15:14:32 +0000 |
parents | 168e8777de73 |
children | dc8d7e8f4ba1 |
files | src/formats/writelog.rs src/reduce.rs |
diffstat | 2 files changed, 32 insertions(+), 19 deletions(-) [+] |
line wrap: on
line diff
--- a/src/formats/writelog.rs Sun Feb 07 15:14:22 2016 +0000 +++ b/src/formats/writelog.rs Sun Feb 07 15:14:32 2016 +0000 @@ -141,7 +141,7 @@ } impl WriteLogReader { - pub fn new(src: Box<Read+Send>) -> WriteLogReader { + pub fn new(src: Box<Read + Send>) -> WriteLogReader { WriteLogReader { src: src, records_read: 0,
--- a/src/reduce.rs Sun Feb 07 15:14:22 2016 +0000 +++ b/src/reduce.rs Sun Feb 07 15:14:32 2016 +0000 @@ -9,10 +9,7 @@ use record_types::{Record, MultiRecord, REmitter}; use shard_merge::ShardMergeIterator; -pub struct ReducePartition<MR: MapReducer, - InputIt: Iterator<Item = Record>, - Sink: io::Write> -{ +pub struct ReducePartition<MR: MapReducer, InputIt: Iterator<Item = Record>, Sink: io::Write> { mr: MR, params: MRParameters, // Maybe we want to genericize this to an Iterator<Item=Read> or so? This defers opening @@ -21,19 +18,31 @@ dstfile: Sink, } -impl<MR: MapReducer, InputIt: Iterator<Item=Record>, Sink: io::Write> ReducePartition<MR, InputIt, Sink> { -/// Create a new Reduce partition for the given MR; source and destination I/O. -/// mr is the map/reduce functions. -/// params is generic MR parameters as well as some applying directly to this reduce partition. -/// srcs is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's -/// outputs. -/// dstfiles is a Sink (as known from the mapping phase) that is used to create the output -/// file (there is one output file per reduce partition, currently). - pub fn new(mr: MR, params: MRParameters, srcs: Vec<InputIt>, outp: Sink) -> ReducePartition<MR, InputIt, Sink> { - ReducePartition { mr: mr, params: params, srcs: srcs, dstfile: outp} +impl<MR: MapReducer, InputIt: Iterator<Item = Record>, Sink: io::Write> ReducePartition<MR, + InputIt, + Sink> + { + /// Create a new Reduce partition for the given MR; source and destination I/O. + /// mr is the map/reduce functions. + /// params is generic MR parameters as well as some applying directly to this reduce partition. + /// srcs is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's + /// outputs. + /// dstfiles is a Sink (as known from the mapping phase) that is used to create the output + /// file (there is one output file per reduce partition, currently). + pub fn new(mr: MR, + params: MRParameters, + srcs: Vec<InputIt>, + outp: Sink) + -> ReducePartition<MR, InputIt, Sink> { + ReducePartition { + mr: mr, + params: params, + srcs: srcs, + dstfile: outp, + } } -/// Run the Reduce partition. + /// Run the Reduce partition. pub fn _run(mut self) { let mut inputs = Vec::new(); inputs.append(&mut self.srcs); @@ -44,7 +53,7 @@ self.reduce(RecordsToMultiRecords::new(ShardMergeIterator::build(&mut it), params)) } - fn reduce<RecIt: Iterator<Item=Record>>(mut self, inp: RecordsToMultiRecords<RecIt>) { + fn reduce<RecIt: Iterator<Item = Record>>(mut self, inp: RecordsToMultiRecords<RecIt>) { use std::io::Write; for multirec in inp { @@ -53,8 +62,12 @@ for result in emitter._get().into_iter() { match self.dstfile.write(result.as_bytes()) { - Err(e) => println!("WARN: While reducing shard #{}: {}", self.params.shard_id, e), - Ok(_) => () + Err(e) => { + println!("WARN: While reducing shard #{}: {}", + self.params.shard_id, + e) + } + Ok(_) => (), } } }