changeset 65:07095d20ba5c

Genericize Map partition code over input type
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 06 Feb 2016 00:08:27 +0000
parents 33f3fcc22c02
children 9d7c1468f1b2
files src/map.rs
diffstat 1 files changed, 11 insertions(+), 13 deletions(-) [+]
line wrap: on
line diff
--- a/src/map.rs	Sat Feb 06 00:07:47 2016 +0000
+++ b/src/map.rs	Sat Feb 06 00:08:27 2016 +0000
@@ -1,23 +1,21 @@
-//! Implements the mapping phase.
+//! Implements the Map phase.
 //!
 
 #![allow(dead_code)]
 
 use std::fmt;
 use std::io::Write;
-use std::collections::{LinkedList, BTreeMap};
+use std::collections::BTreeMap;
 use mapreducer::MapReducer;
 use record_types::{Record, MEmitter};
 use parameters::MRParameters;
 use formats::util::MRSinkGenerator;
 
-type MapInput = LinkedList<Record>;
-
 /// This is the base of the mapping phase. It contains an input
 /// and intermediary input and output forms.
 /// Mapper threads run on this. Every mapper thread has one MapPartition
 /// instance per input chunk.
-struct MapPartition<MR: MapReducer, SinkGen: MRSinkGenerator> {
+struct MapPartition<MR: MapReducer, MapInput: Iterator<Item = Record>, SinkGen: MRSinkGenerator> {
     mr: MR,
     params: MRParameters,
     input: MapInput,
@@ -26,12 +24,12 @@
     sorted_output: BTreeMap<String, Vec<String>>,
 }
 
-impl<MR: MapReducer, SinkGen: MRSinkGenerator> MapPartition<MR, SinkGen> {
+impl<MR: MapReducer, MapInput: Iterator<Item=Record>, SinkGen: MRSinkGenerator> MapPartition<MR, MapInput, SinkGen> {
     pub fn _new(params: MRParameters,
                 input: MapInput,
                 mr: MR,
                 output: SinkGen)
-                -> MapPartition<MR, SinkGen> {
+                -> MapPartition<MR, MapInput, SinkGen> {
         MapPartition {
             mr: mr,
             params: params,
@@ -47,11 +45,11 @@
         self.write_output();
     }
 
-    /// Sorts input into the sorted_input map, moving the records on the way
-    /// (so no copying happens and memory consumption stays low-ish)
+/// Sorts input into the sorted_input map, moving the records on the way
+/// (so no copying happens and memory consumption stays low-ish)
     fn sort_input(&mut self) {
         loop {
-            match self.input.pop_front() {
+            match self.input.next() {
                 None => break,
                 Some(record) => {
                     self.sorted_input.insert(record.key, record.value);
@@ -60,7 +58,7 @@
         }
     }
 
-    /// Executes the mapping phase.
+/// Executes the mapping phase.
     fn do_map(&mut self) {
         let mut key_buffer = Vec::with_capacity(self.params.key_buffer_size);
 
@@ -92,7 +90,7 @@
     }
 
     fn setup_output(&mut self) -> Vec<SinkGen::Sink> {
-        // Set up sharded outputs.
+// Set up sharded outputs.
         let mut outputs = Vec::new();
 
         for i in 0..self.params.reducers {
@@ -195,7 +193,7 @@
 
         let reducers = 3;
         let mp = MapPartition::_new(MRParameters::new().set_concurrency(4, reducers),
-                                    get_input(),
+                                    get_input().into_iter(),
                                     get_mr(),
                                     get_output());
         mp._run();