changeset 31:ce0ed262a105

Add sharded output to mapping phase
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 31 Jan 2016 19:32:24 +0000
parents 03ea3c83f499
children 9e6cdeb49c8c
files src/map.rs
diffstat 1 files changed, 37 insertions(+), 18 deletions(-) [+]
line wrap: on
line diff
--- a/src/map.rs	Sun Jan 31 17:05:33 2016 +0000
+++ b/src/map.rs	Sun Jan 31 19:32:24 2016 +0000
@@ -3,9 +3,12 @@
 
 #![allow(dead_code)]
 
+use std::fmt;
 use std::io::Write;
 use std::collections::{LinkedList, BTreeMap};
 use mapreducer::{Record, MapReducer, MEmitter};
+use parameters::MRParameters;
+use formats::util::MRSinkGenerator;
 
 type MapInput = LinkedList<Record>;
 
@@ -13,20 +16,22 @@
 /// and intermediary input and output forms.
 /// Mapper threads run on this. Every mapper thread has one MapPartition
 /// instance per input chunk.
-struct MapPartition<MR: MapReducer> {
+struct MapPartition<MR: MapReducer, SinkGen: MRSinkGenerator> {
     mr: MR,
+    params: MRParameters,
     input: MapInput,
-    output: Box<Write>,
+    sink: SinkGen,
     sorted_input: BTreeMap<String, String>,
     sorted_output: BTreeMap<String, Vec<String>>,
 }
 
-impl<MR: MapReducer> MapPartition<MR> {
-    pub fn _new(input: MapInput, mr: MR, output: Box<Write>) -> MapPartition<MR> {
+impl<MR: MapReducer, SinkGen: MRSinkGenerator> MapPartition<MR, SinkGen> {
+    pub fn _new(params: MRParameters, input: MapInput, mr: MR, output: SinkGen) -> MapPartition<MR, SinkGen> {
         MapPartition {
             mr: mr,
+            params: params,
             input: input,
-            output: output,
+            sink: output,
             sorted_input: BTreeMap::new(),
             sorted_output: BTreeMap::new(),
         }
@@ -52,12 +57,10 @@
 
     /// Executes the mapping phase.
     fn do_map(&mut self) {
-        // TODO: Make this configurable
-        let key_buffer_size: usize = 256;
-        let mut key_buffer = Vec::with_capacity(key_buffer_size);
+        let mut key_buffer = Vec::with_capacity(self.params.key_buffer_size);
 
         loop {
-            for k in self.sorted_input.keys().take(key_buffer_size) {
+            for k in self.sorted_input.keys().take(self.params.key_buffer_size) {
                 key_buffer.push(k.clone())
             }
 
@@ -76,22 +79,38 @@
                 self.insert_result(e);
             }
 
-            if key_buffer.len() < key_buffer_size {
+            if key_buffer.len() < self.params.key_buffer_size {
                 break;
             }
             key_buffer.clear();
         }
     }
 
+    fn setup_output(&mut self) -> Vec<SinkGen::Sink> {
+        // Set up sharded outputs.
+        let mut outputs = Vec::new();
+
+        for i in 0..self.params.reducers {
+            let out = self.sink.new_output(&fmt::format(format_args!("mapout_{}.{}", self.params.shard_id, i)));
+            outputs.push(out);
+        }
+        assert_eq!(outputs.len(), self.params.reducers);
+        outputs
+    }
+
     fn write_output(&mut self) {
+        let mut outputs = self.setup_output();
+
         for (k, vs) in self.sorted_output.iter() {
+            let shard = self.mr.shard(self.params.reducers, k);
+
             for v in vs {
-                let r1 = self.output.write(k.as_bytes());
+                let r1 = outputs[shard].write(k.as_bytes());
                 match r1 {
                     Err(e) => panic!("couldn't write map output: {}", e),
                     Ok(_) => (),
                 }
-                let r2 = self.output.write(v.as_bytes());
+                let r2 = outputs[shard].write(v.as_bytes());
                 match r2 {
                     Err(e) => panic!("couldn't write map output: {}", e),
                     Ok(_) => (),
@@ -123,11 +142,12 @@
 #[cfg(test)]
 mod tests {
     use closure_mr::ClosureMapReducer;
-    use formats::util::RecordIterator;
+    use formats::util::{RecordIterator};
+    use formats::lines::LinesSinkGenerator;
     use map::MapPartition;
     use mapreducer::{MEmitter, REmitter, Record, MultiRecord};
+    use parameters::MRParameters;
     use std::collections::LinkedList;
-    use std::io::Write;
 
     fn mapper_func(e: &mut MEmitter, r: Record) {
         for w in r.value.split_whitespace() {
@@ -157,14 +177,13 @@
     }
 
 
-    fn get_output() -> Box<Write> {
-        use formats::lines::LinesWriter;
-        Box::new(LinesWriter::new_to_file(&String::from("mapphase_1.wlg")).unwrap())
+    fn get_output() -> LinesSinkGenerator {
+        LinesSinkGenerator::new(&String::from("test_map_"))
     }
 
     #[test]
     fn test_map_partition() {
-        let mp = MapPartition::_new(get_input(), get_mr(), get_output());
+        let mp = MapPartition::_new(MRParameters::new().set_concurrency(4, 3), get_input(), get_mr(), get_output());
         mp._run();
     }
 }