Mercurial > lbo > hg > localmr
changeset 101:f119d37e3f39
Use static dispatch in Closure MR type
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Mon, 08 Feb 2016 19:24:26 +0000 |
parents | 97b70b7203a2 |
children | 0ba5088ebcb7 |
files | src/closure_mr.rs src/controller.rs src/formats/writelog.rs src/reduce.rs src/sort.rs |
diffstat | 5 files changed, 55 insertions(+), 19 deletions(-) [+] |
line wrap: on
line diff
--- a/src/closure_mr.rs Sun Feb 07 19:26:21 2016 +0000 +++ b/src/closure_mr.rs Mon Feb 08 19:24:26 2016 +0000 @@ -1,19 +1,27 @@ //! A MapReducer that uses supplied map()/reduce() functions. -use mapreducer::{MapReducer, MapperF, ReducerF, SharderF, _std_shard}; +use mapreducer::{MapReducer, SharderF, _std_shard}; use record_types::{Record, MultiRecord, MEmitter, REmitter}; /// This type implements the MapReducer trait. You can use it to provide your own functions to a /// MapReduce process. If you need more flexibility, however, you may want to simply implement your /// own type that fulfills MapReducer. -pub struct ClosureMapReducer { - mapper: MapperF, - reducer: ReducerF, - sharder: SharderF, +pub struct ClosureMapReducer<M, R, S> + where M: Fn(&mut MEmitter, Record) + Copy, + R: Fn(&mut REmitter, MultiRecord) + Copy, + S: Fn(usize, &String) -> usize + Copy +{ + mapper: M, + reducer: R, + sharder: S, } -impl Clone for ClosureMapReducer { - fn clone(&self) -> ClosureMapReducer { +impl<M, R, S> Clone for ClosureMapReducer<M, R, S> + where M: Fn(&mut MEmitter, Record) + Copy, + R: Fn(&mut REmitter, MultiRecord) + Copy, + S: Fn(usize, &String) -> usize + Copy +{ + fn clone(&self) -> ClosureMapReducer<M, R, S> { ClosureMapReducer { mapper: self.mapper, reducer: self.reducer, @@ -22,22 +30,37 @@ } } -impl ClosureMapReducer { +impl<M, R> ClosureMapReducer<M, R, fn(usize, &String) -> usize> + where M: Fn(&mut MEmitter, Record) + Copy, + R: Fn(&mut REmitter, MultiRecord) + Copy +{ /// Create a new MapReducer from the supplied functions. - pub fn new(mapper: MapperF, reducer: ReducerF) -> ClosureMapReducer { + pub fn new(mapper: M, reducer: R) -> ClosureMapReducer<M, R, SharderF> { + let f: fn(usize, &String) -> usize = _std_shard; ClosureMapReducer { mapper: mapper, reducer: reducer, - sharder: _std_shard, + sharder: f, } } +} + +impl<M, R, S> ClosureMapReducer<M, R, S> + where M: Fn(&mut MEmitter, Record) + Copy, + R: Fn(&mut REmitter, MultiRecord) + Copy, + S: Fn(usize, &String) -> usize + Copy +{ /// Set the function used for sharding. - pub fn set_sharder(&mut self, s: SharderF) { + pub fn set_sharder(&mut self, s: S) { self.sharder = s; } } -impl MapReducer for ClosureMapReducer { +impl<M, R, S> MapReducer for ClosureMapReducer<M, R, S> + where M: Fn(&mut MEmitter, Record) + Copy, + R: Fn(&mut REmitter, MultiRecord) + Copy, + S: Fn(usize, &String) -> usize + Copy +{ fn map(&self, e: &mut MEmitter, r: Record) { (self.mapper)(e, r) }
--- a/src/controller.rs Sun Feb 07 19:26:21 2016 +0000 +++ b/src/controller.rs Mon Feb 08 19:24:26 2016 +0000 @@ -150,7 +150,10 @@ if !self.params.keep_temp_files { for mpart in 0..self.map_partitions_run { for rshard in 0..self.params.reducers { - let name = fmt::format(format_args!("{}{}.{}", self.params.map_output_location, mpart, rshard)); + let name = fmt::format(format_args!("{}{}.{}", + self.params.map_output_location, + mpart, + rshard)); let _ = fs::remove_file(name); } }
--- a/src/formats/writelog.rs Sun Feb 07 19:26:21 2016 +0000 +++ b/src/formats/writelog.rs Mon Feb 08 19:24:26 2016 +0000 @@ -153,7 +153,9 @@ fs::OpenOptions::new() .read(true) .open(file) - .map(move |f| WriteLogReader::new(Box::new(io::BufReader::with_capacity(1024 * 1024, f)))) + .map(move |f| { + WriteLogReader::new(Box::new(io::BufReader::with_capacity(1024 * 1024, f))) + }) } /// Opens all files from a directory which end in suffix, and chains them together. @@ -176,7 +178,10 @@ println!("Error opening {:?}: {}", name, e); continue; } - Ok(f) => reader = Box::new(reader.chain(io::BufReader::with_capacity(1024 * 1024, f))), + Ok(f) => { + reader = Box::new(reader.chain(io::BufReader::with_capacity(1024 * 1024, + f))) + } } } } @@ -204,7 +209,7 @@ return Err(io::Error::new(io::ErrorKind::InvalidData, "Could not read enough data")); } else { - return Ok(0) + return Ok(0); } } else if off + s < len { off += s;
--- a/src/reduce.rs Sun Feb 07 19:26:21 2016 +0000 +++ b/src/reduce.rs Mon Feb 08 19:24:26 2016 +0000 @@ -210,7 +210,10 @@ let srcs = vec![get_records().into_iter()]; let dst = LinesSinkGenerator::new_to_files(); - let r = ReducePartition::new(mr, params, srcs, dst.new_output(&String::from("testdata/result_0"))); + let r = ReducePartition::new(mr, + params, + srcs, + dst.new_output(&String::from("testdata/result_0"))); r._run(); } }
--- a/src/sort.rs Sun Feb 07 19:26:21 2016 +0000 +++ b/src/sort.rs Mon Feb 08 19:24:26 2016 +0000 @@ -105,14 +105,16 @@ impl PartialOrd for DictComparableString { fn partial_cmp(&self, other: &DictComparableString) -> Option<Ordering> { - let (&DictComparableString::DCS(ref a), &DictComparableString::DCS(ref b)) = (self, other); + let (&DictComparableString::DCS(ref a), + &DictComparableString::DCS(ref b)) = (self, other); Some(dict_string_compare(a, b)) } } impl Ord for DictComparableString { fn cmp(&self, other: &DictComparableString) -> Ordering { - let (&DictComparableString::DCS(ref a), &DictComparableString::DCS(ref b)) = (self, other); + let (&DictComparableString::DCS(ref a), + &DictComparableString::DCS(ref b)) = (self, other); dict_string_compare(a, b) } }