changeset 108:27afb9528618

Move SinkGenerator and utils around; make file name generation saner
author Lewin Bormann <lbo@spheniscida.de>
date Wed, 15 Jun 2016 19:55:04 +0200
parents 500571302783
children d676299a0a2d
files src/controller.rs src/formats/lines.rs src/formats/util.rs src/formats/writelog.rs src/phases/map.rs src/phases/mod.rs src/phases/output.rs src/shard_merge.rs
diffstat 8 files changed, 60 insertions(+), 62 deletions(-) [+]
line wrap: on
line diff
--- a/src/controller.rs	Wed Jun 15 19:27:29 2016 +0200
+++ b/src/controller.rs	Wed Jun 15 19:55:04 2016 +0200
@@ -1,7 +1,7 @@
 //! Controls the execution of a mapreduce instance.
 
-use formats::util::{SinkGenerator, RecordReadIterator};
-use formats::writelog::{WriteLogGenerator, WriteLogReader};
+use phases::output::{SinkGenerator, open_reduce_inputs, get_reduce_output_name};
+use formats::writelog::WriteLogGenerator;
 use input_cache::InputCache;
 use phases::map::MapPartition;
 use mapreducer::{Mapper, Reducer, Sharder};
@@ -24,30 +24,6 @@
     map_partitions_run: usize,
 }
 
-/// Calculates the name of a reduce output shard from the parameters.
-fn get_reduce_output_name(params: &MRParameters) -> String {
-    use std::fmt;
-    let mut name = String::new();
-    name.push_str(&params.reduce_output_shard_prefix[..]);
-    name.push_str(&fmt::format(format_args!("{}", params.shard_id))[..]);
-    name
-}
-
-fn open_reduce_inputs(params: &MRParameters,
-                      partitions: usize,
-                      shard: usize)
-                      -> Vec<RecordReadIterator<WriteLogReader>> {
-    use std::fmt;
-    let mut inputs = Vec::new();
-
-    for part in 0..partitions {
-        let name = fmt::format(format_args!("{}{}.{}", params.map_output_location, part, shard));
-        let wlg_reader = WriteLogReader::new_from_file(&name).unwrap();
-        inputs.push(RecordReadIterator::new(wlg_reader));
-    }
-    inputs
-}
-
 
 impl<M: Mapper, R: Reducer, S: Sharder> MRController<M, R, S> {
     /// Create a new mapreduce instance and execute it immediately.
@@ -120,7 +96,6 @@
     }
 
     fn read_map_input<In: Iterator<Item = Record>>(it: &mut In, approx_bytes: usize) -> InputCache {
-
         let inp_cache = InputCache::from_iter(8192, approx_bytes, it);
         inp_cache
     }
