Mercurial > lbo > hg > localmr
changeset 65:07095d20ba5c
Genericize Map partition code over input type
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 06 Feb 2016 00:08:27 +0000 |
parents | 33f3fcc22c02 |
children | 9d7c1468f1b2 |
files | src/map.rs |
diffstat | 1 files changed, 11 insertions(+), 13 deletions(-) [+] |
line wrap: on
line diff
--- a/src/map.rs Sat Feb 06 00:07:47 2016 +0000 +++ b/src/map.rs Sat Feb 06 00:08:27 2016 +0000 @@ -1,23 +1,21 @@ -//! Implements the mapping phase. +//! Implements the Map phase. //! #![allow(dead_code)] use std::fmt; use std::io::Write; -use std::collections::{LinkedList, BTreeMap}; +use std::collections::BTreeMap; use mapreducer::MapReducer; use record_types::{Record, MEmitter}; use parameters::MRParameters; use formats::util::MRSinkGenerator; -type MapInput = LinkedList<Record>; - /// This is the base of the mapping phase. It contains an input /// and intermediary input and output forms. /// Mapper threads run on this. Every mapper thread has one MapPartition /// instance per input chunk. -struct MapPartition<MR: MapReducer, SinkGen: MRSinkGenerator> { +struct MapPartition<MR: MapReducer, MapInput: Iterator<Item = Record>, SinkGen: MRSinkGenerator> { mr: MR, params: MRParameters, input: MapInput, @@ -26,12 +24,12 @@ sorted_output: BTreeMap<String, Vec<String>>, } -impl<MR: MapReducer, SinkGen: MRSinkGenerator> MapPartition<MR, SinkGen> { +impl<MR: MapReducer, MapInput: Iterator<Item=Record>, SinkGen: MRSinkGenerator> MapPartition<MR, MapInput, SinkGen> { pub fn _new(params: MRParameters, input: MapInput, mr: MR, output: SinkGen) - -> MapPartition<MR, SinkGen> { + -> MapPartition<MR, MapInput, SinkGen> { MapPartition { mr: mr, params: params, @@ -47,11 +45,11 @@ self.write_output(); } - /// Sorts input into the sorted_input map, moving the records on the way - /// (so no copying happens and memory consumption stays low-ish) +/// Sorts input into the sorted_input map, moving the records on the way +/// (so no copying happens and memory consumption stays low-ish) fn sort_input(&mut self) { loop { - match self.input.pop_front() { + match self.input.next() { None => break, Some(record) => { self.sorted_input.insert(record.key, record.value); @@ -60,7 +58,7 @@ } } - /// Executes the mapping phase. +/// Executes the mapping phase. fn do_map(&mut self) { let mut key_buffer = Vec::with_capacity(self.params.key_buffer_size); @@ -92,7 +90,7 @@ } fn setup_output(&mut self) -> Vec<SinkGen::Sink> { - // Set up sharded outputs. +// Set up sharded outputs. let mut outputs = Vec::new(); for i in 0..self.params.reducers { @@ -195,7 +193,7 @@ let reducers = 3; let mp = MapPartition::_new(MRParameters::new().set_concurrency(4, reducers), - get_input(), + get_input().into_iter(), get_mr(), get_output()); mp._run();