changeset 107:500571302783
Mega commit: Split up MR into M/R/S; more doc; mutable shard receiver
author      Lewin Bormann <lbo@spheniscida.de>
date        Wed, 15 Jun 2016 19:27:29 +0200
parents     f4de6139bf2d
children    27afb9528618
files       src/closure_mr.rs src/controller.rs src/formats/writelog.rs src/lib.rs src/map.rs src/mapreducer.rs src/parameters.rs src/phases/map.rs src/phases/mod.rs src/phases/reduce.rs src/reduce.rs src/shard_merge.rs src/sort.rs
diffstat    13 files changed, 528 insertions(+), 494 deletions(-)
--- a/src/closure_mr.rs Wed Jun 15 18:54:43 2016 +0200
+++ b/src/closure_mr.rs Wed Jun 15 19:27:29 2016 +0200
@@ -1,6 +1,6 @@
 //! A MapReducer that uses supplied map()/reduce() functions.
 
-use mapreducer::{MapReducer, MapperF, ReducerF, SharderF, _std_shard};
+use mapreducer::{Mapper, Reducer, Sharder, MapperF, ReducerF, SharderF, _std_shard};
 use record_types::{Record, MultiRecord, MEmitter, REmitter};
 
 /// This type implements the MapReducer trait. You can use it to provide your own functions to a
@@ -37,14 +37,18 @@
     }
 }
 
-impl MapReducer for ClosureMapReducer {
-    fn map(&self, e: &mut MEmitter, r: Record) {
+impl Mapper for ClosureMapReducer {
+    fn map(&mut self, e: &mut MEmitter, r: Record) {
         (self.mapper)(e, r)
     }
-    fn reduce(&self, e: &mut REmitter, r: MultiRecord) {
+}
+impl Reducer for ClosureMapReducer {
+    fn reduce(&mut self, e: &mut REmitter, r: MultiRecord) {
         (self.reducer)(e, r)
     }
-    fn shard(&self, n: usize, k: &String) -> usize {
+}
+impl Sharder for ClosureMapReducer {
+    fn shard(&mut self, n: usize, k: &String) -> usize {
         (self.sharder)(n, k)
     }
 }
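With MapReducer split into separate Mapper, Reducer and Sharder traits, a type no longer has to supply all three functions. As a rough sketch of what this enables (the struct and its routing rule below are invented for illustration, not part of this changeset), a standalone Sharder could look like this:

use mapreducer::Sharder;

// Hypothetical sharder: routes keys to reduce shards by their first byte
// instead of the default SipHasher-based _std_shard().
#[derive(Clone)]
struct FirstByteSharder;

impl Sharder for FirstByteSharder {
    fn shard(&mut self, n: usize, key: &String) -> usize {
        match key.as_bytes().first() {
            Some(&b) => (b as usize) % n, // always in [0; n)
            None => 0,
        }
    }
}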
--- a/src/controller.rs Wed Jun 15 18:54:43 2016 +0200
+++ b/src/controller.rs Wed Jun 15 19:27:29 2016 +0200
@@ -3,20 +3,22 @@
 use formats::util::{SinkGenerator, RecordReadIterator};
 use formats::writelog::{WriteLogGenerator, WriteLogReader};
 use input_cache::InputCache;
-use map::MapPartition;
-use mapreducer::MapReducer;
+use phases::map::MapPartition;
+use mapreducer::{Mapper, Reducer, Sharder};
 use parameters::MRParameters;
 use record_types::Record;
-use reduce::ReducePartition;
+use phases::reduce::ReducePartition;
 use std::sync::mpsc::sync_channel;
 
 extern crate scoped_threadpool;
 use self::scoped_threadpool::Pool;
 
-pub struct MRController<MR: MapReducer> {
+pub struct MRController<M: Mapper, R: Reducer, S: Sharder> {
     params: MRParameters,
-    mr: MR,
+    m: M,
+    r: R,
+    s: S,
 
     // How many map partitions have been run?
     map_partitions_run: usize,
@@ -47,15 +49,21 @@
 }
 
-impl<MR: MapReducer + Send> MRController<MR> {
+impl<M: Mapper, R: Reducer, S: Sharder> MRController<M, R, S> {
     /// Create a new mapreduce instance and execute it immediately.
-    pub fn run<In: Iterator<Item = Record>, Out: SinkGenerator>(mr: MR,
+    ///
+    /// You can use `DefaultSharder` as `sharder` argument.
+    pub fn run<In: Iterator<Item = Record>, Out: SinkGenerator>(mapper: M,
+                                                                reducer: R,
+                                                                sharder: S,
                                                                 params: MRParameters,
                                                                 inp: In,
                                                                 out: Out) {
         let mut controller = MRController {
             params: params,
-            mr: mr,
+            m: mapper,
+            r: reducer,
+            s: sharder,
             map_partitions_run: 0,
         };
         controller.run_map(inp);
@@ -63,23 +71,10 @@
         controller.clean_up();
     }
 
-    fn map_runner(mr: MR, params: MRParameters, inp: InputCache) {
-        if inp.len() == 0 {
-            return;
-        }
-        let intermed_out = WriteLogGenerator::new();
-        let map_part = MapPartition::_new(params, inp, mr, intermed_out);
-        map_part._run();
-    }
-
-    fn read_map_input<In: Iterator<Item = Record>>(it: &mut In, approx_bytes: usize) -> InputCache {
-
-        let inp_cache = InputCache::from_iter(8192, approx_bytes, it);
-        inp_cache
-    }
-
     fn run_map<In: Iterator<Item = Record>>(&mut self, mut input: In) {
         let mut pool = Pool::new(self.params.mappers as u32);
+        // Create channels for worker synchronization; this ensures that there are only as many
+        // mapper threads running as specified.
         let (send, recv) = sync_channel(self.params.mappers);
 
         for _ in 0..self.params.mappers {
@@ -90,9 +85,12 @@
            loop {
                 let _ = recv.recv();
 
-                let mr = self.mr.clone();
-                let inp = MRController::<MR>::read_map_input(&mut input,
-                                                             self.params.map_partition_size);
+                let m = self.m.clone();
+                let s = self.s.clone();
+                // Can't necessarily send the input handle to the mapper thread, therefore read
+                // input before spawn.
+                let inp = MRController::<M, R, S>::read_map_input(&mut input,
+                                                                  self.params.map_partition_size);
 
                 if inp.len() == 0 {
                     break;
@@ -102,7 +100,7 @@
                 let done = send.clone();
 
                 scope.execute(move || {
-                    MRController::map_runner(mr, params, inp);
+                    MRController::<M, R, S>::map_runner(m, s, params, inp);
                     let _ = done.send(true);
                 });
                 self.map_partitions_run += 1;
@@ -112,12 +110,28 @@
         });
     }
 
+    fn map_runner(mapper: M, sharder: S, params: MRParameters, inp: InputCache) {
+        if inp.len() == 0 {
+            return;
+        }
+        let intermed_out = WriteLogGenerator::new();
+        let map_part = MapPartition::_new(params, inp, mapper, sharder, intermed_out);
+        map_part._run();
+    }
+
+    fn read_map_input<In: Iterator<Item = Record>>(it: &mut In, approx_bytes: usize) -> InputCache {
+
+        let inp_cache = InputCache::from_iter(8192, approx_bytes, it);
+        inp_cache
+    }
+
+
     fn run_reduce<Out: SinkGenerator>(&self, outp: Out) {
         let mut pool = Pool::new(self.params.reducers as u32);
 
         pool.scoped(move |scope| {
             for i in 0..self.params.reducers {
-                let mr = self.mr.clone();
+                let r = self.r.clone();
                 let params = self.params.clone().set_shard_id(i);
                 let map_partitions = self.map_partitions_run;
                 let output = outp.clone();
@@ -125,7 +139,7 @@
                 scope.execute(move || {
                     let inputs = open_reduce_inputs(&params, map_partitions, i);
                     let output = output.new_output(&get_reduce_output_name(&params));
-                    let reduce_part = ReducePartition::new(mr, params, inputs, output);
+                    let reduce_part = ReducePartition::new(r, params, inputs, output);
                     reduce_part._run();
                 });
             }
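The new run() entry point takes mapper, reducer and sharder as separate arguments. A minimal driver might look like the sketch below; the crate name `localmr` and the exact module paths are assumptions, the input/output helpers (PosRecordIterator, LinesSinkGenerator) and the testdata/ prefixes are borrowed from the tests in this changeset, and ClosureMapReducer fills all three roles because it now implements each of the new traits:

extern crate localmr; // crate name assumed from the repository name

use localmr::closure_mr::ClosureMapReducer;
use localmr::controller::MRController;
use localmr::formats::lines::LinesSinkGenerator;
use localmr::formats::util::PosRecordIterator;
use localmr::parameters::MRParameters;
use localmr::record_types::{MEmitter, REmitter, Record, MultiRecord};

// Word-count mapper: emit ("word", "1") for every word in the record value.
fn count_words(e: &mut MEmitter, r: Record) {
    for w in r.value.split_whitespace() {
        e.emit(String::from(w), String::from("1"));
    }
}

// Reducer: emit "key count\n" per grouped key.
fn sum_counts(e: &mut REmitter, recs: MultiRecord) {
    let key = format!("{}", recs.key());
    let n = recs.into_iter().count();
    e.emit(format!("{} {}\n", key, n));
}

fn main() {
    let mr = ClosureMapReducer::new(count_words, sum_counts);
    let input = PosRecordIterator::new(vec![String::from("hello world hello")].into_iter());
    let output = LinesSinkGenerator::new_to_files();
    let params = MRParameters::new()
        .set_concurrency(4, 3)
        .set_file_locations(String::from("testdata/map_im_"), String::from("testdata/result_"));
    // One clone per role: ClosureMapReducer implements Mapper, Reducer and Sharder.
    MRController::run(mr.clone(), mr.clone(), mr, params, input, output);
}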
--- a/src/formats/writelog.rs Wed Jun 15 18:54:43 2016 +0200
+++ b/src/formats/writelog.rs Wed Jun 15 19:27:29 2016 +0200
@@ -16,15 +16,15 @@
 /// which was to write a log of all write operations to a database.
 ///
 /// # WriteLog
-/// 
+///
 /// WriteLog is a persistent data structure designed to be written to disk
 /// that is a sequence of bytestring.
 /// It can be read back in relatively efficiently and yields the same byte
 /// strings; on disk, it is represented as records prefixed by 4 byte
 /// big-endian length prefixes: `llllbbbbbbllllbbllllbbbbbbbbb...`
-/// 
+///
 /// Where l is a length byte and b are bytes of a bytestring.
-/// 
+///
 /// There is a special case of WriteLogs: The length-prefixing can be turned
 /// off in order to yield a better efficiency when encoding PCK files. Those
 /// files are indexed by IDX files describing offset and length of single entries,
@@ -88,8 +88,8 @@
         // BUG: May not account the length in a correct way if the length prefix
         // is written, but not the record.
         let result = self.dest
-                         .write(&encode_u32(buf.len() as u32)[0..4])
-                         .and(self.dest.write(buf));
+            .write(&encode_u32(buf.len() as u32)[0..4])
+            .and(self.dest.write(buf));
         match result {
             Err(_) => result,
             Ok(_) => {
@@ -179,8 +179,8 @@
                     continue;
                 }
                 Ok(f) => {
-                    reader = Box::new(reader.chain(io::BufReader::with_capacity(1024 * 1024,
-                                                                                f)))
+                    reader =
+                        Box::new(reader.chain(io::BufReader::with_capacity(1024 * 1024, f)))
                 }
             }
         }
@@ -379,8 +379,8 @@
 
     fn bench_a_writing() {
         let buf: vec::Vec<u8> = "aaabbbcccdddeeefffggghhhiiijjjkkklllmmmnnnoooppp"
-                                    .bytes()
-                                    .collect();
+            .bytes()
+            .collect();
 
         match WriteLogWriter::<fs::File>::new_to_file(&String::from("bench_file.wlg"), false) {
             Err(e) => panic!("{}", e),
--- a/src/lib.rs Wed Jun 15 18:54:43 2016 +0200
+++ b/src/lib.rs Wed Jun 15 19:27:29 2016 +0200
@@ -6,11 +6,11 @@
 pub mod controller;
 pub mod formats;
 pub mod input_cache;
-mod map;
 pub mod mapreducer;
 pub mod parameters;
 pub mod record_types;
-mod reduce;
+
+mod phases;
 mod shard_merge;
 mod sort;
--- a/src/map.rs Wed Jun 15 18:54:43 2016 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,212 +0,0 @@ -//! Implements the Map phase. -//! - -#![allow(dead_code)] - -use std::collections::BTreeMap; -use std::fmt; -use std::io::Write; - -use formats::util::SinkGenerator; -use mapreducer::MapReducer; -use parameters::MRParameters; -use record_types::{Record, MEmitter}; -use sort::DictComparableString; - -/// This is the base of the mapping phase. It contains an input -/// and intermediary input and output forms. -/// Mapper threads run on this. Every mapper thread has one MapPartition -/// instance per input chunk. -pub struct MapPartition<MR: MapReducer, MapInput: Iterator<Item = Record>, SinkGen: SinkGenerator> { - mr: MR, - params: MRParameters, - input: MapInput, - sink: SinkGen, - sorted_input: BTreeMap<DictComparableString, String>, - sorted_output: BTreeMap<DictComparableString, Vec<String>>, -} - -impl<MR: MapReducer, MapInput: Iterator<Item=Record>, SinkGen: SinkGenerator> MapPartition<MR, MapInput, SinkGen> { - pub fn _new(params: MRParameters, - input: MapInput, - mr: MR, - output: SinkGen) - -> MapPartition<MR, MapInput, SinkGen> { - MapPartition { - mr: mr, - params: params, - input: input, - sink: output, - sorted_input: BTreeMap::new(), - sorted_output: BTreeMap::new(), - } - } - pub fn _run(mut self) { - self.sort_input(); - self.do_map(); - self.write_output(); - } - -/// Sorts input into the sorted_input map, moving the records on the way -/// (so no copying happens and memory consumption stays low-ish) - fn sort_input(&mut self) { - loop { - match self.input.next() { - None => break, - Some(record) => { - self.sorted_input.insert(DictComparableString::DCS(record.key), record.value); - } - } - } - } - -/// Executes the mapping phase. - fn do_map(&mut self) { - let mut key_buffer = Vec::with_capacity(self.params.key_buffer_size); - - loop { - for k in self.sorted_input.keys().take(self.params.key_buffer_size) { - key_buffer.push(k.clone()) - } - - for k in &key_buffer[..] { - let val; - match self.sorted_input.remove(k) { - None => continue, - Some(v) => val = v, - } - let mut e = MEmitter::new(); - self.mr.map(&mut e, - Record { - key: k.clone().unwrap(), - value: val, - }); - self.insert_result(e); - } - - if key_buffer.len() < self.params.key_buffer_size { - break; - } - key_buffer.clear(); - } - } - - fn setup_output(&mut self) -> Vec<SinkGen::Sink> { -// Set up sharded outputs. 
- let mut outputs = Vec::new(); - - for i in 0..self.params.reducers { - let out = self.sink.new_output(&fmt::format(format_args!("{}{}.{}", - self.params.map_output_location, - self.params.shard_id, - i))); - outputs.push(out); - } - assert_eq!(outputs.len(), self.params.reducers); - outputs - } - - fn write_output(&mut self) { - let mut outputs = self.setup_output(); - - for (k, vs) in self.sorted_output.iter() { - let shard = self.mr.shard(self.params.reducers, k.as_ref()); - - for v in vs { - let r1 = outputs[shard].write(k.as_ref().as_bytes()); - match r1 { - Err(e) => panic!("couldn't write map output: {}", e), - Ok(_) => (), - } - let r2 = outputs[shard].write(v.as_bytes()); - match r2 { - Err(e) => panic!("couldn't write map output: {}", e), - Ok(_) => (), - } - } - } - } - - fn insert_result(&mut self, emitter: MEmitter) { - for r in emitter._get() { - let e; - { - e = self.sorted_output.remove(&DictComparableString::wrap(r.key.clone())); - } - - match e { - None => { - self.sorted_output.insert(DictComparableString::wrap(r.key), vec![r.value]); - } - Some(mut v) => { - v.push(r.value); - self.sorted_output.insert(DictComparableString::wrap(r.key), v); - } - } - } - } -} - -#[cfg(test)] -mod tests { - use closure_mr::ClosureMapReducer; - use formats::util::PosRecordIterator; - use formats::lines::LinesSinkGenerator; - use map::MapPartition; - use record_types::{MEmitter, REmitter, Record, MultiRecord}; - use parameters::MRParameters; - use std::collections::LinkedList; - - fn mapper_func(e: &mut MEmitter, r: Record) { - for w in r.value.split_whitespace() { - e.emit(String::from(w), String::from("1")); - } - } - - fn reducer_func(_: &mut REmitter, _: MultiRecord) { - // no-op - } - - fn get_mr() -> ClosureMapReducer { - ClosureMapReducer::new(mapper_func, reducer_func) - } - - fn get_input() -> LinkedList<Record> { - let inp: Vec<String> = vec!["abc def", - "xy yz za", - "hello world", - "let's do this", - "foo bar baz"] - .iter() - .map(move |s| String::from(*s)) - .collect(); - let ri: PosRecordIterator<_> = PosRecordIterator::new(inp.into_iter()); - ri.collect() - } - - - fn get_output() -> LinesSinkGenerator { - LinesSinkGenerator::new_to_files() - } - - #[test] - fn test_map_partition() { - // use std::fmt::format; - // use std::fs; - - let reducers = 3; - let mp = MapPartition::_new(MRParameters::new() - .set_concurrency(4, reducers) - .set_file_locations(String::from("testdata/map_im_"), - String::from("testdata/result_")), - get_input().into_iter(), - get_mr(), - get_output()); - mp._run(); - - for _ in 0..reducers { - // let filename = format(format_args!("testdata/map_im_{}", i)); - // let _ = fs::remove_file(filename); - } - } -}
--- a/src/mapreducer.rs Wed Jun 15 18:54:43 2016 +0200
+++ b/src/mapreducer.rs Wed Jun 15 19:27:29 2016 +0200
@@ -23,20 +23,31 @@
 /// the return value should be in [0; n).
 pub type SharderF = fn(usize, &String) -> usize;
 
-/// A type implementing map() and reduce() functions.
-/// The MapReducer is cloned once per mapper/reducer thread.
-pub trait MapReducer: Clone {
+pub trait Mapper: Send + Clone {
     /// Takes one <key,value> pair and an emitter.
     /// The emitter is used to yield results from the map phase.
-    fn map(&self, em: &mut MEmitter, record: Record);
+    ///
+    /// Note that this method takes a &mut self; you can use this to cache expensive objects
+    /// between runs (but not between shards!)
+    fn map(&mut self, em: &mut MEmitter, record: Record);
+}
+
+pub trait Reducer: Send + Clone {
     /// Takes one key and one or more values and emits one or more
     /// values.
-    fn reduce(&self, em: &mut REmitter, records: MultiRecord);
+    ///
+    /// Note that this method takes a &mut self; you can use this to cache expensive objects
+    /// between runs (but not between shards!)
+    fn reduce(&mut self, em: &mut REmitter, records: MultiRecord);
+}
 
+pub trait Sharder: Send + Clone {
     /// Determines how to map keys to (reduce) shards.
     /// Returns a number in [0; n) determining the shard the key belongs in.
     /// The default implementation uses a simple hash (SipHasher) and modulo.
-    fn shard(&self, n: usize, key: &String) -> usize {
+    fn shard(&mut self, n: usize, key: &String) -> usize {
         _std_shard(n, key)
     }
 }
+
+pub struct DefaultSharder;
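The switch from &self to &mut self is what the commit message calls the "mutable shard receiver": a mapper or reducer can now keep per-call state alive for the duration of one shard. A hedged sketch of how a user might exploit this (the struct and its stopword cache are invented for illustration):

use mapreducer::Mapper;
use record_types::{MEmitter, Record};

#[derive(Clone)]
struct NormalizingMapper {
    // Built on first use and reused for every record of this shard; each mapper
    // thread works on its own clone, so the cache is not shared between shards.
    stopwords: Option<Vec<String>>,
}

impl Mapper for NormalizingMapper {
    fn map(&mut self, em: &mut MEmitter, record: Record) {
        if self.stopwords.is_none() {
            // Stand-in for an expensive setup step (loading a dictionary, compiling a regex, ...).
            self.stopwords = Some(vec![String::from("the"), String::from("a")]);
        }
        let stop = self.stopwords.as_ref().unwrap();
        for w in record.value.split_whitespace() {
            let w = w.to_lowercase();
            if !stop.contains(&w) {
                em.emit(w, String::from("1"));
            }
        }
    }
}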
--- a/src/parameters.rs Wed Jun 15 18:54:43 2016 +0200
+++ b/src/parameters.rs Wed Jun 15 19:27:29 2016 +0200
@@ -22,6 +22,7 @@
 }
 
 impl MRParameters {
+    /// Creates an instance with sane defaults.
     pub fn new() -> MRParameters {
         MRParameters {
             key_buffer_size: 256,
@@ -71,15 +72,14 @@
         self
     }
 
-    /// prealloc_size: How big are the groups of keys in the reduce phase expected to be? (used for pre-allocating
-    /// buffers)
-    /// Default 1.
+    /// prealloc_size: How big are the groups of keys in the reduce phase expected to be?
+    /// (used for pre-allocating buffers). Default 1.
     ///
     /// insensitive: Whether to group strings together that differ in case. When used, the first
     /// encountered key will be supplied as key to the reduce function.
-    /// BUG: This will not work correctly until the map phase delivers outputs in the correct order, i.e.
-    /// dictionary order. The default Ord implementation for String treats lower and upper case
-    /// very differently. Default: false.
+    /// BUG: This will not work correctly until the map phase delivers outputs in the correct order,
+    /// i.e. dictionary order. The default Ord implementation for String treats lower and upper
+    /// case very differently. Default: false.
     pub fn set_reduce_group_opts(mut self,
                                  prealloc_size: usize,
                                  insensitive: bool)
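These options are set through the builder-style methods on MRParameters. A small sketch, mirroring the calls used in this changeset's tests (the concrete values are arbitrary; argument order of set_concurrency is taken from the map-phase test below):

use parameters::MRParameters;

fn example_params() -> MRParameters {
    // 4 mappers and 3 reducers, groups of ~8 values per key expected,
    // case-insensitive grouping (subject to the BUG note above), and
    // prefixes for intermediate map output and final reduce output.
    MRParameters::new()
        .set_concurrency(4, 3)
        .set_reduce_group_opts(8, true)
        .set_file_locations(String::from("testdata/map_im_"),
                            String::from("testdata/result_"))
}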
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/phases/map.rs Wed Jun 15 19:27:29 2016 +0200 @@ -0,0 +1,218 @@ +//! Implements the Map phase. +//! + +#![allow(dead_code)] + +use std::collections::BTreeMap; +use std::fmt; +use std::io::Write; + +use formats::util::SinkGenerator; +use mapreducer::{Mapper, Sharder}; +use parameters::MRParameters; +use record_types::{Record, MEmitter}; +use sort::DictComparableString; + +/// This is the base of the mapping phase. It contains an input +/// and intermediary input and output forms. +/// Mapper threads run on this. Every mapper thread has one MapPartition +/// instance per input chunk. +pub struct MapPartition<M: Mapper, + S: Sharder, + MapInput: Iterator<Item = Record>, + SinkGen: SinkGenerator> +{ + m: M, + sharder: S, + params: MRParameters, + input: MapInput, + sink: SinkGen, + sorted_input: BTreeMap<DictComparableString, String>, + sorted_output: BTreeMap<DictComparableString, Vec<String>>, +} + +impl<M: Mapper, S: Sharder, MapInput: Iterator<Item=Record>, + SinkGen: SinkGenerator> MapPartition<M, S, MapInput, SinkGen> { + pub fn _new(params: MRParameters, + input: MapInput, + mapper: M, + sharder: S, + output: SinkGen) + -> MapPartition<M, S, MapInput, SinkGen> { + MapPartition { + m: mapper, + sharder: sharder, + params: params, + input: input, + sink: output, + sorted_input: BTreeMap::new(), + sorted_output: BTreeMap::new(), + } + } + pub fn _run(mut self) { + self.sort_input(); + self.do_map(); + self.write_output(); + } + +/// Sorts input into the sorted_input map, moving the records on the way +/// (so no copying happens and memory consumption stays low-ish) + fn sort_input(&mut self) { + loop { + match self.input.next() { + None => break, + Some(record) => { + self.sorted_input.insert(DictComparableString::DCS(record.key), record.value); + } + } + } + } + +/// Executes the mapping phase. + fn do_map(&mut self) { + let mut key_buffer = Vec::with_capacity(self.params.key_buffer_size); + + loop { + for k in self.sorted_input.keys().take(self.params.key_buffer_size) { + key_buffer.push(k.clone()) + } + + for k in &key_buffer[..] { + let val; + match self.sorted_input.remove(k) { + None => continue, + Some(v) => val = v, + } + let mut e = MEmitter::new(); + self.m.map(&mut e, + Record { + key: k.clone().unwrap(), + value: val, + }); + self.insert_result(e); + } + + if key_buffer.len() < self.params.key_buffer_size { + break; + } + key_buffer.clear(); + } + } + + fn setup_output(&mut self) -> Vec<SinkGen::Sink> { +// Set up sharded outputs. 
+ let mut outputs = Vec::new(); + + for i in 0..self.params.reducers { + let out = self.sink.new_output( + &fmt::format(format_args!("{}{}.{}", + self.params.map_output_location, + self.params.shard_id, + i))); + outputs.push(out); + } + assert_eq!(outputs.len(), self.params.reducers); + outputs + } + + fn write_output(&mut self) { + let mut outputs = self.setup_output(); + + for (k, vs) in self.sorted_output.iter() { + let shard = self.sharder.shard(self.params.reducers, k.as_ref()); + + for v in vs { + let r1 = outputs[shard].write(k.as_ref().as_bytes()); + match r1 { + Err(e) => panic!("couldn't write map output: {}", e), + Ok(_) => (), + } + let r2 = outputs[shard].write(v.as_bytes()); + match r2 { + Err(e) => panic!("couldn't write map output: {}", e), + Ok(_) => (), + } + } + } + } + + fn insert_result(&mut self, emitter: MEmitter) { + for r in emitter._get() { + let e; + { + e = self.sorted_output.remove(&DictComparableString::wrap(r.key.clone())); + } + + match e { + None => { + self.sorted_output.insert(DictComparableString::wrap(r.key), vec![r.value]); + } + Some(mut v) => { + v.push(r.value); + self.sorted_output.insert(DictComparableString::wrap(r.key), v); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use closure_mr::ClosureMapReducer; + use formats::util::PosRecordIterator; + use formats::lines::LinesSinkGenerator; + use map::MapPartition; + use record_types::{MEmitter, REmitter, Record, MultiRecord}; + use parameters::MRParameters; + use std::collections::LinkedList; + + fn mapper_func(e: &mut MEmitter, r: Record) { + for w in r.value.split_whitespace() { + e.emit(String::from(w), String::from("1")); + } + } + + fn reducer_func(_: &mut REmitter, _: MultiRecord) { + // no-op + } + + fn get_mr() -> ClosureMapReducer { + ClosureMapReducer::new(mapper_func, reducer_func) + } + + fn get_input() -> LinkedList<Record> { + let inp: Vec<String> = + vec!["abc def", "xy yz za", "hello world", "let's do this", "foo bar baz"] + .iter() + .map(move |s| String::from(*s)) + .collect(); + let ri: PosRecordIterator<_> = PosRecordIterator::new(inp.into_iter()); + ri.collect() + } + + + fn get_output() -> LinesSinkGenerator { + LinesSinkGenerator::new_to_files() + } + + #[test] + fn test_map_partition() { + // use std::fmt::format; + // use std::fs; + + let reducers = 3; + let mp = MapPartition::_new(MRParameters::new() + .set_concurrency(4, reducers) + .set_file_locations(String::from("testdata/map_im_"), + String::from("testdata/result_")), + get_input().into_iter(), + get_mr(), + get_output()); + mp._run(); + + for _ in 0..reducers { + // let filename = format(format_args!("testdata/map_im_{}", i)); + // let _ = fs::remove_file(filename); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/phases/mod.rs Wed Jun 15 19:27:29 2016 +0200
@@ -0,0 +1,2 @@
+pub mod map;
+pub mod reduce;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/phases/reduce.rs Wed Jun 15 19:27:29 2016 +0200 @@ -0,0 +1,218 @@ +//! Implements the Reduce phase. +//! + +use std::io; +use std::iter::Peekable; + +use mapreducer::Reducer; +use parameters::MRParameters; +use record_types::{Record, MultiRecord, REmitter}; +use shard_merge::ShardMergeIterator; + +pub struct ReducePartition<R: Reducer, InputIt: Iterator<Item = Record>, Sink: io::Write> { + r: R, + params: MRParameters, + // Maybe we want to genericize this to an Iterator<Item=Read> or so? This defers opening + // the files to the reduce shard itself. + srcs: Vec<InputIt>, + dstfile: Sink, +} + +impl<R: Reducer, InputIt: Iterator<Item = Record>, Sink: io::Write> ReducePartition<R, + InputIt, + Sink> { + /// Create a new Reduce partition for the given reducer R; source and destination I/O. + /// mr is the map/reduce functions. + /// params is generic MR parameters as well as some applying directly to this reduce partition. + /// srcs is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's + /// outputs. + /// dstfiles is a Sink (as known from the mapping phase) that is used to create the output + /// file (there is one output file per reduce partition, currently). + pub fn new(r: R, + params: MRParameters, + srcs: Vec<InputIt>, + outp: Sink) + -> ReducePartition<R, InputIt, Sink> { + ReducePartition { + r: r, + params: params, + srcs: srcs, + dstfile: outp, + } + } + + /// Run the Reduce partition. + pub fn _run(mut self) { + let mut inputs = Vec::new(); + inputs.append(&mut self.srcs); + let mut it = inputs.into_iter(); + + let params = self.params.clone(); + + self.reduce(RecordsToMultiRecords::new(ShardMergeIterator::build(&mut it), params)) + } + + fn reduce<RecIt: Iterator<Item = Record>>(mut self, inp: RecordsToMultiRecords<RecIt>) { + use std::io::Write; + + for multirec in inp { + let mut emitter = REmitter::new(); + self.r.reduce(&mut emitter, multirec); + + for result in emitter._get().into_iter() { + match self.dstfile.write(result.as_bytes()) { + Err(e) => { + println!("WARN: While reducing shard #{}: {}", + self.params.shard_id, + e) + } + Ok(_) => (), + } + } + } + } +} + +/// Iterator adapter: Converts an Iterator<Item=Record> into an Iterator<Item=MultiRecord> by +/// grouping subsequent records with identical key. +/// The original iterator must yield records in sorted order (or at least in an order where +/// identical items are adjacent). 
+pub struct RecordsToMultiRecords<It: Iterator<Item = Record>> { + it: Peekable<It>, + params: MRParameters, +} + +impl<It: Iterator<Item = Record>> RecordsToMultiRecords<It> { + fn new(it: It, params: MRParameters) -> RecordsToMultiRecords<It> { + RecordsToMultiRecords { + it: it.peekable(), + params: params, + } + } +} + +impl<It: Iterator<Item = Record>> Iterator for RecordsToMultiRecords<It> { + type Item = MultiRecord; + fn next(&mut self) -> Option<Self::Item> { + use std::ascii::AsciiExt; + let mut collection = Vec::with_capacity(self.params.reduce_group_prealloc_size); + let key: String; + match self.it.next() { + None => return None, + Some(r) => { + if self.params.reduce_group_insensitive { + key = r.key[..].to_ascii_lowercase(); + } else { + key = r.key + } + collection.push(r.value) + } + } + loop { + match self.it.peek() { + None => break, + Some(r) => { + if !self.params.reduce_group_insensitive && r.key != key { + break; + } else if self.params.reduce_group_insensitive && + r.key[..].to_ascii_lowercase() != key { + break; + } + } + } + collection.push(self.it.next().unwrap().value); + } + return Some(MultiRecord::new(key, collection)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use closure_mr::ClosureMapReducer; + use formats::lines::LinesSinkGenerator; + use formats::util::SinkGenerator; + use parameters::MRParameters; + use record_types::*; + + use std::vec; + + fn get_records() -> Vec<Record> { + vec![mk_rcrd("aaa", "def"), + mk_rcrd("abb", "111"), + mk_rcrd("Abb", "112"), + mk_rcrd("abbb", "113"), + mk_rcrd("abc", "xyz"), + mk_rcrd("xyz", "___"), + mk_rcrd("xyz", "__foo"), + mk_rcrd("xyz", "---")] + } + + #[test] + fn test_grouping_iterator() { + let records = get_records(); + let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> = + RecordsToMultiRecords::new(records.into_iter(), + MRParameters::new().set_reduce_group_opts(2, true)); + + let lengths = vec![1, 2, 1, 1, 3]; + let mut i = 0; + + for multirec in group_it { + assert_eq!(multirec.into_iter().count(), lengths[i]); + i += 1; + } + } + + #[test] + fn test_grouping_iterator_sensitive() { + let records = get_records(); + let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> = + RecordsToMultiRecords::new(records.into_iter(), + MRParameters::new().set_reduce_group_opts(2, false)); + + let lengths = vec![1, 1, 1, 1, 1, 3]; + let mut i = 0; + + for multirec in group_it { + assert_eq!(multirec.into_iter().count(), lengths[i]); + i += 1; + } + } + + fn test_reducer(e: &mut REmitter, recs: MultiRecord) { + use std::fmt::Write; + use std::borrow::Borrow; + + let mut out = String::with_capacity(32); + let _ = out.write_fmt(format_args!("{}:", recs.key())); + + for val in recs { + let _ = out.write_str(" "); + let _ = out.write_str(val.borrow()); + } + + e.emit(out); + } + + fn fake_mapper(_: &mut MEmitter, _: Record) {} + + #[test] + fn test_reduce() { + let mr = ClosureMapReducer::new(fake_mapper, test_reducer); + let params = MRParameters::new() + .set_shard_id(42) + .set_reduce_group_opts(1, true) + .set_file_locations(String::from("testdata/map_intermed_"), + String::from("testdata/result_")); + let srcs = vec![get_records().into_iter()]; + let dst = LinesSinkGenerator::new_to_files(); + + let r = ReducePartition::new(mr, + params, + srcs, + dst.new_output(&String::from("testdata/result_0"))); + r._run(); + } +}
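Under the new trait split, a reduce function like test_reducer above could also be written as a standalone Reducer implementation. A hedged sketch (SumReducer is invented for illustration; it sums the "1" counts a word-count mapper would emit, using the same key()/IntoIterator pattern as the tests):

use mapreducer::Reducer;
use record_types::{REmitter, MultiRecord};
use std::borrow::Borrow;

#[derive(Clone)]
struct SumReducer;

impl Reducer for SumReducer {
    fn reduce(&mut self, em: &mut REmitter, records: MultiRecord) {
        // Capture the group key before consuming the record's values.
        let key = format!("{}", records.key());
        let mut sum: u64 = 0;
        for v in records {
            let s: &str = v.borrow();
            sum += s.parse::<u64>().unwrap_or(0);
        }
        em.emit(format!("{} {}\n", key, sum));
    }
}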
--- a/src/reduce.rs Wed Jun 15 18:54:43 2016 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,219 +0,0 @@ -//! Implements the Reduce phase. -//! - -use std::io; -use std::iter::Peekable; - -use mapreducer::MapReducer; -use parameters::MRParameters; -use record_types::{Record, MultiRecord, REmitter}; -use shard_merge::ShardMergeIterator; - -pub struct ReducePartition<MR: MapReducer, InputIt: Iterator<Item = Record>, Sink: io::Write> { - mr: MR, - params: MRParameters, - // Maybe we want to genericize this to an Iterator<Item=Read> or so? This defers opening - // the files to the reduce shard itself. - srcs: Vec<InputIt>, - dstfile: Sink, -} - -impl<MR: MapReducer, InputIt: Iterator<Item = Record>, Sink: io::Write> ReducePartition<MR, - InputIt, - Sink> - { - /// Create a new Reduce partition for the given MR; source and destination I/O. - /// mr is the map/reduce functions. - /// params is generic MR parameters as well as some applying directly to this reduce partition. - /// srcs is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's - /// outputs. - /// dstfiles is a Sink (as known from the mapping phase) that is used to create the output - /// file (there is one output file per reduce partition, currently). - pub fn new(mr: MR, - params: MRParameters, - srcs: Vec<InputIt>, - outp: Sink) - -> ReducePartition<MR, InputIt, Sink> { - ReducePartition { - mr: mr, - params: params, - srcs: srcs, - dstfile: outp, - } - } - - /// Run the Reduce partition. - pub fn _run(mut self) { - let mut inputs = Vec::new(); - inputs.append(&mut self.srcs); - let mut it = inputs.into_iter(); - - let params = self.params.clone(); - - self.reduce(RecordsToMultiRecords::new(ShardMergeIterator::build(&mut it), params)) - } - - fn reduce<RecIt: Iterator<Item = Record>>(mut self, inp: RecordsToMultiRecords<RecIt>) { - use std::io::Write; - - for multirec in inp { - let mut emitter = REmitter::new(); - self.mr.reduce(&mut emitter, multirec); - - for result in emitter._get().into_iter() { - match self.dstfile.write(result.as_bytes()) { - Err(e) => { - println!("WARN: While reducing shard #{}: {}", - self.params.shard_id, - e) - } - Ok(_) => (), - } - } - } - } -} - -/// Iterator adapter: Converts an Iterator<Item=Record> into an Iterator<Item=MultiRecord> by -/// grouping subsequent records with identical key. -/// The original iterator must yield records in sorted order (or at least in an order where -/// identical items are adjacent). 
-pub struct RecordsToMultiRecords<It: Iterator<Item = Record>> { - it: Peekable<It>, - params: MRParameters, -} - -impl<It: Iterator<Item = Record>> RecordsToMultiRecords<It> { - fn new(it: It, params: MRParameters) -> RecordsToMultiRecords<It> { - RecordsToMultiRecords { - it: it.peekable(), - params: params, - } - } -} - -impl<It: Iterator<Item = Record>> Iterator for RecordsToMultiRecords<It> { - type Item = MultiRecord; - fn next(&mut self) -> Option<Self::Item> { - use std::ascii::AsciiExt; - let mut collection = Vec::with_capacity(self.params.reduce_group_prealloc_size); - let key: String; - match self.it.next() { - None => return None, - Some(r) => { - if self.params.reduce_group_insensitive { - key = r.key[..].to_ascii_lowercase(); - } else { - key = r.key - } - collection.push(r.value) - } - } - loop { - match self.it.peek() { - None => break, - Some(r) => { - if !self.params.reduce_group_insensitive && r.key != key { - break; - } else if self.params.reduce_group_insensitive && - r.key[..].to_ascii_lowercase() != key { - break; - } - } - } - collection.push(self.it.next().unwrap().value); - } - return Some(MultiRecord::new(key, collection)); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use closure_mr::ClosureMapReducer; - use formats::lines::LinesSinkGenerator; - use formats::util::SinkGenerator; - use parameters::MRParameters; - use record_types::*; - - use std::vec; - - fn get_records() -> Vec<Record> { - vec![mk_rcrd("aaa", "def"), - mk_rcrd("abb", "111"), - mk_rcrd("Abb", "112"), - mk_rcrd("abbb", "113"), - mk_rcrd("abc", "xyz"), - mk_rcrd("xyz", "___"), - mk_rcrd("xyz", "__foo"), - mk_rcrd("xyz", "---")] - } - - #[test] - fn test_grouping_iterator() { - let records = get_records(); - let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> = - RecordsToMultiRecords::new(records.into_iter(), - MRParameters::new().set_reduce_group_opts(2, true)); - - let lengths = vec![1, 2, 1, 1, 3]; - let mut i = 0; - - for multirec in group_it { - assert_eq!(multirec.into_iter().count(), lengths[i]); - i += 1; - } - } - - #[test] - fn test_grouping_iterator_sensitive() { - let records = get_records(); - let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> = - RecordsToMultiRecords::new(records.into_iter(), - MRParameters::new().set_reduce_group_opts(2, false)); - - let lengths = vec![1, 1, 1, 1, 1, 3]; - let mut i = 0; - - for multirec in group_it { - assert_eq!(multirec.into_iter().count(), lengths[i]); - i += 1; - } - } - - fn test_reducer(e: &mut REmitter, recs: MultiRecord) { - use std::fmt::Write; - use std::borrow::Borrow; - - let mut out = String::with_capacity(32); - let _ = out.write_fmt(format_args!("{}:", recs.key())); - - for val in recs { - let _ = out.write_str(" "); - let _ = out.write_str(val.borrow()); - } - - e.emit(out); - } - - fn fake_mapper(_: &mut MEmitter, _: Record) {} - - #[test] - fn test_reduce() { - let mr = ClosureMapReducer::new(fake_mapper, test_reducer); - let params = MRParameters::new() - .set_shard_id(42) - .set_reduce_group_opts(1, true) - .set_file_locations(String::from("testdata/map_intermed_"), - String::from("testdata/result_")); - let srcs = vec![get_records().into_iter()]; - let dst = LinesSinkGenerator::new_to_files(); - - let r = ReducePartition::new(mr, - params, - srcs, - dst.new_output(&String::from("testdata/result_0"))); - r._run(); - } -}
--- a/src/shard_merge.rs Wed Jun 15 18:54:43 2016 +0200
+++ b/src/shard_merge.rs Wed Jun 15 19:27:29 2016 +0200
@@ -207,7 +207,7 @@
                                                        get_collection_4(),
                                                        get_collection_5(),
                                                        get_collection_6()]
-                                                      .into_iter());
+                                                           .into_iter());
 
         let mut cmp = 0;
         let mut cnt = 0;
@@ -241,7 +241,7 @@
         let merge_it = ShardMergeIterator::build_with_cmp(&mut files.into_iter(),
                                                           sort::dict_string_compare);
         let mut outfile = lines::LinesWriter::new_to_file(&String::from("testdata/all_sorted.txt"))
-                              .unwrap();
+            .unwrap();
 
         for line in merge_it {
             let _ = outfile.write(line.as_bytes());
--- a/src/sort.rs Wed Jun 15 18:54:43 2016 +0200
+++ b/src/sort.rs Wed Jun 15 19:27:29 2016 +0200
@@ -105,16 +105,14 @@
 
 impl PartialOrd for DictComparableString {
     fn partial_cmp(&self, other: &DictComparableString) -> Option<Ordering> {
-        let (&DictComparableString::DCS(ref a),
-             &DictComparableString::DCS(ref b)) = (self, other);
+        let (&DictComparableString::DCS(ref a), &DictComparableString::DCS(ref b)) = (self, other);
         Some(dict_string_compare(a, b))
     }
 }
 
 impl Ord for DictComparableString {
     fn cmp(&self, other: &DictComparableString) -> Ordering {
-        let (&DictComparableString::DCS(ref a),
-             &DictComparableString::DCS(ref b)) = (self, other);
+        let (&DictComparableString::DCS(ref a), &DictComparableString::DCS(ref b)) = (self, other);
         dict_string_compare(a, b)
     }
 }