@@ -137,7 +112,7 @@
                 let output = outp.clone();
 
                 scope.execute(move || {
-                    let inputs = open_reduce_inputs(&params, map_partitions, i);
+                    let inputs = open_reduce_inputs(&params.map_output_location, map_partitions, i);
                     let output = output.new_output(&get_reduce_output_name(&params));
                     let reduce_part = ReducePartition::new(r, params, inputs, output);
                     reduce_part._run();
--- a/src/formats/lines.rs	Wed Jun 15 19:27:29 2016 +0200
+++ b/src/formats/lines.rs	Wed Jun 15 19:55:04 2016 +0200
@@ -3,7 +3,7 @@
 //! using the RecordIterator from formats::util, the necessary key/value
 //! iterator can be implemented.
 
-use formats::util;
+use phases::output::SinkGenerator;
 use std::fs;
 use std::io;
 use std::io::{Read, BufRead};
@@ -102,10 +102,7 @@
 /// and creates text files based on it.
 #[allow(dead_code)]
 #[derive(Clone)]
-pub struct LinesSinkGenerator {
-    // bogus field
-    i: i32,
-}
+pub struct LinesSinkGenerator;
 
 unsafe impl Send for LinesSinkGenerator {}
 
@@ -113,11 +110,11 @@
     /// Use either a path like `/a/b/c/` to generate files in a directory
     /// or `/a/b/c/file_prefix_` to create files with that prefix.
     pub fn new_to_files() -> LinesSinkGenerator {
-        LinesSinkGenerator { i: 0 }
+        LinesSinkGenerator { }
     }
 }
 
-impl util::SinkGenerator for LinesSinkGenerator {
+impl SinkGenerator for LinesSinkGenerator {
     type Sink = LinesWriter<fs::File>;
     fn new_output(&self, p: &String) -> Self::Sink {
         let f = fs::OpenOptions::new().write(true).truncate(true).create(true).open(p);
--- a/src/formats/util.rs	Wed Jun 15 19:27:29 2016 +0200
+++ b/src/formats/util.rs	Wed Jun 15 19:55:04 2016 +0200
@@ -3,7 +3,6 @@
 
 use record_types::Record;
 use std::fmt;
-use std::io;
 
 /// Transforms an iterator<string> into an iterator<Record>. It yields
 /// records with the key being the position of the current record, starting with
@@ -68,17 +67,3 @@
         }
     }
 }
-
-/// A type implementing SinkGenerator is used at the end of the reducer
-/// phase to write the output. Given a name, new() should return a new object
-/// that can be used to write the output of a reduce partition.
-/// Values are always written as a whole to the writer.
-///
-/// SinkGenerator types are used in general to determine the format of outputs; existing options
-/// are plain text files (LinesSinkGenerator) or length-prefixed binary files (WriteLogGenerator).
-pub trait SinkGenerator: Send + Clone {
-    type Sink: io::Write;
-    /// Return a new output identified by name. The existing sink generators use `name` to open
-    /// files with that name (or path).
-    fn new_output(&self, name: &String) -> Self::Sink;
-}
--- a/src/formats/writelog.rs	Wed Jun 15 19:27:29 2016 +0200
+++ b/src/formats/writelog.rs	Wed Jun 15 19:55:04 2016 +0200
@@ -10,7 +10,7 @@
 use std::vec;
 use std::string;
 
-use formats::util::SinkGenerator;
+use phases::output::SinkGenerator;
 
 /// A length-prefixed record stream named for the original use case,
 /// which was to write a log of all write operations to a database.
--- a/src/phases/map.rs	Wed Jun 15 19:27:29 2016 +0200
+++ b/src/phases/map.rs	Wed Jun 15 19:55:04 2016 +0200
@@ -4,10 +4,9 @@
 #![allow(dead_code)]
 
 use std::collections::BTreeMap;
-use std::fmt;
 use std::io::Write;
 
-use formats::util::SinkGenerator;
+use phases::output::SinkGenerator;
 use mapreducer::{Mapper, Sharder};
 use parameters::MRParameters;
 use record_types::{Record, MEmitter};
@@ -104,11 +103,9 @@
         let mut outputs = Vec::new();
 
         for i in 0..self.params.reducers {
-            let out = self.sink.new_output(
-                &fmt::format(format_args!("{}{}.{}",
-                                          self.params.map_output_location,
-                                          self.params.shard_id,
-                                          i)));
+            let out = self.sink.new_map_output(&self.params.map_output_location,
+                                               self.params.shard_id,
+                                               i);
             outputs.push(out);
         }
         assert_eq!(outputs.len(), self.params.reducers);
--- a/src/phases/mod.rs	Wed Jun 15 19:27:29 2016 +0200
+++ b/src/phases/mod.rs	Wed Jun 15 19:55:04 2016 +0200
@@ -1,2 +1,4 @@
 pub mod map;
 pub mod reduce;
+
+pub mod output;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/phases/output.rs	Wed Jun 15 19:55:04 2016 +0200
@@ -0,0 +1,46 @@
+use std::io;
+use formats::util::RecordReadIterator;
+use formats::writelog::WriteLogReader;
+use parameters::MRParameters;
+
+fn map_output_name(base: &String, mapper: usize, shard: usize) -> String {
+    format!("{}-{}.{}", base, mapper, shard)
+}
+
+/// A type implementing SinkGenerator is used at the end of the reducer
+/// phase to write the output. Given a name, new() should return a new object
+/// that can be used to write the output of a reduce partition.
+/// Values are always written as a whole to the writer.
+///
+/// SinkGenerator types are used in general to determine the format of outputs; existing options
+/// are plain text files (LinesSinkGenerator) or length-prefixed binary files (WriteLogGenerator).
+pub trait SinkGenerator: Send + Clone {
+    type Sink: io::Write;
+    /// Return a new intermediary file handle destined for reduce shard `shard` and requested by
+    /// map shard `mapper`.
+    fn new_map_output(&self, location: &String, mapper: usize, shard: usize) -> Self::Sink {
+        self.new_output(&map_output_name(location, mapper, shard))
+    }
+
+    /// Return a new file handle for `location`.
+    fn new_output(&self, location: &String) -> Self::Sink;
+}
+
+pub fn open_reduce_inputs(location: &String,
+                      partitions: usize,
+                      shard: usize)
+                      -> Vec<RecordReadIterator<WriteLogReader>> {
+    let mut inputs = Vec::new();
+
+    for part in 0..partitions {
+        let name = map_output_name(location, part, shard);
+        let wlg_reader = WriteLogReader::new_from_file(&name).unwrap();
+        inputs.push(RecordReadIterator::new(wlg_reader));
+    }
+    inputs
+}
+
+/// Calculates the name of a reduce output shard from the parameters.
+pub fn get_reduce_output_name(params: &MRParameters) -> String {
+    format!("{}{}", params.reduce_output_shard_prefix, params.shard_id)
+}
--- a/src/shard_merge.rs	Wed Jun 15 19:27:29 2016 +0200
+++ b/src/shard_merge.rs	Wed Jun 15 19:55:04 2016 +0200
@@ -69,10 +69,6 @@
             right: Box::new(iter::empty()),
             left_peeked: None,
             right_peeked: None,
-            // BUG: This should not be altered when used with Map phase output.
-            // The map phase uses a BTreeMap in order to sort the output, and the BTM
-            // only uses the standard Ord implementation for strings. Should the requirements
-            // change, we can work around that.
             comparer: sort::default_generic_compare,
         }
     }