changeset 35:fcb3ee662ee7

Add custom sharding functions to the Closure MR implementation
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 31 Jan 2016 19:43:53 +0000
parents eca9d24b105f
children e80e00619c81
files src/closure_mr.rs src/mapreducer.rs
diffstat 2 files changed, 27 insertions(+), 5 deletions(-) [+]
line wrap: on
line diff
--- a/src/closure_mr.rs	Sun Jan 31 19:33:36 2016 +0000
+++ b/src/closure_mr.rs	Sun Jan 31 19:43:53 2016 +0000
@@ -1,11 +1,15 @@
 //! A MapReducer that uses supplied map()/reduce() functions.
 
-use mapreducer::{MEmitter, MapReducer, MapperF, MultiRecord, REmitter, Record, ReducerF};
+use mapreducer::{MEmitter, MapReducer, MapperF, MultiRecord, REmitter, Record, ReducerF, SharderF,
+                 _std_shard};
 
-/// Use your functions in a MapReduce (instead of implementing your own mapreducer)
+/// This type implements the MapReducer trait by delegating to caller-supplied map, reduce and
+/// shard functions, so you can provide your own functions to a MapReduce process. If you need more
+/// flexibility, however, you may want to simply write your own type that implements MapReducer.
 pub struct ClosureMapReducer {
     mapper: MapperF,
     reducer: ReducerF,
+    sharder: SharderF,
 }
 
 impl Clone for ClosureMapReducer {
@@ -13,6 +17,7 @@
         ClosureMapReducer {
             mapper: self.mapper,
             reducer: self.reducer,
+            sharder: self.sharder,
         }
     }
 }
@@ -23,8 +28,13 @@
         ClosureMapReducer {
             mapper: mapper,
             reducer: reducer,
+            sharder: _std_shard,
         }
     }
+    /// Set the function used for sharding keys, replacing the initial `_std_shard`.
+    pub fn set_sharder(&mut self, s: SharderF) {
+        self.sharder = s;
+    }
 }
 
 impl MapReducer for ClosureMapReducer {
@@ -34,4 +44,7 @@
     fn reduce(&self, e: &mut REmitter, r: MultiRecord) {
         (self.reducer)(e, r)
     }
+    fn shard(&self, n: usize, k: &String) -> usize {
+        (self.sharder)(n, k) // parentheses: call the fn pointer stored in the `sharder` field
+    }
 }
--- a/src/mapreducer.rs	Sun Jan 31 19:33:36 2016 +0000
+++ b/src/mapreducer.rs	Sun Jan 31 19:43:53 2016 +0000
@@ -70,12 +70,23 @@
     }
 }
 
+/// Default sharding function: SipHasher hash of the key bytes modulo `n` (`n` must be non-zero).
+pub fn _std_shard(n: usize, key: &String) -> usize {
+    let mut h = SipHasher::new();
+    h.write(key.as_bytes());
+    h.finish() as usize % n
+}
+
 /// Map() function type. The MEmitter argument is used to emit values from
 /// the map() function.
 pub type MapperF = fn(&mut MEmitter, Record);
 /// Reduce() function type. The REmitter argument is used to emit values
 /// from the reduce() function.
 pub type ReducerF = fn(&mut REmitter, MultiRecord);
+/// Type of a function that determines which shard a key belongs in.
+/// The first argument is the number of shards (n), the second one the key;
+/// the return value should be in [0; n).
+pub type SharderF = fn(usize, &String) -> usize;
 
 /// A type implementing map() and reduce() functions.
 /// The MapReducer is cloned once per mapper/reducer thread.
@@ -91,8 +102,6 @@
     /// Returns a number in [0; n) determining the shard the key belongs in.
     /// The default implementation uses a simple hash (SipHasher) and modulo.
     fn shard(&self, n: usize, key: &String) -> usize {
-        let mut h = SipHasher::new();
-        h.write(key.as_bytes());
-        h.finish() as usize % n
+        _std_shard(n, key)
     }
 }