Mercurial > lbo > hg > localmr
changeset 31:ce0ed262a105
Add sharded output to mapping phase
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 31 Jan 2016 19:32:24 +0000 |
parents | 03ea3c83f499 |
children | 9e6cdeb49c8c |
files | src/map.rs |
diffstat | 1 files changed, 37 insertions(+), 18 deletions(-) [+] |
line wrap: on
line diff
--- a/src/map.rs Sun Jan 31 17:05:33 2016 +0000 +++ b/src/map.rs Sun Jan 31 19:32:24 2016 +0000 @@ -3,9 +3,12 @@ #![allow(dead_code)] +use std::fmt; use std::io::Write; use std::collections::{LinkedList, BTreeMap}; use mapreducer::{Record, MapReducer, MEmitter}; +use parameters::MRParameters; +use formats::util::MRSinkGenerator; type MapInput = LinkedList<Record>; @@ -13,20 +16,22 @@ /// and intermediary input and output forms. /// Mapper threads run on this. Every mapper thread has one MapPartition /// instance per input chunk. -struct MapPartition<MR: MapReducer> { +struct MapPartition<MR: MapReducer, SinkGen: MRSinkGenerator> { mr: MR, + params: MRParameters, input: MapInput, - output: Box<Write>, + sink: SinkGen, sorted_input: BTreeMap<String, String>, sorted_output: BTreeMap<String, Vec<String>>, } -impl<MR: MapReducer> MapPartition<MR> { - pub fn _new(input: MapInput, mr: MR, output: Box<Write>) -> MapPartition<MR> { +impl<MR: MapReducer, SinkGen: MRSinkGenerator> MapPartition<MR, SinkGen> { + pub fn _new(params: MRParameters, input: MapInput, mr: MR, output: SinkGen) -> MapPartition<MR, SinkGen> { MapPartition { mr: mr, + params: params, input: input, - output: output, + sink: output, sorted_input: BTreeMap::new(), sorted_output: BTreeMap::new(), } @@ -52,12 +57,10 @@ /// Executes the mapping phase. fn do_map(&mut self) { - // TODO: Make this configurable - let key_buffer_size: usize = 256; - let mut key_buffer = Vec::with_capacity(key_buffer_size); + let mut key_buffer = Vec::with_capacity(self.params.key_buffer_size); loop { - for k in self.sorted_input.keys().take(key_buffer_size) { + for k in self.sorted_input.keys().take(self.params.key_buffer_size) { key_buffer.push(k.clone()) } @@ -76,22 +79,38 @@ self.insert_result(e); } - if key_buffer.len() < key_buffer_size { + if key_buffer.len() < self.params.key_buffer_size { break; } key_buffer.clear(); } } + fn setup_output(&mut self) -> Vec<SinkGen::Sink> { + // Set up sharded outputs. + let mut outputs = Vec::new(); + + for i in 0..self.params.reducers { + let out = self.sink.new_output(&fmt::format(format_args!("mapout_{}.{}", self.params.shard_id, i))); + outputs.push(out); + } + assert_eq!(outputs.len(), self.params.reducers); + outputs + } + fn write_output(&mut self) { + let mut outputs = self.setup_output(); + for (k, vs) in self.sorted_output.iter() { + let shard = self.mr.shard(self.params.reducers, k); + for v in vs { - let r1 = self.output.write(k.as_bytes()); + let r1 = outputs[shard].write(k.as_bytes()); match r1 { Err(e) => panic!("couldn't write map output: {}", e), Ok(_) => (), } - let r2 = self.output.write(v.as_bytes()); + let r2 = outputs[shard].write(v.as_bytes()); match r2 { Err(e) => panic!("couldn't write map output: {}", e), Ok(_) => (), @@ -123,11 +142,12 @@ #[cfg(test)] mod tests { use closure_mr::ClosureMapReducer; - use formats::util::RecordIterator; + use formats::util::{RecordIterator}; + use formats::lines::LinesSinkGenerator; use map::MapPartition; use mapreducer::{MEmitter, REmitter, Record, MultiRecord}; + use parameters::MRParameters; use std::collections::LinkedList; - use std::io::Write; fn mapper_func(e: &mut MEmitter, r: Record) { for w in r.value.split_whitespace() { @@ -157,14 +177,13 @@ } - fn get_output() -> Box<Write> { - use formats::lines::LinesWriter; - Box::new(LinesWriter::new_to_file(&String::from("mapphase_1.wlg")).unwrap()) + fn get_output() -> LinesSinkGenerator { + LinesSinkGenerator::new(&String::from("test_map_")) } #[test] fn test_map_partition() { - let mp = MapPartition::_new(get_input(), get_mr(), get_output()); + let mp = MapPartition::_new(MRParameters::new().set_concurrency(4, 3), get_input(), get_mr(), get_output()); mp._run(); } }