Mercurial > lbo > hg > localmr
changeset 87:168e8777de73
Some refactoring
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 07 Feb 2016 15:14:22 +0000 |
parents | 4a111475534f |
children | f35dc4bc24eb |
files | src/formats/lines.rs src/formats/util.rs src/formats/writelog.rs src/lib.rs src/map.rs src/reduce.rs |
diffstat | 6 files changed, 49 insertions(+), 48 deletions(-) [+] |
line wrap: on
line diff
--- a/src/formats/lines.rs Sun Feb 07 15:13:45 2016 +0000 +++ b/src/formats/lines.rs Sun Feb 07 15:14:22 2016 +0000 @@ -98,14 +98,17 @@ } } -/// An MRSinkGenerator type that uses a simple path as base +/// An SinkGenerator type that uses a simple path as base /// and creates text files based on it. #[allow(dead_code)] +#[derive(Clone)] pub struct LinesSinkGenerator { // bogus field i: i32, } +unsafe impl Send for LinesSinkGenerator {} + impl LinesSinkGenerator { /// Use either a path like `/a/b/c/` to generate files in a directory /// or `/a/b/c/file_prefix_` to create files with that prefix. @@ -114,9 +117,9 @@ } } -impl util::MRSinkGenerator for LinesSinkGenerator { +impl util::SinkGenerator for LinesSinkGenerator { type Sink = LinesWriter<fs::File>; - fn new_output(&mut self, p: &String) -> Self::Sink { + fn new_output(&self, p: &String) -> Self::Sink { let f = fs::OpenOptions::new().write(true).truncate(true).create(true).open(p); match f { Err(e) => panic!("Couldn't open lines output file {}: {}", p, e), @@ -128,7 +131,7 @@ #[cfg(test)] mod test { use formats::lines; - use formats::util::MRSinkGenerator; + use formats::util::SinkGenerator; use std::fs; use std::io::Write; @@ -168,7 +171,7 @@ #[test] fn test_write_lines() { let line = String::from("abc def hello world"); - let mut gen = lines::LinesSinkGenerator::new_to_files(); + let gen = lines::LinesSinkGenerator::new_to_files(); let mut f = gen.new_output(&String::from("testdata/writelines_1")); for _ in 0..10 {
--- a/src/formats/util.rs Sun Feb 07 15:13:45 2016 +0000 +++ b/src/formats/util.rs Sun Feb 07 15:14:22 2016 +0000 @@ -9,21 +9,21 @@ /// records with the key being the position of the current record, starting with /// 1. Mainly used as input iterator in the mapping phase, from sources that only /// yield values (no keys). -pub struct RecordIterator<I: Iterator<Item = String>> { +pub struct PosRecordIterator<I: Iterator<Item = String>> { i: I, counter: u64, } -impl<I: Iterator<Item = String>> RecordIterator<I> { - pub fn new(it: I) -> RecordIterator<I> { - RecordIterator { +impl<I: Iterator<Item = String>> PosRecordIterator<I> { + pub fn new(it: I) -> PosRecordIterator<I> { + PosRecordIterator { i: it, counter: 0, } } } -impl<I: Iterator<Item = String>> Iterator for RecordIterator<I> { +impl<I: Iterator<Item = String>> Iterator for PosRecordIterator<I> { type Item = Record; fn next(&mut self) -> Option<Record> { match self.i.next() { @@ -42,17 +42,17 @@ /// Another transformation of [string] -> [(string,string)]; however, /// this one always reads one value, treats it as key, and another one, /// treated as value. -pub struct KVReadIterator<I: Iterator<Item = String>> { +pub struct RecordReadIterator<I: Iterator<Item = String>> { i: I, } -impl<I: Iterator<Item = String>> KVReadIterator<I> { - pub fn new(it: I) -> KVReadIterator<I> { - KVReadIterator { i: it } +impl<I: Iterator<Item = String>> RecordReadIterator<I> { + pub fn new(it: I) -> RecordReadIterator<I> { + RecordReadIterator { i: it } } } -impl<I: Iterator<Item = String>> Iterator for KVReadIterator<I> { +impl<I: Iterator<Item = String>> Iterator for RecordReadIterator<I> { type Item = Record; fn next(&mut self) -> Option<Record> { let (k, v) = (self.i.next(), self.i.next()); @@ -69,12 +69,16 @@ } } -/// A type implementing MRSinkGenerator is used at the end of the reducer +/// A type implementing SinkGenerator is used at the end of the reducer /// phase to write the output. Given a name, new() should return a new object /// that can be used to write the output of a reduce partition. /// Values are always written as a whole to the writer. -pub trait MRSinkGenerator { +/// +/// SinkGenerator types are used in general to determine the format of outputs; existing options +/// are plain text files (LinesSinkGenerator) or length-prefixed binary files (WriteLogGenerator). +pub trait SinkGenerator: Send + Clone { type Sink: io::Write; - /// Return a new output. - fn new_output(&mut self, name: &String) -> Self::Sink; + /// Return a new output identified by name. The existing sink generators use `name` to open + /// files with that name (or path). + fn new_output(&self, name: &String) -> Self::Sink; }
--- a/src/formats/writelog.rs Sun Feb 07 15:13:45 2016 +0000 +++ b/src/formats/writelog.rs Sun Feb 07 15:14:22 2016 +0000 @@ -10,7 +10,7 @@ use std::vec; use std::string; -use formats::util::MRSinkGenerator; +use formats::util::SinkGenerator; /// A length-prefixed record stream named for the original use case, /// which was to write a log of all write operations to a database. @@ -108,19 +108,22 @@ /// Like LinesSinkGenerator, opens new WriteLogWriters that write /// to files with the name given to new_output(). That name is in general based on the MRParameters /// supplied to a mapreduce instance. +#[derive(Clone)] pub struct WriteLogGenerator { i: i32, } +unsafe impl Send for WriteLogGenerator {} + impl WriteLogGenerator { pub fn new() -> WriteLogGenerator { WriteLogGenerator { i: 0 } } } -impl MRSinkGenerator for WriteLogGenerator { +impl SinkGenerator for WriteLogGenerator { type Sink = WriteLogWriter<fs::File>; - fn new_output(&mut self, path: &String) -> Self::Sink { + fn new_output(&self, path: &String) -> Self::Sink { let writer = WriteLogWriter::<fs::File>::new_to_file(path, false); match writer { Err(e) => panic!("Could not open {}: {}", path, e), @@ -138,7 +141,7 @@ } impl WriteLogReader { - pub fn new(src: Box<Read>) -> WriteLogReader { + pub fn new(src: Box<Read+Send>) -> WriteLogReader { WriteLogReader { src: src, records_read: 0,
--- a/src/lib.rs Sun Feb 07 15:13:45 2016 +0000 +++ b/src/lib.rs Sun Feb 07 15:14:22 2016 +0000 @@ -3,6 +3,7 @@ //! pub mod closure_mr; +pub mod controller; pub mod formats; pub mod map; pub mod mapreducer;
--- a/src/map.rs Sun Feb 07 15:13:45 2016 +0000 +++ b/src/map.rs Sun Feb 07 15:14:22 2016 +0000 @@ -7,7 +7,7 @@ use std::fmt; use std::io::Write; -use formats::util::MRSinkGenerator; +use formats::util::SinkGenerator; use mapreducer::MapReducer; use parameters::MRParameters; use record_types::{Record, MEmitter}; @@ -16,7 +16,7 @@ /// and intermediary input and output forms. /// Mapper threads run on this. Every mapper thread has one MapPartition /// instance per input chunk. -struct MapPartition<MR: MapReducer, MapInput: Iterator<Item = Record>, SinkGen: MRSinkGenerator> { +pub struct MapPartition<MR: MapReducer, MapInput: Iterator<Item = Record>, SinkGen: SinkGenerator> { mr: MR, params: MRParameters, input: MapInput, @@ -25,7 +25,7 @@ sorted_output: BTreeMap<String, Vec<String>>, } -impl<MR: MapReducer, MapInput: Iterator<Item=Record>, SinkGen: MRSinkGenerator> MapPartition<MR, MapInput, SinkGen> { +impl<MR: MapReducer, MapInput: Iterator<Item=Record>, SinkGen: SinkGenerator> MapPartition<MR, MapInput, SinkGen> { pub fn _new(params: MRParameters, input: MapInput, mr: MR, @@ -149,7 +149,7 @@ #[cfg(test)] mod tests { use closure_mr::ClosureMapReducer; - use formats::util::RecordIterator; + use formats::util::PosRecordIterator; use formats::lines::LinesSinkGenerator; use map::MapPartition; use record_types::{MEmitter, REmitter, Record, MultiRecord}; @@ -179,7 +179,7 @@ .iter() .map(move |s| String::from(*s)) .collect(); - let ri: RecordIterator<_> = RecordIterator::new(inp.into_iter()); + let ri: PosRecordIterator<_> = PosRecordIterator::new(inp.into_iter()); ri.collect() }
--- a/src/reduce.rs Sun Feb 07 15:13:45 2016 +0000 +++ b/src/reduce.rs Sun Feb 07 15:14:22 2016 +0000 @@ -1,9 +1,9 @@ //! Implements the Reduce phase. //! +use std::io; use std::iter::Peekable; -use formats::util::MRSinkGenerator; use mapreducer::MapReducer; use parameters::MRParameters; use record_types::{Record, MultiRecord, REmitter}; @@ -11,26 +11,26 @@ pub struct ReducePartition<MR: MapReducer, InputIt: Iterator<Item = Record>, - SinkGen: MRSinkGenerator> + Sink: io::Write> { mr: MR, params: MRParameters, // Maybe we want to genericize this to an Iterator<Item=Read> or so? This defers opening // the files to the reduce shard itself. srcs: Vec<InputIt>, - dstfilegen: SinkGen, + dstfile: Sink, } -impl<MR: MapReducer, InputIt: Iterator<Item=Record>, SinkGen: MRSinkGenerator> ReducePartition<MR, InputIt, SinkGen> { +impl<MR: MapReducer, InputIt: Iterator<Item=Record>, Sink: io::Write> ReducePartition<MR, InputIt, Sink> { /// Create a new Reduce partition for the given MR; source and destination I/O. /// mr is the map/reduce functions. /// params is generic MR parameters as well as some applying directly to this reduce partition. /// srcs is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's /// outputs. -/// dstfiles is a SinkGen (as known from the mapping phase) that is used to create the output +/// dstfiles is a Sink (as known from the mapping phase) that is used to create the output /// file (there is one output file per reduce partition, currently). - pub fn new(mr: MR, params: MRParameters, srcs: Vec<InputIt>, dstfiles: SinkGen) -> ReducePartition<MR, InputIt, SinkGen> { - ReducePartition { mr: mr, params: params, srcs: srcs, dstfilegen: dstfiles } + pub fn new(mr: MR, params: MRParameters, srcs: Vec<InputIt>, outp: Sink) -> ReducePartition<MR, InputIt, Sink> { + ReducePartition { mr: mr, params: params, srcs: srcs, dstfile: outp} } /// Run the Reduce partition. @@ -44,26 +44,15 @@ self.reduce(RecordsToMultiRecords::new(ShardMergeIterator::build(&mut it), params)) } - fn get_output_name(&self) -> String { - use std::fmt; - let mut name = String::new(); - name.push_str(&self.params.reduce_output_shard_prefix[..]); - name.push_str(&fmt::format(format_args!("{}", self.params.shard_id))[..]); - name - } - fn reduce<RecIt: Iterator<Item=Record>>(mut self, inp: RecordsToMultiRecords<RecIt>) { use std::io::Write; - let name = self.get_output_name(); - let mut outp = self.dstfilegen.new_output(&name); - for multirec in inp { let mut emitter = REmitter::new(); self.mr.reduce(&mut emitter, multirec); for result in emitter._get().into_iter() { - match outp.write(result.as_bytes()) { + match self.dstfile.write(result.as_bytes()) { Err(e) => println!("WARN: While reducing shard #{}: {}", self.params.shard_id, e), Ok(_) => () } @@ -131,6 +120,7 @@ use closure_mr::ClosureMapReducer; use formats::lines::LinesSinkGenerator; + use formats::util::SinkGenerator; use parameters::MRParameters; use record_types::*; @@ -207,7 +197,7 @@ let srcs = vec![get_records().into_iter()]; let dst = LinesSinkGenerator::new_to_files(); - let r = ReducePartition::new(mr, params, srcs, dst); + let r = ReducePartition::new(mr, params, srcs, dst.new_output(&String::from("0"))); r._run(); } }