Mercurial > lbo > hg > localmr
changeset 35:fcb3ee662ee7
Add custom sharding functions to the Closure MR implementation
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 31 Jan 2016 19:43:53 +0000 |
parents | eca9d24b105f |
children | e80e00619c81 |
files | src/closure_mr.rs src/mapreducer.rs |
diffstat | 2 files changed, 27 insertions(+), 5 deletions(-) [+] |
line wrap: on
line diff
--- a/src/closure_mr.rs Sun Jan 31 19:33:36 2016 +0000 +++ b/src/closure_mr.rs Sun Jan 31 19:43:53 2016 +0000 @@ -1,11 +1,15 @@ //! A MapReducer that uses supplied map()/reduce() functions. -use mapreducer::{MEmitter, MapReducer, MapperF, MultiRecord, REmitter, Record, ReducerF}; +use mapreducer::{MEmitter, MapReducer, MapperF, MultiRecord, REmitter, Record, ReducerF, SharderF, + _std_shard}; -/// Use your functions in a MapReduce (instead of implementing your own mapreducer) +/// This type implements the MapReducer trait. You can use it to provide your own functions to a +/// MapReduce process. If you need more flexibility, however, you may want to simply implement your +/// own type that fulfills MapReducer. pub struct ClosureMapReducer { mapper: MapperF, reducer: ReducerF, + sharder: SharderF, } impl Clone for ClosureMapReducer { @@ -13,6 +17,7 @@ ClosureMapReducer { mapper: self.mapper, reducer: self.reducer, + sharder: self.sharder, } } } @@ -23,8 +28,13 @@ ClosureMapReducer { mapper: mapper, reducer: reducer, + sharder: _std_shard, } } + /// Set the function used for sharding. + pub fn set_sharder(&mut self, s: SharderF) { + self.sharder = s; + } } impl MapReducer for ClosureMapReducer { @@ -34,4 +44,7 @@ fn reduce(&self, e: &mut REmitter, r: MultiRecord) { (self.reducer)(e, r) } + fn shard(&self, n: usize, k: &String) -> usize { + (self.sharder)(n, k) + } }
--- a/src/mapreducer.rs Sun Jan 31 19:33:36 2016 +0000 +++ b/src/mapreducer.rs Sun Jan 31 19:43:53 2016 +0000 @@ -70,12 +70,23 @@ } } +/// Default sharding function. +pub fn _std_shard(n: usize, key: &String) -> usize { + let mut h = SipHasher::new(); + h.write(key.as_bytes()); + h.finish() as usize % n +} + /// Map() function type. The MEmitter argument is used to emit values from /// the map() function. pub type MapperF = fn(&mut MEmitter, Record); /// Reduce() function type. The REmitter argument is used to emit values /// from the reduce() function. pub type ReducerF = fn(&mut REmitter, MultiRecord); +/// A function used to determine the shard a key belongs in. +/// The first argument is the number of shards, the second one the key; +/// the return value should be in [0; n). +pub type SharderF = fn(usize, &String) -> usize; /// A type implementing map() and reduce() functions. /// The MapReducer is cloned once per mapper/reducer thread. @@ -91,8 +102,6 @@ /// Returns a number in [0; n) determining the shard the key belongs in. /// The default implementation uses a simple hash (SipHasher) and modulo. fn shard(&self, n: usize, key: &String) -> usize { - let mut h = SipHasher::new(); - h.write(key.as_bytes()); - h.finish() as usize % n + _std_shard(n, key) } }