Mercurial > lbo > hg > localmr
changeset 89:dc8d7e8f4ba1
Add MapReduce controller, the heart of the library
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 07 Feb 2016 15:14:56 +0000 |
parents | f35dc4bc24eb |
children | 9853a14b08fb |
files | src/controller.rs |
diffstat | 1 files changed, 144 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/controller.rs Sun Feb 07 15:14:56 2016 +0000 @@ -0,0 +1,144 @@ +//! Controls the execution of a mapreduce instance. + +use formats::util::{SinkGenerator, RecordReadIterator}; +use formats::writelog::{WriteLogGenerator, WriteLogReader}; +use map::MapPartition; +use mapreducer::MapReducer; +use parameters::MRParameters; +use record_types::Record; +use reduce::ReducePartition; + +use std::collections::LinkedList; +use std::sync::mpsc::sync_channel; + +extern crate scoped_threadpool; +use self::scoped_threadpool::Pool; + +pub struct MRController<MR: MapReducer> { + params: MRParameters, + mr: MR, + + // How many map partitions have been run? + map_partitions_run: usize, +} + +/// Calculates the name of a reduce output shard from the parameters. +fn get_reduce_output_name(params: &MRParameters) -> String { + use std::fmt; + let mut name = String::new(); + name.push_str(¶ms.reduce_output_shard_prefix[..]); + name.push_str(&fmt::format(format_args!("{}", params.shard_id))[..]); + name +} + +fn open_reduce_inputs(params: &MRParameters, + partitions: usize, + shard: usize) + -> Vec<RecordReadIterator<WriteLogReader>> { + use std::fmt; + let mut inputs = Vec::new(); + + for part in 0..partitions { + let name = fmt::format(format_args!("{}{}.{}", params.map_output_location, shard, part)); + let wlg_reader = WriteLogReader::new_from_file(&name).unwrap(); + inputs.push(RecordReadIterator::new(wlg_reader)); + } + inputs +} + + +impl<MR: MapReducer + Send> MRController<MR> { + /// Create a new mapreduce instance and execute it immediately. + pub fn run<In: Iterator<Item = Record>, Out: SinkGenerator>(mr: MR, + params: MRParameters, + inp: In, + out: Out) { + let mut controller = MRController { + params: params, + mr: mr, + map_partitions_run: 0, + }; + controller.run_map(inp); + controller.run_reduce(out); + } + + fn map_runner(mr: MR, params: MRParameters, inp: LinkedList<Record>) { + if inp.len() == 0 { + return; + } + let intermed_out = WriteLogGenerator::new(); + let map_part = MapPartition::_new(params, inp.into_iter(), mr, intermed_out); + map_part._run(); + } + + fn read_map_input<In: Iterator<Item = Record>>(it: &mut In, + approx_bytes: usize) + -> LinkedList<Record> { + let mut ll = LinkedList::new(); + let mut bytes_read: usize = 0; + + for r in it { + bytes_read += r.key.len() + r.value.len() + 4; // Heuristics :P + ll.push_back(r); + + if bytes_read > approx_bytes { + break; + } + } + ll + } + + fn run_map<In: Iterator<Item = Record>>(&mut self, mut input: In) { + let mut pool = Pool::new(self.params.mappers as u32); + let (send, recv) = sync_channel(self.params.mappers); + + for _ in 0..self.params.mappers { + let _ = send.send(true); + } + + pool.scoped(move |scope| { + loop { + let _ = recv.recv(); + + let mr = self.mr.clone(); + let inp = MRController::<MR>::read_map_input(&mut input, + self.params.map_partition_size); + + if inp.len() == 0 { + break; + } + + let params = self.params.clone().set_shard_id(self.map_partitions_run as usize); + let done = send.clone(); + + scope.execute(move || { + MRController::map_runner(mr, params, inp); + let _ = done.send(true); + }); + self.map_partitions_run += 1; + } + + scope.join_all(); + }); + } + + fn run_reduce<Out: SinkGenerator>(&self, outp: Out) { + let mut pool = Pool::new(self.params.reducers as u32); + + pool.scoped(move |scope| { + for i in 0..self.params.reducers { + let mr = self.mr.clone(); + let params = self.params.clone().set_shard_id(i); + let map_partitions = self.map_partitions_run; + let output = outp.clone(); + + scope.execute(move || { + let inputs = open_reduce_inputs(¶ms, map_partitions, i); + let output = output.new_output(&get_reduce_output_name(¶ms)); + let reduce_part = ReducePartition::new(mr, params, inputs, output); + reduce_part._run(); + }); + } + }); + } +}