changeset 101:f119d37e3f39

Use static dispatch in Closure MR type
author Lewin Bormann <lbo@spheniscida.de>
date Mon, 08 Feb 2016 19:24:26 +0000
parents 97b70b7203a2
children 0ba5088ebcb7
files src/closure_mr.rs src/controller.rs src/formats/writelog.rs src/reduce.rs src/sort.rs
diffstat 5 files changed, 55 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/src/closure_mr.rs	Sun Feb 07 19:26:21 2016 +0000
+++ b/src/closure_mr.rs	Mon Feb 08 19:24:26 2016 +0000
@@ -1,19 +1,27 @@
 //! A MapReducer that uses supplied map()/reduce() functions.
 
-use mapreducer::{MapReducer, MapperF, ReducerF, SharderF, _std_shard};
+use mapreducer::{MapReducer, SharderF, _std_shard};
 use record_types::{Record, MultiRecord, MEmitter, REmitter};
 
 /// This type implements the MapReducer trait. You can use it to provide your own functions to a
 /// MapReduce process. If you need more flexibility, however, you may want to simply implement your
 /// own type that fulfills MapReducer.
-pub struct ClosureMapReducer {
-    mapper: MapperF,
-    reducer: ReducerF,
-    sharder: SharderF,
+pub struct ClosureMapReducer<M, R, S>
+    where M: Fn(&mut MEmitter, Record) + Copy,
+          R: Fn(&mut REmitter, MultiRecord) + Copy,
+          S: Fn(usize, &String) -> usize + Copy
+{
+    mapper: M,
+    reducer: R,
+    sharder: S,
 }
 
-impl Clone for ClosureMapReducer {
-    fn clone(&self) -> ClosureMapReducer {
+impl<M, R, S> Clone for ClosureMapReducer<M, R, S>
+    where M: Fn(&mut MEmitter, Record) + Copy,
+          R: Fn(&mut REmitter, MultiRecord) + Copy,
+          S: Fn(usize, &String) -> usize + Copy
+{
+    fn clone(&self) -> ClosureMapReducer<M, R, S> {
         ClosureMapReducer {
             mapper: self.mapper,
             reducer: self.reducer,
@@ -22,22 +30,37 @@
     }
 }
 
-impl ClosureMapReducer {
+impl<M, R> ClosureMapReducer<M, R, fn(usize, &String) -> usize>
+    where M: Fn(&mut MEmitter, Record) + Copy,
+          R: Fn(&mut REmitter, MultiRecord) + Copy
+{
     /// Create a new MapReducer from the supplied functions.
-    pub fn new(mapper: MapperF, reducer: ReducerF) -> ClosureMapReducer {
+    pub fn new(mapper: M, reducer: R) -> ClosureMapReducer<M, R, SharderF> {
+        let f: fn(usize, &String) -> usize = _std_shard;
         ClosureMapReducer {
             mapper: mapper,
             reducer: reducer,
-            sharder: _std_shard,
+            sharder: f,
         }
     }
+}
+
+impl<M, R, S> ClosureMapReducer<M, R, S>
+    where M: Fn(&mut MEmitter, Record) + Copy,
+          R: Fn(&mut REmitter, MultiRecord) + Copy,
+          S: Fn(usize, &String) -> usize + Copy
+{
     /// Set the function used for sharding.
-    pub fn set_sharder(&mut self, s: SharderF) {
+    pub fn set_sharder(&mut self, s: S) {
         self.sharder = s;
     }
 }
 
-impl MapReducer for ClosureMapReducer {
+impl<M, R, S> MapReducer for ClosureMapReducer<M, R, S>
+    where M: Fn(&mut MEmitter, Record) + Copy,
+          R: Fn(&mut REmitter, MultiRecord) + Copy,
+          S: Fn(usize, &String) -> usize + Copy
+{
     fn map(&self, e: &mut MEmitter, r: Record) {
         (self.mapper)(e, r)
     }
--- a/src/controller.rs	Sun Feb 07 19:26:21 2016 +0000
+++ b/src/controller.rs	Mon Feb 08 19:24:26 2016 +0000
@@ -150,7 +150,10 @@
         if !self.params.keep_temp_files {
             for mpart in 0..self.map_partitions_run {
                 for rshard in 0..self.params.reducers {
-                    let name = fmt::format(format_args!("{}{}.{}", self.params.map_output_location, mpart, rshard));
+                    let name = fmt::format(format_args!("{}{}.{}",
+                                                        self.params.map_output_location,
+                                                        mpart,
+                                                        rshard));
                     let _ = fs::remove_file(name);
                 }
             }
--- a/src/formats/writelog.rs	Sun Feb 07 19:26:21 2016 +0000
+++ b/src/formats/writelog.rs	Mon Feb 08 19:24:26 2016 +0000
@@ -153,7 +153,9 @@
         fs::OpenOptions::new()
             .read(true)
             .open(file)
-            .map(move |f| WriteLogReader::new(Box::new(io::BufReader::with_capacity(1024 * 1024, f))))
+            .map(move |f| {
+                WriteLogReader::new(Box::new(io::BufReader::with_capacity(1024 * 1024, f)))
+            })
     }
 
     /// Opens all files from a directory which end in suffix, and chains them together.
@@ -176,7 +178,10 @@
                         println!("Error opening {:?}: {}", name, e);
                         continue;
                     }
-                    Ok(f) => reader = Box::new(reader.chain(io::BufReader::with_capacity(1024 * 1024, f))),
+                    Ok(f) => {
+                        reader = Box::new(reader.chain(io::BufReader::with_capacity(1024 * 1024,
+                                                                                    f)))
+                    }
                 }
             }
         }
@@ -204,7 +209,7 @@
                             return Err(io::Error::new(io::ErrorKind::InvalidData,
                                                       "Could not read enough data"));
                         } else {
-                            return Ok(0)
+                            return Ok(0);
                         }
                     } else if off + s < len {
                         off += s;
--- a/src/reduce.rs	Sun Feb 07 19:26:21 2016 +0000
+++ b/src/reduce.rs	Mon Feb 08 19:24:26 2016 +0000
@@ -210,7 +210,10 @@
         let srcs = vec![get_records().into_iter()];
         let dst = LinesSinkGenerator::new_to_files();
 
-        let r = ReducePartition::new(mr, params, srcs, dst.new_output(&String::from("testdata/result_0")));
+        let r = ReducePartition::new(mr,
+                                     params,
+                                     srcs,
+                                     dst.new_output(&String::from("testdata/result_0")));
         r._run();
     }
 }
--- a/src/sort.rs	Sun Feb 07 19:26:21 2016 +0000
+++ b/src/sort.rs	Mon Feb 08 19:24:26 2016 +0000
@@ -105,14 +105,16 @@
 
 impl PartialOrd for DictComparableString {
     fn partial_cmp(&self, other: &DictComparableString) -> Option<Ordering> {
-        let (&DictComparableString::DCS(ref a), &DictComparableString::DCS(ref b)) = (self, other);
+        let (&DictComparableString::DCS(ref a),
+             &DictComparableString::DCS(ref b)) = (self, other);
         Some(dict_string_compare(a, b))
     }
 }
 
 impl Ord for DictComparableString {
     fn cmp(&self, other: &DictComparableString) -> Ordering {
-        let (&DictComparableString::DCS(ref a), &DictComparableString::DCS(ref b)) = (self, other);
+        let (&DictComparableString::DCS(ref a),
+             &DictComparableString::DCS(ref b)) = (self, other);
         dict_string_compare(a, b)
     }
 }