changeset 87:168e8777de73

Some refactoring
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 07 Feb 2016 15:14:22 +0000
parents 4a111475534f
children f35dc4bc24eb
files src/formats/lines.rs src/formats/util.rs src/formats/writelog.rs src/lib.rs src/map.rs src/reduce.rs
diffstat 6 files changed, 49 insertions(+), 48 deletions(-) [+]
line wrap: on
line diff
--- a/src/formats/lines.rs	Sun Feb 07 15:13:45 2016 +0000
+++ b/src/formats/lines.rs	Sun Feb 07 15:14:22 2016 +0000
@@ -98,14 +98,17 @@
     }
 }
 
-/// An MRSinkGenerator type that uses a simple path as base
+/// An SinkGenerator type that uses a simple path as base
 /// and creates text files based on it.
 #[allow(dead_code)]
+#[derive(Clone)]
 pub struct LinesSinkGenerator {
     // bogus field
     i: i32,
 }
 
+unsafe impl Send for LinesSinkGenerator {}
+
 impl LinesSinkGenerator {
     /// Use either a path like `/a/b/c/` to generate files in a directory
     /// or `/a/b/c/file_prefix_` to create files with that prefix.
@@ -114,9 +117,9 @@
     }
 }
 
-impl util::MRSinkGenerator for LinesSinkGenerator {
+impl util::SinkGenerator for LinesSinkGenerator {
     type Sink = LinesWriter<fs::File>;
-    fn new_output(&mut self, p: &String) -> Self::Sink {
+    fn new_output(&self, p: &String) -> Self::Sink {
         let f = fs::OpenOptions::new().write(true).truncate(true).create(true).open(p);
         match f {
             Err(e) => panic!("Couldn't open lines output file {}: {}", p, e),
@@ -128,7 +131,7 @@
 #[cfg(test)]
 mod test {
     use formats::lines;
-    use formats::util::MRSinkGenerator;
+    use formats::util::SinkGenerator;
     use std::fs;
     use std::io::Write;
 
@@ -168,7 +171,7 @@
     #[test]
     fn test_write_lines() {
         let line = String::from("abc def hello world");
-        let mut gen = lines::LinesSinkGenerator::new_to_files();
+        let gen = lines::LinesSinkGenerator::new_to_files();
         let mut f = gen.new_output(&String::from("testdata/writelines_1"));
 
         for _ in 0..10 {
--- a/src/formats/util.rs	Sun Feb 07 15:13:45 2016 +0000
+++ b/src/formats/util.rs	Sun Feb 07 15:14:22 2016 +0000
@@ -9,21 +9,21 @@
 /// records with the key being the position of the current record, starting with
 /// 1. Mainly used as input iterator in the mapping phase, from sources that only
 /// yield values (no keys).
-pub struct RecordIterator<I: Iterator<Item = String>> {
+pub struct PosRecordIterator<I: Iterator<Item = String>> {
     i: I,
     counter: u64,
 }
 
-impl<I: Iterator<Item = String>> RecordIterator<I> {
-    pub fn new(it: I) -> RecordIterator<I> {
-        RecordIterator {
+impl<I: Iterator<Item = String>> PosRecordIterator<I> {
+    pub fn new(it: I) -> PosRecordIterator<I> {
+        PosRecordIterator {
             i: it,
             counter: 0,
         }
     }
 }
 
-impl<I: Iterator<Item = String>> Iterator for RecordIterator<I> {
+impl<I: Iterator<Item = String>> Iterator for PosRecordIterator<I> {
     type Item = Record;
     fn next(&mut self) -> Option<Record> {
         match self.i.next() {
@@ -42,17 +42,17 @@
 /// Another transformation of [string] -> [(string,string)]; however,
 /// this one always reads one value, treats it as key, and another one,
 /// treated as value.
-pub struct KVReadIterator<I: Iterator<Item = String>> {
+pub struct RecordReadIterator<I: Iterator<Item = String>> {
     i: I,
 }
 
-impl<I: Iterator<Item = String>> KVReadIterator<I> {
-    pub fn new(it: I) -> KVReadIterator<I> {
-        KVReadIterator { i: it }
+impl<I: Iterator<Item = String>> RecordReadIterator<I> {
+    pub fn new(it: I) -> RecordReadIterator<I> {
+        RecordReadIterator { i: it }
     }
 }
 
-impl<I: Iterator<Item = String>> Iterator for KVReadIterator<I> {
+impl<I: Iterator<Item = String>> Iterator for RecordReadIterator<I> {
     type Item = Record;
     fn next(&mut self) -> Option<Record> {
         let (k, v) = (self.i.next(), self.i.next());
@@ -69,12 +69,16 @@
     }
 }
 
-/// A type implementing MRSinkGenerator is used at the end of the reducer
+/// A type implementing SinkGenerator is used at the end of the reducer
 /// phase to write the output. Given a name, new() should return a new object
 /// that can be used to write the output of a reduce partition.
 /// Values are always written as a whole to the writer.
-pub trait MRSinkGenerator {
+///
+/// SinkGenerator types are used in general to determine the format of outputs; existing options
+/// are plain text files (LinesSinkGenerator) or length-prefixed binary files (WriteLogGenerator).
+pub trait SinkGenerator: Send + Clone {
     type Sink: io::Write;
-    /// Return a new output.
-    fn new_output(&mut self, name: &String) -> Self::Sink;
+    /// Return a new output identified by name. The existing sink generators use `name` to open
+    /// files with that name (or path).
+    fn new_output(&self, name: &String) -> Self::Sink;
 }
--- a/src/formats/writelog.rs	Sun Feb 07 15:13:45 2016 +0000
+++ b/src/formats/writelog.rs	Sun Feb 07 15:14:22 2016 +0000
@@ -10,7 +10,7 @@
 use std::vec;
 use std::string;
 
-use formats::util::MRSinkGenerator;
+use formats::util::SinkGenerator;
 
 /// A length-prefixed record stream named for the original use case,
 /// which was to write a log of all write operations to a database.
@@ -108,19 +108,22 @@
 /// Like LinesSinkGenerator, opens new WriteLogWriters that write
 /// to files with the name given to new_output(). That name is in general based on the MRParameters
 /// supplied to a mapreduce instance.
+#[derive(Clone)]
 pub struct WriteLogGenerator {
     i: i32,
 }
 
+unsafe impl Send for WriteLogGenerator {}
+
 impl WriteLogGenerator {
     pub fn new() -> WriteLogGenerator {
         WriteLogGenerator { i: 0 }
     }
 }
 
-impl MRSinkGenerator for WriteLogGenerator {
+impl SinkGenerator for WriteLogGenerator {
     type Sink = WriteLogWriter<fs::File>;
-    fn new_output(&mut self, path: &String) -> Self::Sink {
+    fn new_output(&self, path: &String) -> Self::Sink {
         let writer = WriteLogWriter::<fs::File>::new_to_file(path, false);
         match writer {
             Err(e) => panic!("Could not open {}: {}", path, e),
@@ -138,7 +141,7 @@
 }
 
 impl WriteLogReader {
-    pub fn new(src: Box<Read>) -> WriteLogReader {
+    pub fn new(src: Box<Read+Send>) -> WriteLogReader {
         WriteLogReader {
             src: src,
             records_read: 0,
--- a/src/lib.rs	Sun Feb 07 15:13:45 2016 +0000
+++ b/src/lib.rs	Sun Feb 07 15:14:22 2016 +0000
@@ -3,6 +3,7 @@
 //!
 
 pub mod closure_mr;
+pub mod controller;
 pub mod formats;
 pub mod map;
 pub mod mapreducer;
--- a/src/map.rs	Sun Feb 07 15:13:45 2016 +0000
+++ b/src/map.rs	Sun Feb 07 15:14:22 2016 +0000
@@ -7,7 +7,7 @@
 use std::fmt;
 use std::io::Write;
 
-use formats::util::MRSinkGenerator;
+use formats::util::SinkGenerator;
 use mapreducer::MapReducer;
 use parameters::MRParameters;
 use record_types::{Record, MEmitter};
@@ -16,7 +16,7 @@
 /// and intermediary input and output forms.
 /// Mapper threads run on this. Every mapper thread has one MapPartition
 /// instance per input chunk.
-struct MapPartition<MR: MapReducer, MapInput: Iterator<Item = Record>, SinkGen: MRSinkGenerator> {
+pub struct MapPartition<MR: MapReducer, MapInput: Iterator<Item = Record>, SinkGen: SinkGenerator> {
     mr: MR,
     params: MRParameters,
     input: MapInput,
@@ -25,7 +25,7 @@
     sorted_output: BTreeMap<String, Vec<String>>,
 }
 
-impl<MR: MapReducer, MapInput: Iterator<Item=Record>, SinkGen: MRSinkGenerator> MapPartition<MR, MapInput, SinkGen> {
+impl<MR: MapReducer, MapInput: Iterator<Item=Record>, SinkGen: SinkGenerator> MapPartition<MR, MapInput, SinkGen> {
     pub fn _new(params: MRParameters,
                 input: MapInput,
                 mr: MR,
@@ -149,7 +149,7 @@
 #[cfg(test)]
 mod tests {
     use closure_mr::ClosureMapReducer;
-    use formats::util::RecordIterator;
+    use formats::util::PosRecordIterator;
     use formats::lines::LinesSinkGenerator;
     use map::MapPartition;
     use record_types::{MEmitter, REmitter, Record, MultiRecord};
@@ -179,7 +179,7 @@
                                    .iter()
                                    .map(move |s| String::from(*s))
                                    .collect();
-        let ri: RecordIterator<_> = RecordIterator::new(inp.into_iter());
+        let ri: PosRecordIterator<_> = PosRecordIterator::new(inp.into_iter());
         ri.collect()
     }
 
--- a/src/reduce.rs	Sun Feb 07 15:13:45 2016 +0000
+++ b/src/reduce.rs	Sun Feb 07 15:14:22 2016 +0000
@@ -1,9 +1,9 @@
 //! Implements the Reduce phase.
 //!
 
+use std::io;
 use std::iter::Peekable;
 
-use formats::util::MRSinkGenerator;
 use mapreducer::MapReducer;
 use parameters::MRParameters;
 use record_types::{Record, MultiRecord, REmitter};
@@ -11,26 +11,26 @@
 
 pub struct ReducePartition<MR: MapReducer,
                            InputIt: Iterator<Item = Record>,
-                           SinkGen: MRSinkGenerator>
+                           Sink: io::Write>
 {
     mr: MR,
     params: MRParameters,
     // Maybe we want to genericize this to an Iterator<Item=Read> or so? This defers opening
     // the files to the reduce shard itself.
     srcs: Vec<InputIt>,
-    dstfilegen: SinkGen,
+    dstfile: Sink,
 }
 
-impl<MR: MapReducer, InputIt: Iterator<Item=Record>, SinkGen: MRSinkGenerator> ReducePartition<MR, InputIt, SinkGen> {
+impl<MR: MapReducer, InputIt: Iterator<Item=Record>, Sink: io::Write> ReducePartition<MR, InputIt, Sink> {
 /// Create a new Reduce partition for the given MR; source and destination I/O.
 /// mr is the map/reduce functions.
 /// params is generic MR parameters as well as some applying directly to this reduce partition.
 /// srcs is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's
 /// outputs.
-/// dstfiles is a SinkGen (as known from the mapping phase) that is used to create the output
+/// dstfiles is a Sink (as known from the mapping phase) that is used to create the output
 /// file (there is one output file per reduce partition, currently).
-    pub fn new(mr: MR, params: MRParameters, srcs: Vec<InputIt>, dstfiles: SinkGen) -> ReducePartition<MR, InputIt, SinkGen> {
-        ReducePartition { mr: mr, params: params, srcs: srcs, dstfilegen: dstfiles }
+    pub fn new(mr: MR, params: MRParameters, srcs: Vec<InputIt>, outp: Sink) -> ReducePartition<MR, InputIt, Sink> {
+        ReducePartition { mr: mr, params: params, srcs: srcs, dstfile: outp}
     }
 
 /// Run the Reduce partition.
@@ -44,26 +44,15 @@
         self.reduce(RecordsToMultiRecords::new(ShardMergeIterator::build(&mut it), params))
     }
 
-    fn get_output_name(&self) -> String {
-        use std::fmt;
-        let mut name = String::new();
-        name.push_str(&self.params.reduce_output_shard_prefix[..]);
-        name.push_str(&fmt::format(format_args!("{}", self.params.shard_id))[..]);
-        name
-    }
-
     fn reduce<RecIt: Iterator<Item=Record>>(mut self, inp: RecordsToMultiRecords<RecIt>) {
         use std::io::Write;
 
-        let name = self.get_output_name();
-        let mut outp = self.dstfilegen.new_output(&name);
-
         for multirec in inp {
             let mut emitter = REmitter::new();
             self.mr.reduce(&mut emitter, multirec);
 
             for result in emitter._get().into_iter() {
-                match outp.write(result.as_bytes()) {
+                match self.dstfile.write(result.as_bytes()) {
                     Err(e) => println!("WARN: While reducing shard #{}: {}", self.params.shard_id, e),
                     Ok(_) => ()
                 }
@@ -131,6 +120,7 @@
 
     use closure_mr::ClosureMapReducer;
     use formats::lines::LinesSinkGenerator;
+    use formats::util::SinkGenerator;
     use parameters::MRParameters;
     use record_types::*;
 
@@ -207,7 +197,7 @@
         let srcs = vec![get_records().into_iter()];
         let dst = LinesSinkGenerator::new_to_files();
 
-        let r = ReducePartition::new(mr, params, srcs, dst);
+        let r = ReducePartition::new(mr, params, srcs, dst.new_output(&String::from("0")));
         r._run();
     }
 }