changeset 85:06e3c46db370

Update existing sink generators to not take a base path. The base path is supplied in the MRParameters struct given to every MapReduce instance.
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 07 Feb 2016 10:47:21 +0000
parents 4f24fe68d67d
children 4a111475534f
files src/formats/lines.rs src/formats/writelog.rs src/map.rs src/reduce.rs
diffstat 4 files changed, 31 insertions(+), 29 deletions(-) [+]
line wrap: on
line diff
--- a/src/formats/lines.rs	Sun Feb 07 10:24:57 2016 +0000
+++ b/src/formats/lines.rs	Sun Feb 07 10:47:21 2016 +0000
@@ -100,26 +100,26 @@
 
 /// An MRSinkGenerator type that uses a simple path as base
 /// and creates text files based on it.
+#[allow(dead_code)]
 pub struct LinesSinkGenerator {
-    basepath: String,
+    // bogus field
+    i: i32,
 }
 
 impl LinesSinkGenerator {
     /// Use either a path like `/a/b/c/` to generate files in a directory
     /// or `/a/b/c/file_prefix_` to create files with that prefix.
-    pub fn new_to_files(path: &String) -> LinesSinkGenerator {
-        LinesSinkGenerator { basepath: path.clone() }
+    pub fn new_to_files() -> LinesSinkGenerator {
+        LinesSinkGenerator { i: 0 }
     }
 }
 
 impl util::MRSinkGenerator for LinesSinkGenerator {
     type Sink = LinesWriter<fs::File>;
-    fn new_output(&mut self, name: &String) -> Self::Sink {
-        let mut path = self.basepath.clone();
-        path.push_str(&name[..]);
-        let f = fs::OpenOptions::new().write(true).truncate(true).create(true).open(path);
+    fn new_output(&mut self, p: &String) -> Self::Sink {
+        let f = fs::OpenOptions::new().write(true).truncate(true).create(true).open(p);
         match f {
-            Err(e) => panic!("Couldn't open lines output file {}: {}", name, e),
+            Err(e) => panic!("Couldn't open lines output file {}: {}", p, e),
             Ok(f) => return LinesWriter { file: f },
         }
     }
@@ -168,8 +168,8 @@
     #[test]
     fn test_write_lines() {
         let line = String::from("abc def hello world");
-        let mut gen = lines::LinesSinkGenerator::new_to_files(&String::from("test_output_"));
-        let mut f = gen.new_output(&String::from("1"));
+        let mut gen = lines::LinesSinkGenerator::new_to_files();
+        let mut f = gen.new_output(&String::from("testdata/writelines_1"));
 
         for _ in 0..10 {
             let _ = f.write(line.as_bytes());
@@ -178,13 +178,13 @@
         {
             assert_eq!(fs::OpenOptions::new()
                            .read(true)
-                           .open("test_output_1")
+                           .open("testdata/writelines_1")
                            .unwrap()
                            .metadata()
                            .unwrap()
                            .len(),
                        200);
         }
-        let _ = fs::remove_file("test_output_1");
+        let _ = fs::remove_file("testdata/writelines_1");
     }
 }
--- a/src/formats/writelog.rs	Sun Feb 07 10:24:57 2016 +0000
+++ b/src/formats/writelog.rs	Sun Feb 07 10:47:21 2016 +0000
@@ -105,25 +105,23 @@
     }
 }
 
-/// Like LinesSinkGenerator, opens new WriteLog sinks based on a base path.
-/// The framework-supplied suffices are appended directly to the base path given
-/// to new(). (E.g. base = `/a/b/c`, input = `_shard1` => `/a/b/c_shard1`)
+/// Like LinesSinkGenerator, opens new WriteLogWriters that write
+/// to files with the name given to new_output(). That name is in general based on the MRParameters
+/// supplied to a mapreduce instance.
 pub struct WriteLogGenerator {
-    base: String,
+    i: i32,
 }
 
 impl WriteLogGenerator {
-    pub fn new(base: &String) -> WriteLogGenerator {
-        WriteLogGenerator { base: base.clone() }
+    pub fn new() -> WriteLogGenerator {
+        WriteLogGenerator { i: 0 }
     }
 }
 
 impl MRSinkGenerator for WriteLogGenerator {
     type Sink = WriteLogWriter<fs::File>;
-    fn new_output(&mut self, suffix: &String) -> Self::Sink {
-        let mut path = self.base.clone();
-        path.push_str(&suffix[..]);
-        let writer = WriteLogWriter::<fs::File>::new_to_file(&path, false);
+    fn new_output(&mut self, path: &String) -> Self::Sink {
+        let writer = WriteLogWriter::<fs::File>::new_to_file(path, false);
         match writer {
             Err(e) => panic!("Could not open {}: {}", path, e),
             Ok(w) => w,
--- a/src/map.rs	Sun Feb 07 10:24:57 2016 +0000
+++ b/src/map.rs	Sun Feb 07 10:47:21 2016 +0000
@@ -185,13 +185,13 @@
 
 
     fn get_output() -> LinesSinkGenerator {
-        LinesSinkGenerator::new_to_files(&String::from("./"))
+        LinesSinkGenerator::new_to_files()
     }
 
     #[test]
     fn test_map_partition() {
-        use std::fmt::format;
-        use std::fs;
+        // use std::fmt::format;
+        // use std::fs;
 
         let reducers = 3;
         let mp = MapPartition::_new(MRParameters::new()
@@ -203,8 +203,8 @@
                                     get_output());
         mp._run();
 
-        for i in 0..reducers {
-            let filename = format(format_args!("test_map_mapout_0.{}", i));
+        for _ in 0..reducers {
+            // let filename = format(format_args!("testdata/map_im_{}", i));
             // let _ = fs::remove_file(filename);
         }
     }
--- a/src/reduce.rs	Sun Feb 07 10:24:57 2016 +0000
+++ b/src/reduce.rs	Sun Feb 07 10:47:21 2016 +0000
@@ -199,9 +199,13 @@
     #[test]
     fn test_reduce() {
         let mr = ClosureMapReducer::new(fake_mapper, test_reducer);
-        let params = MRParameters::new().set_shard_id(42).set_reduce_group_opts(1, true);
+        let params = MRParameters::new()
+                         .set_shard_id(42)
+                         .set_reduce_group_opts(1, true)
+                         .set_file_locations(String::from("testdata/map_intermed_"),
+                                             String::from("testdata/result_"));
         let srcs = vec![get_records().into_iter()];
-        let dst = LinesSinkGenerator::new_to_files(&String::from("testdata/reduce_out_"));
+        let dst = LinesSinkGenerator::new_to_files();
 
         let r = ReducePartition::new(mr, params, srcs, dst);
         r._run();