view src/closure_mr.rs @ 101:f119d37e3f39

Use static dispatch in Closure MR type
author Lewin Bormann <lbo@spheniscida.de>
date Mon, 08 Feb 2016 19:24:26 +0000
parents ab7d19e012e0
children
line wrap: on
line source

//! A MapReducer that uses supplied map()/reduce() functions.

use mapreducer::{MapReducer, SharderF, _std_shard};
use record_types::{Record, MultiRecord, MEmitter, REmitter};

/// This type implements the MapReducer trait. You can use it to provide your own functions to a
/// MapReduce process. If you need more flexibility, however, you may want to simply implement your
/// own type that fulfills MapReducer.
pub struct ClosureMapReducer<M, R, S>
    where M: Fn(&mut MEmitter, Record) + Copy,
          R: Fn(&mut REmitter, MultiRecord) + Copy,
          S: Fn(usize, &String) -> usize + Copy
{
    mapper: M,
    reducer: R,
    sharder: S,
}

impl<M, R, S> Clone for ClosureMapReducer<M, R, S>
    where M: Fn(&mut MEmitter, Record) + Copy,
          R: Fn(&mut REmitter, MultiRecord) + Copy,
          S: Fn(usize, &String) -> usize + Copy
{
    fn clone(&self) -> ClosureMapReducer<M, R, S> {
        ClosureMapReducer {
            mapper: self.mapper,
            reducer: self.reducer,
            sharder: self.sharder,
        }
    }
}

impl<M, R> ClosureMapReducer<M, R, fn(usize, &String) -> usize>
    where M: Fn(&mut MEmitter, Record) + Copy,
          R: Fn(&mut REmitter, MultiRecord) + Copy
{
    /// Create a new MapReducer from the supplied functions.
    pub fn new(mapper: M, reducer: R) -> ClosureMapReducer<M, R, SharderF> {
        let f: fn(usize, &String) -> usize = _std_shard;
        ClosureMapReducer {
            mapper: mapper,
            reducer: reducer,
            sharder: f,
        }
    }
}

impl<M, R, S> ClosureMapReducer<M, R, S>
    where M: Fn(&mut MEmitter, Record) + Copy,
          R: Fn(&mut REmitter, MultiRecord) + Copy,
          S: Fn(usize, &String) -> usize + Copy
{
    /// Set the function used for sharding.
    pub fn set_sharder(&mut self, s: S) {
        self.sharder = s;
    }
}

impl<M, R, S> MapReducer for ClosureMapReducer<M, R, S>
    where M: Fn(&mut MEmitter, Record) + Copy,
          R: Fn(&mut REmitter, MultiRecord) + Copy,
          S: Fn(usize, &String) -> usize + Copy
{
    fn map(&self, e: &mut MEmitter, r: Record) {
        (self.mapper)(e, r)
    }
    fn reduce(&self, e: &mut REmitter, r: MultiRecord) {
        (self.reducer)(e, r)
    }
    fn shard(&self, n: usize, k: &String) -> usize {
        (self.sharder)(n, k)
    }
}