changeset 26:5a61cde90fe6

Add Map phase implementation
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 31 Jan 2016 16:20:16 +0000
parents 68f181727187
children 67ce650d5de5
files src/map.rs
diffstat 1 files changed, 105 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/map.rs	Sun Jan 31 16:20:16 2016 +0000
@@ -0,0 +1,105 @@
+//! Implements the mapping phase.
+//!
+
+use std::io::Write;
+use std::collections::{LinkedList, BTreeMap};
+use mapreducer::{Record, MapReducer, MEmitter};
+
+type MapInput = LinkedList<Record>;
+
+/// This is the base of the mapping phase. It contains an input
+/// and intermediary input and output forms.
+/// Mapper threads run on this. Every mapper thread has one MapPartition
+/// instance per input chunk.
+struct MapPartition<MR: MapReducer> {
+    mr: MR,
+    input: MapInput,
+    output: Box<Write>,
+    sorted_input: BTreeMap<String, String>,
+    sorted_output: BTreeMap<String, Vec<String>>,
+}
+
+impl<MR: MapReducer> MapPartition<MR> {
+    pub fn _new(input: MapInput, mr: MR, output: Box<Write>) -> MapPartition<MR> {
+        MapPartition {
+            mr: mr,
+            input: input,
+            output: output,
+            sorted_input: BTreeMap::new(),
+            sorted_output: BTreeMap::new(),
+        }
+    }
+    pub fn _run(mut self) {
+        self.sort_input();
+        self.do_map();
+        self.write_output();
+    }
+
+    /// Sorts input into the sorted_input map, moving the records on the way
+    /// (so no copying happens and memory consumption stays low-ish)
+    fn sort_input(&mut self) {
+        loop {
+            match self.input.pop_front() {
+                None => break,
+                Some(record) => { self.sorted_input.insert(record.key, record.value); },
+            }
+        }
+    }
+
+    /// Executes the mapping phase.
+    fn do_map(&mut self) {
+        // TODO: Make this configurable
+        let key_buffer_size: usize = 256;
+        let mut key_buffer = Vec::with_capacity(key_buffer_size);
+
+        loop {
+            for k in self.sorted_input.keys().take(key_buffer_size) {
+                key_buffer.push(k.clone())
+            }
+
+            for k in &key_buffer[..] {
+                let val;
+                match self.sorted_input.remove(k) {
+                    None => continue,
+                    Some(v) => val = v
+                }
+                let mut e = MEmitter::new();
+                self.mr.map(&mut e, Record { key: k.clone(), value: val });
+                self.insert_result(e);
+            }
+
+            key_buffer.clear();
+        }
+    }
+
+    fn write_output(&mut self) {
+        for (k, vs) in self.sorted_output.iter() {
+            for v in vs {
+                let r1 = self.output.write(k.as_bytes());
+                match r1 {
+                    Err(e) => panic!("couldn't write map output: {}", e),
+                    Ok(_) => ()
+                }
+                let r2 = self.output.write(v.as_bytes());
+                match r2 {
+                    Err(e) => panic!("couldn't write map output: {}", e),
+                    Ok(_) => ()
+                }
+            }
+        }
+    }
+
+    fn insert_result(&mut self, emitter: MEmitter) {
+        for r in emitter._get() {
+            let e;
+            {
+                e = self.sorted_output.remove(&r.key);
+            }
+
+            match e {
+                None => { self.sorted_output.insert(r.key, vec![r.value]); },
+                Some(mut v) => { v.push(r.value); self.sorted_output.insert(r.key, v); },
+            }
+        }
+    }
+}