changeset 107:500571302783

Mega commit: Split up MR into M/R/S; more doc; mutable shard receiver
author Lewin Bormann <lbo@spheniscida.de>
date Wed, 15 Jun 2016 19:27:29 +0200
parents f4de6139bf2d
children 27afb9528618
files src/closure_mr.rs src/controller.rs src/formats/writelog.rs src/lib.rs src/map.rs src/mapreducer.rs src/parameters.rs src/phases/map.rs src/phases/mod.rs src/phases/reduce.rs src/reduce.rs src/shard_merge.rs src/sort.rs
diffstat 13 files changed, 528 insertions(+), 494 deletions(-)
--- a/src/closure_mr.rs	Wed Jun 15 18:54:43 2016 +0200
+++ b/src/closure_mr.rs	Wed Jun 15 19:27:29 2016 +0200
@@ -1,6 +1,6 @@
 //! A MapReducer that uses supplied map()/reduce() functions.
 
-use mapreducer::{MapReducer, MapperF, ReducerF, SharderF, _std_shard};
+use mapreducer::{Mapper, Reducer, Sharder, MapperF, ReducerF, SharderF, _std_shard};
 use record_types::{Record, MultiRecord, MEmitter, REmitter};
 
 /// This type implements the MapReducer trait. You can use it to provide your own functions to a
@@ -37,14 +37,18 @@
     }
 }
 
-impl MapReducer for ClosureMapReducer {
-    fn map(&self, e: &mut MEmitter, r: Record) {
+impl Mapper for ClosureMapReducer {
+    fn map(&mut self, e: &mut MEmitter, r: Record) {
         (self.mapper)(e, r)
     }
-    fn reduce(&self, e: &mut REmitter, r: MultiRecord) {
+}
+impl Reducer for ClosureMapReducer {
+    fn reduce(&mut self, e: &mut REmitter, r: MultiRecord) {
         (self.reducer)(e, r)
     }
-    fn shard(&self, n: usize, k: &String) -> usize {
+}
+impl Sharder for ClosureMapReducer {
+    fn shard(&mut self, n: usize, k: &String) -> usize {
         (self.sharder)(n, k)
     }
 }
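
With MapReducer split into the Mapper, Reducer and Sharder traits, a single
ClosureMapReducer still satisfies all three at once. A minimal sketch of
building one (count_words and sum_counts are illustrative functions;
ClosureMapReducer::new(mapper, reducer) is the two-argument constructor the
tests in this changeset use):

    use closure_mr::ClosureMapReducer;
    use record_types::{MEmitter, REmitter, Record, MultiRecord};

    fn count_words(e: &mut MEmitter, r: Record) {
        // Emit <word, "1"> for every whitespace-separated token.
        for w in r.value.split_whitespace() {
            e.emit(String::from(w), String::from("1"));
        }
    }

    fn sum_counts(e: &mut REmitter, recs: MultiRecord) {
        // Emit "<key>: <number of values>".
        let key = format!("{}", recs.key());
        let n = recs.into_iter().count();
        e.emit(format!("{}: {}", key, n));
    }

    fn make_cmr() -> ClosureMapReducer {
        // The result can be passed wherever a Mapper, Reducer or Sharder is
        // expected (sharding presumably defaults to _std_shard; the
        // constructor body is not shown in this hunk).
        ClosureMapReducer::new(count_words, sum_counts)
    }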
--- a/src/controller.rs	Wed Jun 15 18:54:43 2016 +0200
+++ b/src/controller.rs	Wed Jun 15 19:27:29 2016 +0200
@@ -3,20 +3,22 @@
 use formats::util::{SinkGenerator, RecordReadIterator};
 use formats::writelog::{WriteLogGenerator, WriteLogReader};
 use input_cache::InputCache;
-use map::MapPartition;
-use mapreducer::MapReducer;
+use phases::map::MapPartition;
+use mapreducer::{Mapper, Reducer, Sharder};
 use parameters::MRParameters;
 use record_types::Record;
-use reduce::ReducePartition;
+use phases::reduce::ReducePartition;
 
 use std::sync::mpsc::sync_channel;
 
 extern crate scoped_threadpool;
 use self::scoped_threadpool::Pool;
 
-pub struct MRController<MR: MapReducer> {
+pub struct MRController<M: Mapper, R: Reducer, S: Sharder> {
     params: MRParameters,
-    mr: MR,
+    m: M,
+    r: R,
+    s: S,
 
     // How many map partitions have been run?
     map_partitions_run: usize,
@@ -47,15 +49,21 @@
 }
 
 
-impl<MR: MapReducer + Send> MRController<MR> {
+impl<M: Mapper, R: Reducer, S: Sharder> MRController<M, R, S> {
     /// Create a new mapreduce instance and execute it immediately.
-    pub fn run<In: Iterator<Item = Record>, Out: SinkGenerator>(mr: MR,
+    ///
+    /// You can use `DefaultSharder` as the `sharder` argument.
+    pub fn run<In: Iterator<Item = Record>, Out: SinkGenerator>(mapper: M,
+                                                                reducer: R,
+                                                                sharder: S,
                                                                 params: MRParameters,
                                                                 inp: In,
                                                                 out: Out) {
         let mut controller = MRController {
             params: params,
-            mr: mr,
+            m: mapper,
+            r: reducer,
+            s: sharder,
             map_partitions_run: 0,
         };
         controller.run_map(inp);
@@ -63,23 +71,10 @@
         controller.clean_up();
     }
 
-    fn map_runner(mr: MR, params: MRParameters, inp: InputCache) {
-        if inp.len() == 0 {
-            return;
-        }
-        let intermed_out = WriteLogGenerator::new();
-        let map_part = MapPartition::_new(params, inp, mr, intermed_out);
-        map_part._run();
-    }
-
-    fn read_map_input<In: Iterator<Item = Record>>(it: &mut In, approx_bytes: usize) -> InputCache {
-
-        let inp_cache = InputCache::from_iter(8192, approx_bytes, it);
-        inp_cache
-    }
-
     fn run_map<In: Iterator<Item = Record>>(&mut self, mut input: In) {
         let mut pool = Pool::new(self.params.mappers as u32);
+        // Create channels for worker synchronization; this ensures that there are only as many
+        // mapper threads running as specified.
         let (send, recv) = sync_channel(self.params.mappers);
 
         for _ in 0..self.params.mappers {
@@ -90,9 +85,12 @@
             loop {
                 let _ = recv.recv();
 
-                let mr = self.mr.clone();
-                let inp = MRController::<MR>::read_map_input(&mut input,
-                                                             self.params.map_partition_size);
+                let m = self.m.clone();
+                let s = self.s.clone();
+                // Can't necessarily send the input handle to the mapper thread, therefore read
+                // input before spawn.
+                let inp = MRController::<M, R, S>::read_map_input(&mut input,
+                                                                  self.params.map_partition_size);
 
                 if inp.len() == 0 {
                     break;
@@ -102,7 +100,7 @@
                 let done = send.clone();
 
                 scope.execute(move || {
-                    MRController::map_runner(mr, params, inp);
+                    MRController::<M, R, S>::map_runner(m, s, params, inp);
                     let _ = done.send(true);
                 });
                 self.map_partitions_run += 1;
@@ -112,12 +110,28 @@
         });
     }
 
+    fn map_runner(mapper: M, sharder: S, params: MRParameters, inp: InputCache) {
+        if inp.len() == 0 {
+            return;
+        }
+        let intermed_out = WriteLogGenerator::new();
+        let map_part = MapPartition::_new(params, inp, mapper, sharder, intermed_out);
+        map_part._run();
+    }
+
+    fn read_map_input<In: Iterator<Item = Record>>(it: &mut In, approx_bytes: usize) -> InputCache {
+        InputCache::from_iter(8192, approx_bytes, it)
+    }
+
     fn run_reduce<Out: SinkGenerator>(&self, outp: Out) {
         let mut pool = Pool::new(self.params.reducers as u32);
 
         pool.scoped(move |scope| {
             for i in 0..self.params.reducers {
-                let mr = self.mr.clone();
+                let r = self.r.clone();
                 let params = self.params.clone().set_shard_id(i);
                 let map_partitions = self.map_partitions_run;
                 let output = outp.clone();
@@ -125,7 +139,7 @@
                 scope.execute(move || {
                     let inputs = open_reduce_inputs(&params, map_partitions, i);
                     let output = output.new_output(&get_reduce_output_name(&params));
-                    let reduce_part = ReducePartition::new(mr, params, inputs, output);
+                    let reduce_part = ReducePartition::new(r, params, inputs, output);
                     reduce_part._run();
                 });
             }
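
The controller now takes the three roles as separate values. A hedged usage
sketch (cmr is a ClosureMapReducer as sketched under closure_mr.rs above;
input_records is a placeholder for any Iterator<Item = Record>):

    // ClosureMapReducer implements Mapper, Reducer and Sharder, so clones of
    // one value can fill all three slots.
    MRController::run(cmr.clone(),                    // Mapper
                      cmr.clone(),                    // Reducer
                      cmr,                            // Sharder
                      MRParameters::new().set_concurrency(4, 3),
                      input_records.into_iter(),
                      LinesSinkGenerator::new_to_files());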
--- a/src/formats/writelog.rs	Wed Jun 15 18:54:43 2016 +0200
+++ b/src/formats/writelog.rs	Wed Jun 15 19:27:29 2016 +0200
@@ -16,15 +16,15 @@
 /// which was to write a log of all write operations to a database.
 ///
 /// # WriteLog
-/// 
+///
 /// WriteLog is a persistent data structure designed to be written to disk
 /// that is a sequence of bytestrings.
 /// It can be read back in relatively efficiently and yields the same byte
 /// strings; on disk, it is represented as records prefixed by 4 byte
 /// big-endian length prefixes: `llllbbbbbbllllbbllllbbbbbbbbb...`
-/// 
+///
 /// Where l is a length byte and b are bytes of a bytestring.
-/// 
+///
 /// There is a special case of WriteLogs: The length-prefixing can be turned
 /// off in order to yield a better efficiency when encoding PCK files. Those
 /// files are indexed by IDX files describing offset and length of single entries,
@@ -88,8 +88,8 @@
         // BUG: May not account for the length correctly if the length prefix
         // is written, but not the record.
         let result = self.dest
-                         .write(&encode_u32(buf.len() as u32)[0..4])
-                         .and(self.dest.write(buf));
+            .write(&encode_u32(buf.len() as u32)[0..4])
+            .and(self.dest.write(buf));
         match result {
             Err(_) => result,
             Ok(_) => {
@@ -179,8 +179,8 @@
                         continue;
                     }
                     Ok(f) => {
-                        reader = Box::new(reader.chain(io::BufReader::with_capacity(1024 * 1024,
-                                                                                    f)))
+                        reader =
+                            Box::new(reader.chain(io::BufReader::with_capacity(1024 * 1024, f)))
                     }
                 }
             }
@@ -379,8 +379,8 @@
 
     fn bench_a_writing() {
         let buf: vec::Vec<u8> = "aaabbbcccdddeeefffggghhhiiijjjkkklllmmmnnnoooppp"
-                                    .bytes()
-                                    .collect();
+            .bytes()
+            .collect();
 
         match WriteLogWriter::<fs::File>::new_to_file(&String::from("bench_file.wlg"), false) {
             Err(e) => panic!("{}", e),
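
For reference, the framing the module docs describe (llll-prefixed records) is
simple enough to show standalone. A sketch; frame_record is illustrative only,
the writer above uses its own encode_u32 helper for the same purpose:

    fn frame_record(buf: &[u8]) -> Vec<u8> {
        let n = buf.len() as u32;
        let mut out = Vec::with_capacity(4 + buf.len());
        // 4-byte big-endian length prefix...
        out.extend_from_slice(&[(n >> 24) as u8, (n >> 16) as u8, (n >> 8) as u8, n as u8]);
        // ...followed by the record's raw bytes.
        out.extend_from_slice(buf);
        out
    }

    // frame_record(b"abc") == vec![0, 0, 0, 3, 0x61, 0x62, 0x63]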
--- a/src/lib.rs	Wed Jun 15 18:54:43 2016 +0200
+++ b/src/lib.rs	Wed Jun 15 19:27:29 2016 +0200
@@ -6,11 +6,11 @@
 pub mod controller;
 pub mod formats;
 pub mod input_cache;
-mod map;
 pub mod mapreducer;
 pub mod parameters;
 pub mod record_types;
-mod reduce;
+
+mod phases;
 mod shard_merge;
 mod sort;
 
--- a/src/map.rs	Wed Jun 15 18:54:43 2016 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,212 +0,0 @@
-//! Implements the Map phase.
-//!
-
-#![allow(dead_code)]
-
-use std::collections::BTreeMap;
-use std::fmt;
-use std::io::Write;
-
-use formats::util::SinkGenerator;
-use mapreducer::MapReducer;
-use parameters::MRParameters;
-use record_types::{Record, MEmitter};
-use sort::DictComparableString;
-
-/// This is the base of the mapping phase. It contains an input
-/// and intermediary input and output forms.
-/// Mapper threads run on this. Every mapper thread has one MapPartition
-/// instance per input chunk.
-pub struct MapPartition<MR: MapReducer, MapInput: Iterator<Item = Record>, SinkGen: SinkGenerator> {
-    mr: MR,
-    params: MRParameters,
-    input: MapInput,
-    sink: SinkGen,
-    sorted_input: BTreeMap<DictComparableString, String>,
-    sorted_output: BTreeMap<DictComparableString, Vec<String>>,
-}
-
-impl<MR: MapReducer, MapInput: Iterator<Item=Record>, SinkGen: SinkGenerator> MapPartition<MR, MapInput, SinkGen> {
-    pub fn _new(params: MRParameters,
-                input: MapInput,
-                mr: MR,
-                output: SinkGen)
-                -> MapPartition<MR, MapInput, SinkGen> {
-        MapPartition {
-            mr: mr,
-            params: params,
-            input: input,
-            sink: output,
-            sorted_input: BTreeMap::new(),
-            sorted_output: BTreeMap::new(),
-        }
-    }
-    pub fn _run(mut self) {
-        self.sort_input();
-        self.do_map();
-        self.write_output();
-    }
-
-/// Sorts input into the sorted_input map, moving the records on the way
-/// (so no copying happens and memory consumption stays low-ish)
-    fn sort_input(&mut self) {
-        loop {
-            match self.input.next() {
-                None => break,
-                Some(record) => {
-                    self.sorted_input.insert(DictComparableString::DCS(record.key), record.value);
-                }
-            }
-        }
-    }
-
-/// Executes the mapping phase.
-    fn do_map(&mut self) {
-        let mut key_buffer = Vec::with_capacity(self.params.key_buffer_size);
-
-        loop {
-            for k in self.sorted_input.keys().take(self.params.key_buffer_size) {
-                key_buffer.push(k.clone())
-            }
-
-            for k in &key_buffer[..] {
-                let val;
-                match self.sorted_input.remove(k) {
-                    None => continue,
-                    Some(v) => val = v,
-                }
-                let mut e = MEmitter::new();
-                self.mr.map(&mut e,
-                            Record {
-                                key: k.clone().unwrap(),
-                                value: val,
-                            });
-                self.insert_result(e);
-            }
-
-            if key_buffer.len() < self.params.key_buffer_size {
-                break;
-            }
-            key_buffer.clear();
-        }
-    }
-
-    fn setup_output(&mut self) -> Vec<SinkGen::Sink> {
-// Set up sharded outputs.
-        let mut outputs = Vec::new();
-
-        for i in 0..self.params.reducers {
-            let out = self.sink.new_output(&fmt::format(format_args!("{}{}.{}",
-                                                                     self.params.map_output_location,
-                                                                     self.params.shard_id,
-                                                                     i)));
-            outputs.push(out);
-        }
-        assert_eq!(outputs.len(), self.params.reducers);
-        outputs
-    }
-
-    fn write_output(&mut self) {
-        let mut outputs = self.setup_output();
-
-        for (k, vs) in self.sorted_output.iter() {
-            let shard = self.mr.shard(self.params.reducers, k.as_ref());
-
-            for v in vs {
-                let r1 = outputs[shard].write(k.as_ref().as_bytes());
-                match r1 {
-                    Err(e) => panic!("couldn't write map output: {}", e),
-                    Ok(_) => (),
-                }
-                let r2 = outputs[shard].write(v.as_bytes());
-                match r2 {
-                    Err(e) => panic!("couldn't write map output: {}", e),
-                    Ok(_) => (),
-                }
-            }
-        }
-    }
-
-    fn insert_result(&mut self, emitter: MEmitter) {
-        for r in emitter._get() {
-            let e;
-            {
-                e = self.sorted_output.remove(&DictComparableString::wrap(r.key.clone()));
-            }
-
-            match e {
-                None => {
-                    self.sorted_output.insert(DictComparableString::wrap(r.key), vec![r.value]);
-                }
-                Some(mut v) => {
-                    v.push(r.value);
-                    self.sorted_output.insert(DictComparableString::wrap(r.key), v);
-                }
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use closure_mr::ClosureMapReducer;
-    use formats::util::PosRecordIterator;
-    use formats::lines::LinesSinkGenerator;
-    use map::MapPartition;
-    use record_types::{MEmitter, REmitter, Record, MultiRecord};
-    use parameters::MRParameters;
-    use std::collections::LinkedList;
-
-    fn mapper_func(e: &mut MEmitter, r: Record) {
-        for w in r.value.split_whitespace() {
-            e.emit(String::from(w), String::from("1"));
-        }
-    }
-
-    fn reducer_func(_: &mut REmitter, _: MultiRecord) {
-        // no-op
-    }
-
-    fn get_mr() -> ClosureMapReducer {
-        ClosureMapReducer::new(mapper_func, reducer_func)
-    }
-
-    fn get_input() -> LinkedList<Record> {
-        let inp: Vec<String> = vec!["abc def",
-                                    "xy yz za",
-                                    "hello world",
-                                    "let's do this",
-                                    "foo bar baz"]
-                                   .iter()
-                                   .map(move |s| String::from(*s))
-                                   .collect();
-        let ri: PosRecordIterator<_> = PosRecordIterator::new(inp.into_iter());
-        ri.collect()
-    }
-
-
-    fn get_output() -> LinesSinkGenerator {
-        LinesSinkGenerator::new_to_files()
-    }
-
-    #[test]
-    fn test_map_partition() {
-        // use std::fmt::format;
-        // use std::fs;
-
-        let reducers = 3;
-        let mp = MapPartition::_new(MRParameters::new()
-                                        .set_concurrency(4, reducers)
-                                        .set_file_locations(String::from("testdata/map_im_"),
-                                                            String::from("testdata/result_")),
-                                    get_input().into_iter(),
-                                    get_mr(),
-                                    get_output());
-        mp._run();
-
-        for _ in 0..reducers {
-            // let filename = format(format_args!("testdata/map_im_{}", i));
-            // let _ = fs::remove_file(filename);
-        }
-    }
-}
--- a/src/mapreducer.rs	Wed Jun 15 18:54:43 2016 +0200
+++ b/src/mapreducer.rs	Wed Jun 15 19:27:29 2016 +0200
@@ -23,20 +23,31 @@
 /// the return value should be in [0; n).
 pub type SharderF = fn(usize, &String) -> usize;
 
-/// A type implementing map() and reduce() functions.
-/// The MapReducer is cloned once per mapper/reducer thread.
-pub trait MapReducer: Clone {
+pub trait Mapper: Send + Clone {
     /// Takes one <key,value> pair and an emitter.
     /// The emitter is used to yield results from the map phase.
-    fn map(&self, em: &mut MEmitter, record: Record);
+    ///
+    /// Note that this method takes a &mut self; you can use this to cache expensive objects
+    /// between runs (but not between shards!)
+    fn map(&mut self, em: &mut MEmitter, record: Record);
+}
+
+pub trait Reducer: Send + Clone {
     /// Takes one key and one or more values and emits one or more
     /// values.
-    fn reduce(&self, em: &mut REmitter, records: MultiRecord);
+    ///
+    /// Note that this method takes a &mut self; you can use this to cache expensive objects
+    /// between runs (but not between shards!)
+    fn reduce(&mut self, em: &mut REmitter, records: MultiRecord);
+}
 
+pub trait Sharder: Send + Clone {
     /// Determines how to map keys to (reduce) shards.
     /// Returns a number in [0; n) determining the shard the key belongs in.
     /// The default implementation uses a simple hash (SipHasher) and modulo.
-    fn shard(&self, n: usize, key: &String) -> usize {
+    fn shard(&mut self, n: usize, key: &String) -> usize {
         _std_shard(n, key)
     }
 }
+
+/// A Sharder that just uses the trait's default shard() implementation
+/// (a SipHash of the key, modulo n).
+#[derive(Clone)]
+pub struct DefaultSharder;
+
+impl Sharder for DefaultSharder {}
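
The &mut self receivers are the point of the "mutable shard receiver" part of
this commit: implementations may keep mutable per-thread state across calls. A
hedged sketch (CachingMapper is illustrative and not part of the crate):

    use mapreducer::Mapper;
    use record_types::{MEmitter, Record};

    #[derive(Clone)]
    struct CachingMapper {
        calls: usize, // stand-in for an expensive resource built once per thread
    }

    impl Mapper for CachingMapper {
        fn map(&mut self, em: &mut MEmitter, r: Record) {
            // State persists between map() runs, but each shard gets a fresh
            // clone, so it does not persist between shards.
            self.calls += 1;
            em.emit(r.key, r.value);
        }
    }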
--- a/src/parameters.rs	Wed Jun 15 18:54:43 2016 +0200
+++ b/src/parameters.rs	Wed Jun 15 19:27:29 2016 +0200
@@ -22,6 +22,7 @@
 }
 
 impl MRParameters {
+    /// Creates an instance with sane defaults.
     pub fn new() -> MRParameters {
         MRParameters {
             key_buffer_size: 256,
@@ -71,15 +72,14 @@
         self
     }
 
-    /// prealloc_size: How big are the groups of keys in the reduce phase expected to be? (used for pre-allocating
-    /// buffers)
-    /// Default 1.
+    /// prealloc_size: How big are the groups of keys in the reduce phase expected to be?
+    /// (used for pre-allocating buffers). Default 1.
     ///
     /// insensitive: Whether to group strings together that differ in case. When used, the first
     /// encountered key will be supplied as key to the reduce function.
-    /// BUG: This will not work correctly until the map phase delivers outputs in the correct order, i.e.
-    /// dictionary order. The default Ord implementation for String treats lower and upper case
-    /// very differently. Default: false.
+    /// BUG: This will not work correctly until the map phase delivers outputs in the correct order,
+    /// i.e. dictionary order. The default Ord implementation for String treats lower and upper
+    /// case very differently. Default: false.
     pub fn set_reduce_group_opts(mut self,
                                  prealloc_size: usize,
                                  insensitive: bool)
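
A sketch of the builder-style configuration these setters form (values are
illustrative; set_concurrency takes (mappers, reducers), as in the tests
elsewhere in this changeset):

    let params = MRParameters::new()
        .set_concurrency(4, 3)            // 4 mappers, 3 reducers
        .set_reduce_group_opts(8, false)  // prealloc 8 values per group, case-sensitive
        .set_file_locations(String::from("testdata/map_im_"),
                            String::from("testdata/result_"));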
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/phases/map.rs	Wed Jun 15 19:27:29 2016 +0200
@@ -0,0 +1,218 @@
+//! Implements the Map phase.
+//!
+
+#![allow(dead_code)]
+
+use std::collections::BTreeMap;
+use std::fmt;
+use std::io::Write;
+
+use formats::util::SinkGenerator;
+use mapreducer::{Mapper, Sharder};
+use parameters::MRParameters;
+use record_types::{Record, MEmitter};
+use sort::DictComparableString;
+
+/// This is the base of the mapping phase. It holds the input source as well
+/// as the intermediary (sorted) input and output forms.
+/// Mapper threads run on this. Every mapper thread has one MapPartition
+/// instance per input chunk.
+pub struct MapPartition<M: Mapper,
+                        S: Sharder,
+                        MapInput: Iterator<Item = Record>,
+                        SinkGen: SinkGenerator>
+{
+    m: M,
+    sharder: S,
+    params: MRParameters,
+    input: MapInput,
+    sink: SinkGen,
+    sorted_input: BTreeMap<DictComparableString, String>,
+    sorted_output: BTreeMap<DictComparableString, Vec<String>>,
+}
+
+impl<M: Mapper, S: Sharder, MapInput: Iterator<Item=Record>,
+    SinkGen: SinkGenerator> MapPartition<M, S, MapInput, SinkGen> {
+    pub fn _new(params: MRParameters,
+                input: MapInput,
+                mapper: M,
+                sharder: S,
+                output: SinkGen)
+                -> MapPartition<M, S, MapInput, SinkGen> {
+        MapPartition {
+            m: mapper,
+            sharder: sharder,
+            params: params,
+            input: input,
+            sink: output,
+            sorted_input: BTreeMap::new(),
+            sorted_output: BTreeMap::new(),
+        }
+    }
+    pub fn _run(mut self) {
+        self.sort_input();
+        self.do_map();
+        self.write_output();
+    }
+
+    /// Sorts input into the sorted_input map, moving the records on the way
+    /// (so no copying happens and memory consumption stays low-ish).
+    fn sort_input(&mut self) {
+        loop {
+            match self.input.next() {
+                None => break,
+                Some(record) => {
+                    self.sorted_input.insert(DictComparableString::DCS(record.key), record.value);
+                }
+            }
+        }
+    }
+
+    /// Executes the mapping phase.
+    fn do_map(&mut self) {
+        let mut key_buffer = Vec::with_capacity(self.params.key_buffer_size);
+
+        loop {
+            for k in self.sorted_input.keys().take(self.params.key_buffer_size) {
+                key_buffer.push(k.clone())
+            }
+
+            for k in &key_buffer[..] {
+                let val;
+                match self.sorted_input.remove(k) {
+                    None => continue,
+                    Some(v) => val = v,
+                }
+                let mut e = MEmitter::new();
+                self.m.map(&mut e,
+                            Record {
+                                key: k.clone().unwrap(),
+                                value: val,
+                            });
+                self.insert_result(e);
+            }
+
+            if key_buffer.len() < self.params.key_buffer_size {
+                break;
+            }
+            key_buffer.clear();
+        }
+    }
+
+    fn setup_output(&mut self) -> Vec<SinkGen::Sink> {
+        // Set up sharded outputs.
+        let mut outputs = Vec::new();
+
+        for i in 0..self.params.reducers {
+            let out = self.sink.new_output(
+                &fmt::format(format_args!("{}{}.{}",
+                                          self.params.map_output_location,
+                                          self.params.shard_id,
+                                          i)));
+            outputs.push(out);
+        }
+        assert_eq!(outputs.len(), self.params.reducers);
+        outputs
+    }
+
+    fn write_output(&mut self) {
+        let mut outputs = self.setup_output();
+
+        for (k, vs) in self.sorted_output.iter() {
+            let shard = self.sharder.shard(self.params.reducers, k.as_ref());
+
+            for v in vs {
+                let r1 = outputs[shard].write(k.as_ref().as_bytes());
+                match r1 {
+                    Err(e) => panic!("couldn't write map output: {}", e),
+                    Ok(_) => (),
+                }
+                let r2 = outputs[shard].write(v.as_bytes());
+                match r2 {
+                    Err(e) => panic!("couldn't write map output: {}", e),
+                    Ok(_) => (),
+                }
+            }
+        }
+    }
+
+    fn insert_result(&mut self, emitter: MEmitter) {
+        for r in emitter._get() {
+            let e;
+            {
+                e = self.sorted_output.remove(&DictComparableString::wrap(r.key.clone()));
+            }
+
+            match e {
+                None => {
+                    self.sorted_output.insert(DictComparableString::wrap(r.key), vec![r.value]);
+                }
+                Some(mut v) => {
+                    v.push(r.value);
+                    self.sorted_output.insert(DictComparableString::wrap(r.key), v);
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use closure_mr::ClosureMapReducer;
+    use formats::util::PosRecordIterator;
+    use formats::lines::LinesSinkGenerator;
+    use phases::map::MapPartition;
+    use record_types::{MEmitter, REmitter, Record, MultiRecord};
+    use parameters::MRParameters;
+    use std::collections::LinkedList;
+
+    fn mapper_func(e: &mut MEmitter, r: Record) {
+        for w in r.value.split_whitespace() {
+            e.emit(String::from(w), String::from("1"));
+        }
+    }
+
+    fn reducer_func(_: &mut REmitter, _: MultiRecord) {
+        // no-op
+    }
+
+    fn get_mr() -> ClosureMapReducer {
+        ClosureMapReducer::new(mapper_func, reducer_func)
+    }
+
+    fn get_input() -> LinkedList<Record> {
+        let inp: Vec<String> =
+            vec!["abc def", "xy yz za", "hello world", "let's do this", "foo bar baz"]
+                .iter()
+                .map(move |s| String::from(*s))
+                .collect();
+        let ri: PosRecordIterator<_> = PosRecordIterator::new(inp.into_iter());
+        ri.collect()
+    }
+
+
+    fn get_output() -> LinesSinkGenerator {
+        LinesSinkGenerator::new_to_files()
+    }
+
+    #[test]
+    fn test_map_partition() {
+        // use std::fmt::format;
+        // use std::fs;
+
+        let reducers = 3;
+        let mp = MapPartition::_new(MRParameters::new()
+                                        .set_concurrency(4, reducers)
+                                        .set_file_locations(String::from("testdata/map_im_"),
+                                                            String::from("testdata/result_")),
+                                    get_input().into_iter(),
+                                    get_mr(),
+                                    get_mr(), // also the Sharder: ClosureMapReducer implements all three traits
+                                    get_output());
+        mp._run();
+
+        for _ in 0..reducers {
+            // let filename = format(format_args!("testdata/map_im_{}", i));
+            // let _ = fs::remove_file(filename);
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/phases/mod.rs	Wed Jun 15 19:27:29 2016 +0200
@@ -0,0 +1,2 @@
+pub mod map;
+pub mod reduce;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/phases/reduce.rs	Wed Jun 15 19:27:29 2016 +0200
@@ -0,0 +1,218 @@
+//! Implements the Reduce phase.
+//!
+
+use std::io;
+use std::iter::Peekable;
+
+use mapreducer::Reducer;
+use parameters::MRParameters;
+use record_types::{Record, MultiRecord, REmitter};
+use shard_merge::ShardMergeIterator;
+
+pub struct ReducePartition<R: Reducer, InputIt: Iterator<Item = Record>, Sink: io::Write> {
+    r: R,
+    params: MRParameters,
+    // Maybe we want to genericize this to an Iterator<Item=Read> or so? This defers opening
+    // the files to the reduce shard itself.
+    srcs: Vec<InputIt>,
+    dstfile: Sink,
+}
+
+impl<R: Reducer, InputIt: Iterator<Item = Record>, Sink: io::Write> ReducePartition<R,
+                                                                                    InputIt,
+                                                                                    Sink> {
+    /// Create a new Reduce partition for the given reducer R; source and destination I/O.
+    /// r supplies the reduce function.
+    /// params is generic MR parameters as well as some applying directly to this reduce partition.
+    /// srcs is a set of Iterator<Item=Record>s, usually reading from the map phase's outputs.
+    /// outp is a Sink (as known from the mapping phase) that is used to create the output
+    /// file (there is one output file per reduce partition, currently).
+    pub fn new(r: R,
+               params: MRParameters,
+               srcs: Vec<InputIt>,
+               outp: Sink)
+               -> ReducePartition<R, InputIt, Sink> {
+        ReducePartition {
+            r: r,
+            params: params,
+            srcs: srcs,
+            dstfile: outp,
+        }
+    }
+
+    /// Run the Reduce partition.
+    pub fn _run(mut self) {
+        let mut inputs = Vec::new();
+        inputs.append(&mut self.srcs);
+        let mut it = inputs.into_iter();
+
+        let params = self.params.clone();
+
+        self.reduce(RecordsToMultiRecords::new(ShardMergeIterator::build(&mut it), params))
+    }
+
+    fn reduce<RecIt: Iterator<Item = Record>>(mut self, inp: RecordsToMultiRecords<RecIt>) {
+        use std::io::Write;
+
+        for multirec in inp {
+            let mut emitter = REmitter::new();
+            self.r.reduce(&mut emitter, multirec);
+
+            for result in emitter._get().into_iter() {
+                match self.dstfile.write(result.as_bytes()) {
+                    Err(e) => {
+                        println!("WARN: While reducing shard #{}: {}",
+                                 self.params.shard_id,
+                                 e)
+                    }
+                    Ok(_) => (),
+                }
+            }
+        }
+    }
+}
+
+/// Iterator adapter: Converts an Iterator<Item=Record> into an Iterator<Item=MultiRecord> by
+/// grouping consecutive records with identical keys.
+/// The original iterator must yield records in sorted order (or at least in an order where
+/// records with identical keys are adjacent).
+pub struct RecordsToMultiRecords<It: Iterator<Item = Record>> {
+    it: Peekable<It>,
+    params: MRParameters,
+}
+
+impl<It: Iterator<Item = Record>> RecordsToMultiRecords<It> {
+    fn new(it: It, params: MRParameters) -> RecordsToMultiRecords<It> {
+        RecordsToMultiRecords {
+            it: it.peekable(),
+            params: params,
+        }
+    }
+}
+
+impl<It: Iterator<Item = Record>> Iterator for RecordsToMultiRecords<It> {
+    type Item = MultiRecord;
+    fn next(&mut self) -> Option<Self::Item> {
+        use std::ascii::AsciiExt;
+        let mut collection = Vec::with_capacity(self.params.reduce_group_prealloc_size);
+        let key: String;
+        match self.it.next() {
+            None => return None,
+            Some(r) => {
+                if self.params.reduce_group_insensitive {
+                    key = r.key[..].to_ascii_lowercase();
+                } else {
+                    key = r.key
+                }
+                collection.push(r.value)
+            }
+        }
+        loop {
+            match self.it.peek() {
+                None => break,
+                Some(r) => {
+                    if !self.params.reduce_group_insensitive && r.key != key {
+                        break;
+                    } else if self.params.reduce_group_insensitive &&
+                       r.key[..].to_ascii_lowercase() != key {
+                        break;
+                    }
+                }
+            }
+            collection.push(self.it.next().unwrap().value);
+        }
+        return Some(MultiRecord::new(key, collection));
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use closure_mr::ClosureMapReducer;
+    use formats::lines::LinesSinkGenerator;
+    use formats::util::SinkGenerator;
+    use parameters::MRParameters;
+    use record_types::*;
+
+    use std::vec;
+
+    fn get_records() -> Vec<Record> {
+        vec![mk_rcrd("aaa", "def"),
+             mk_rcrd("abb", "111"),
+             mk_rcrd("Abb", "112"),
+             mk_rcrd("abbb", "113"),
+             mk_rcrd("abc", "xyz"),
+             mk_rcrd("xyz", "___"),
+             mk_rcrd("xyz", "__foo"),
+             mk_rcrd("xyz", "---")]
+    }
+
+    #[test]
+    fn test_grouping_iterator() {
+        let records = get_records();
+        let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> =
+            RecordsToMultiRecords::new(records.into_iter(),
+                                       MRParameters::new().set_reduce_group_opts(2, true));
+
+        let lengths = vec![1, 2, 1, 1, 3];
+        let mut i = 0;
+
+        for multirec in group_it {
+            assert_eq!(multirec.into_iter().count(), lengths[i]);
+            i += 1;
+        }
+    }
+
+    #[test]
+    fn test_grouping_iterator_sensitive() {
+        let records = get_records();
+        let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> =
+            RecordsToMultiRecords::new(records.into_iter(),
+                                       MRParameters::new().set_reduce_group_opts(2, false));
+
+        let lengths = vec![1, 1, 1, 1, 1, 3];
+        let mut i = 0;
+
+        for multirec in group_it {
+            assert_eq!(multirec.into_iter().count(), lengths[i]);
+            i += 1;
+        }
+    }
+
+    fn test_reducer(e: &mut REmitter, recs: MultiRecord) {
+        use std::fmt::Write;
+        use std::borrow::Borrow;
+
+        let mut out = String::with_capacity(32);
+        let _ = out.write_fmt(format_args!("{}:", recs.key()));
+
+        for val in recs {
+            let _ = out.write_str(" ");
+            let _ = out.write_str(val.borrow());
+        }
+
+        e.emit(out);
+    }
+
+    fn fake_mapper(_: &mut MEmitter, _: Record) {}
+
+    #[test]
+    fn test_reduce() {
+        let mr = ClosureMapReducer::new(fake_mapper, test_reducer);
+        let params = MRParameters::new()
+            .set_shard_id(42)
+            .set_reduce_group_opts(1, true)
+            .set_file_locations(String::from("testdata/map_intermed_"),
+                                String::from("testdata/result_"));
+        let srcs = vec![get_records().into_iter()];
+        let dst = LinesSinkGenerator::new_to_files();
+
+        let r = ReducePartition::new(mr,
+                                     params,
+                                     srcs,
+                                     dst.new_output(&String::from("testdata/result_0")));
+        r._run();
+    }
+}
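
A worked sketch of the grouping adapter on already-sorted input. Note that
RecordsToMultiRecords::new is private, so this only compiles inside this
module (mk_rcrd comes from record_types, as in the tests above):

    let groups: Vec<MultiRecord> = RecordsToMultiRecords::new(
            vec![mk_rcrd("a", "1"), mk_rcrd("a", "2"), mk_rcrd("b", "3")].into_iter(),
            MRParameters::new().set_reduce_group_opts(2, false))
        .collect();
    // groups[0] holds key "a" with values ["1", "2"];
    // groups[1] holds key "b" with the single value ["3"].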
--- a/src/reduce.rs	Wed Jun 15 18:54:43 2016 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,219 +0,0 @@
-//! Implements the Reduce phase.
-//!
-
-use std::io;
-use std::iter::Peekable;
-
-use mapreducer::MapReducer;
-use parameters::MRParameters;
-use record_types::{Record, MultiRecord, REmitter};
-use shard_merge::ShardMergeIterator;
-
-pub struct ReducePartition<MR: MapReducer, InputIt: Iterator<Item = Record>, Sink: io::Write> {
-    mr: MR,
-    params: MRParameters,
-    // Maybe we want to genericize this to an Iterator<Item=Read> or so? This defers opening
-    // the files to the reduce shard itself.
-    srcs: Vec<InputIt>,
-    dstfile: Sink,
-}
-
-impl<MR: MapReducer, InputIt: Iterator<Item = Record>, Sink: io::Write> ReducePartition<MR,
-                                                                                        InputIt,
-                                                                                        Sink>
-    {
-    /// Create a new Reduce partition for the given MR; source and destination I/O.
-    /// mr is the map/reduce functions.
-    /// params is generic MR parameters as well as some applying directly to this reduce partition.
-    /// srcs is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's
-    /// outputs.
-    /// dstfiles is a Sink (as known from the mapping phase) that is used to create the output
-    /// file (there is one output file per reduce partition, currently).
-    pub fn new(mr: MR,
-               params: MRParameters,
-               srcs: Vec<InputIt>,
-               outp: Sink)
-               -> ReducePartition<MR, InputIt, Sink> {
-        ReducePartition {
-            mr: mr,
-            params: params,
-            srcs: srcs,
-            dstfile: outp,
-        }
-    }
-
-    /// Run the Reduce partition.
-    pub fn _run(mut self) {
-        let mut inputs = Vec::new();
-        inputs.append(&mut self.srcs);
-        let mut it = inputs.into_iter();
-
-        let params = self.params.clone();
-
-        self.reduce(RecordsToMultiRecords::new(ShardMergeIterator::build(&mut it), params))
-    }
-
-    fn reduce<RecIt: Iterator<Item = Record>>(mut self, inp: RecordsToMultiRecords<RecIt>) {
-        use std::io::Write;
-
-        for multirec in inp {
-            let mut emitter = REmitter::new();
-            self.mr.reduce(&mut emitter, multirec);
-
-            for result in emitter._get().into_iter() {
-                match self.dstfile.write(result.as_bytes()) {
-                    Err(e) => {
-                        println!("WARN: While reducing shard #{}: {}",
-                                 self.params.shard_id,
-                                 e)
-                    }
-                    Ok(_) => (),
-                }
-            }
-        }
-    }
-}
-
-/// Iterator adapter: Converts an Iterator<Item=Record> into an Iterator<Item=MultiRecord> by
-/// grouping subsequent records with identical key.
-/// The original iterator must yield records in sorted order (or at least in an order where
-/// identical items are adjacent).
-pub struct RecordsToMultiRecords<It: Iterator<Item = Record>> {
-    it: Peekable<It>,
-    params: MRParameters,
-}
-
-impl<It: Iterator<Item = Record>> RecordsToMultiRecords<It> {
-    fn new(it: It, params: MRParameters) -> RecordsToMultiRecords<It> {
-        RecordsToMultiRecords {
-            it: it.peekable(),
-            params: params,
-        }
-    }
-}
-
-impl<It: Iterator<Item = Record>> Iterator for RecordsToMultiRecords<It> {
-    type Item = MultiRecord;
-    fn next(&mut self) -> Option<Self::Item> {
-        use std::ascii::AsciiExt;
-        let mut collection = Vec::with_capacity(self.params.reduce_group_prealloc_size);
-        let key: String;
-        match self.it.next() {
-            None => return None,
-            Some(r) => {
-                if self.params.reduce_group_insensitive {
-                    key = r.key[..].to_ascii_lowercase();
-                } else {
-                    key = r.key
-                }
-                collection.push(r.value)
-            }
-        }
-        loop {
-            match self.it.peek() {
-                None => break,
-                Some(r) => {
-                    if !self.params.reduce_group_insensitive && r.key != key {
-                        break;
-                    } else if self.params.reduce_group_insensitive &&
-                       r.key[..].to_ascii_lowercase() != key {
-                        break;
-                    }
-                }
-            }
-            collection.push(self.it.next().unwrap().value);
-        }
-        return Some(MultiRecord::new(key, collection));
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    use closure_mr::ClosureMapReducer;
-    use formats::lines::LinesSinkGenerator;
-    use formats::util::SinkGenerator;
-    use parameters::MRParameters;
-    use record_types::*;
-
-    use std::vec;
-
-    fn get_records() -> Vec<Record> {
-        vec![mk_rcrd("aaa", "def"),
-             mk_rcrd("abb", "111"),
-             mk_rcrd("Abb", "112"),
-             mk_rcrd("abbb", "113"),
-             mk_rcrd("abc", "xyz"),
-             mk_rcrd("xyz", "___"),
-             mk_rcrd("xyz", "__foo"),
-             mk_rcrd("xyz", "---")]
-    }
-
-    #[test]
-    fn test_grouping_iterator() {
-        let records = get_records();
-        let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> =
-            RecordsToMultiRecords::new(records.into_iter(),
-                                       MRParameters::new().set_reduce_group_opts(2, true));
-
-        let lengths = vec![1, 2, 1, 1, 3];
-        let mut i = 0;
-
-        for multirec in group_it {
-            assert_eq!(multirec.into_iter().count(), lengths[i]);
-            i += 1;
-        }
-    }
-
-    #[test]
-    fn test_grouping_iterator_sensitive() {
-        let records = get_records();
-        let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> =
-            RecordsToMultiRecords::new(records.into_iter(),
-                                       MRParameters::new().set_reduce_group_opts(2, false));
-
-        let lengths = vec![1, 1, 1, 1, 1, 3];
-        let mut i = 0;
-
-        for multirec in group_it {
-            assert_eq!(multirec.into_iter().count(), lengths[i]);
-            i += 1;
-        }
-    }
-
-    fn test_reducer(e: &mut REmitter, recs: MultiRecord) {
-        use std::fmt::Write;
-        use std::borrow::Borrow;
-
-        let mut out = String::with_capacity(32);
-        let _ = out.write_fmt(format_args!("{}:", recs.key()));
-
-        for val in recs {
-            let _ = out.write_str(" ");
-            let _ = out.write_str(val.borrow());
-        }
-
-        e.emit(out);
-    }
-
-    fn fake_mapper(_: &mut MEmitter, _: Record) {}
-
-    #[test]
-    fn test_reduce() {
-        let mr = ClosureMapReducer::new(fake_mapper, test_reducer);
-        let params = MRParameters::new()
-                         .set_shard_id(42)
-                         .set_reduce_group_opts(1, true)
-                         .set_file_locations(String::from("testdata/map_intermed_"),
-                                             String::from("testdata/result_"));
-        let srcs = vec![get_records().into_iter()];
-        let dst = LinesSinkGenerator::new_to_files();
-
-        let r = ReducePartition::new(mr,
-                                     params,
-                                     srcs,
-                                     dst.new_output(&String::from("testdata/result_0")));
-        r._run();
-    }
-}
--- a/src/shard_merge.rs	Wed Jun 15 18:54:43 2016 +0200
+++ b/src/shard_merge.rs	Wed Jun 15 19:27:29 2016 +0200
@@ -207,7 +207,7 @@
                                                      get_collection_4(),
                                                      get_collection_5(),
                                                      get_collection_6()]
-                                                    .into_iter());
+            .into_iter());
         let mut cmp = 0;
         let mut cnt = 0;
 
@@ -241,7 +241,7 @@
         let merge_it = ShardMergeIterator::build_with_cmp(&mut files.into_iter(),
                                                           sort::dict_string_compare);
         let mut outfile = lines::LinesWriter::new_to_file(&String::from("testdata/all_sorted.txt"))
-                              .unwrap();
+            .unwrap();
 
         for line in merge_it {
             let _ = outfile.write(line.as_bytes());
--- a/src/sort.rs	Wed Jun 15 18:54:43 2016 +0200
+++ b/src/sort.rs	Wed Jun 15 19:27:29 2016 +0200
@@ -105,16 +105,14 @@
 
 impl PartialOrd for DictComparableString {
     fn partial_cmp(&self, other: &DictComparableString) -> Option<Ordering> {
-        let (&DictComparableString::DCS(ref a),
-             &DictComparableString::DCS(ref b)) = (self, other);
+        let (&DictComparableString::DCS(ref a), &DictComparableString::DCS(ref b)) = (self, other);
         Some(dict_string_compare(a, b))
     }
 }
 
 impl Ord for DictComparableString {
     fn cmp(&self, other: &DictComparableString) -> Ordering {
-        let (&DictComparableString::DCS(ref a),
-             &DictComparableString::DCS(ref b)) = (self, other);
+        let (&DictComparableString::DCS(ref a), &DictComparableString::DCS(ref b)) = (self, other);
         dict_string_compare(a, b)
     }
 }
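
DictComparableString wraps String so that BTreeMaps and merges order keys by
dict_string_compare (dictionary order) rather than String's default Ord, which
treats upper and lower case very differently. A small sketch of the
wrap/unwrap round trip used in phases/map.rs:

    let k = DictComparableString::wrap(String::from("Hello"));
    let s: String = k.clone().unwrap(); // recover the owned String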