Mercurial > lbo > hg > localmr
changeset 4:0d7cfea10370
Add mapreducer interface and closure/funcptr implementation
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 30 Jan 2016 20:48:45 +0000 |
parents | 30ab78379fff |
children | 173f7aba48cf |
files | src/closure_mr.rs src/lib.rs src/mapreducer.rs |
diffstat | 3 files changed, 102 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/closure_mr.rs Sat Jan 30 20:48:45 2016 +0000 @@ -0,0 +1,31 @@ +use mapreducer::{ + MEmitter, + MapReducer, + MapperF, + MultiRecord, + REmitter, + Record, + ReducerF, +}; + +struct ClosureMapReducer { + mapper: MapperF, + reducer: ReducerF, +} + +impl Clone for ClosureMapReducer { + fn clone(&self) -> ClosureMapReducer { + ClosureMapReducer { mapper: self.mapper, reducer: self.reducer } + } +} + +impl ClosureMapReducer { + pub fn new(mapper: MapperF, reducer: ReducerF) -> ClosureMapReducer { + ClosureMapReducer { mapper: mapper, reducer: reducer } + } +} + +impl MapReducer for ClosureMapReducer { + fn map(&self, e: &mut MEmitter, r: Record) { (self.mapper)(e, r) } + fn reduce(&self, e: &mut REmitter, r: MultiRecord) { (self.reducer)(e, r) } +}
--- a/src/lib.rs Sat Jan 30 20:47:38 2016 +0000 +++ b/src/lib.rs Sat Jan 30 20:48:45 2016 +0000 @@ -1,4 +1,6 @@ pub mod formats; +pub mod mapreducer; +pub mod closure_mr; #[test] fn it_works() {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/mapreducer.rs Sat Jan 30 20:48:45 2016 +0000 @@ -0,0 +1,69 @@ +use std::collections::LinkedList; +use std::clone::Clone; + +pub struct Record { + pub key: String, + pub value: String, +} + +/// Input to a reducer function. +pub struct MultiRecord { + key: String, + value: Box<Iterator<Item=String>>, +} + +impl MultiRecord { + /// Retrieves the key of the record. + pub fn key<'a>(&'a self) -> &'a String { + &self.key + } +} + +impl IntoIterator for MultiRecord { + type Item = String; + type IntoIter = Box<Iterator<Item=String>>; + /// Allows iterating over all the values. + fn into_iter(self) -> Self::IntoIter { + self.value + } +} + +pub struct MEmitter { + r: LinkedList<Record>, +} + +impl MEmitter { + pub fn new() -> MEmitter { + MEmitter { r: LinkedList::new() } + } + pub fn emit(&mut self, key: String, val: String) { + self.r.push_back(Record { key: key, value: val }) + } + pub fn _get(self) -> LinkedList<Record> { self.r } +} + +pub struct REmitter { + r: LinkedList<String>, +} + +impl REmitter { + pub fn new() -> REmitter { + REmitter { r: LinkedList::new() } + } + pub fn emit(&mut self, val: String) { + self.r.push_back(val) + } + pub fn _get(self) -> LinkedList<String> { self.r } +} + +pub type MapperF = fn(&mut MEmitter, Record); +pub type ReducerF = fn(&mut REmitter, MultiRecord); + +/// A type implementing map() and reduce() functions. +pub trait MapReducer: Clone { + /// Takes one <key,value> pair and an emitter. + /// The emitter is used to yield results from the map phase. + fn map(&self, em: &mut MEmitter, record: Record); + fn reduce(&self, em: &mut REmitter, records: MultiRecord); +} +