changeset 84:4f24fe68d67d

Fix file location parameters.
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 07 Feb 2016 10:24:57 +0000
parents 3f5766b3a0f6
children 06e3c46db370
files src/map.rs src/parameters.rs
diffstat 2 files changed, 26 insertions(+), 10 deletions(-) [+]
line wrap: on
line diff
--- a/src/map.rs	Sun Feb 07 10:09:17 2016 +0000
+++ b/src/map.rs	Sun Feb 07 10:24:57 2016 +0000
@@ -95,9 +95,10 @@
         let mut outputs = Vec::new();
 
         for i in 0..self.params.reducers {
-            let out = self.sink.new_output(&fmt::format(format_args!("mapout_{}.{}",
-                                                                     self.params.shard_id,
-                                                                     i)));
+            let out = self.sink.new_output(&fmt::format(format_args!("{}{}.{}",
+                                                                     self.params.map_output_location,
+                                                                     i,
+                                                                     self.params.shard_id)));
             outputs.push(out);
         }
         assert_eq!(outputs.len(), self.params.reducers);
@@ -184,7 +185,7 @@
 
 
     fn get_output() -> LinesSinkGenerator {
-        LinesSinkGenerator::new_to_files(&String::from("test_map_"))
+        LinesSinkGenerator::new_to_files(&String::from("./"))
     }
 
     #[test]
@@ -193,7 +194,10 @@
         use std::fs;
 
         let reducers = 3;
-        let mp = MapPartition::_new(MRParameters::new().set_concurrency(4, reducers),
+        let mp = MapPartition::_new(MRParameters::new()
+                                        .set_concurrency(4, reducers)
+                                        .set_file_locations(String::from("testdata/map_im_"),
+                                                            String::from("testdata/result_")),
                                     get_input().into_iter(),
                                     get_mr(),
                                     get_output());
@@ -201,7 +205,7 @@
 
         for i in 0..reducers {
             let filename = format(format_args!("test_map_mapout_0.{}", i));
-            let _ = fs::remove_file(filename);
+            // let _ = fs::remove_file(filename);
         }
     }
 }
--- a/src/parameters.rs	Sun Feb 07 10:09:17 2016 +0000
+++ b/src/parameters.rs	Sun Feb 07 10:24:57 2016 +0000
@@ -13,6 +13,7 @@
     pub reduce_group_prealloc_size: usize,
     pub reduce_group_insensitive: bool,
 
+    pub map_output_location: String,
     pub reduce_output_shard_prefix: String,
 
     // Internal parameters
@@ -28,6 +29,7 @@
             map_partition_size: 100 * 1024 * 1024,
             reduce_group_prealloc_size: 1,
             reduce_group_insensitive: false,
+            map_output_location: String::from("map_intermediate_"),
             reduce_output_shard_prefix: String::from("output_"),
             shard_id: 0,
         }
@@ -85,10 +87,20 @@
         self
     }
 
-    /// Prefix for output files produced by the reduce phase.
-    /// Default: output_ (the id of the reduce shard will be appended to that string)
-    pub fn set_out_name(mut self, prefix: String) -> MRParameters {
-        self.reduce_output_shard_prefix = prefix;
+    /// map_out_prefix: A location that can be used for intermediate map outputs. For example,
+    /// '/home/user/processing/tmp/'. (Note: Make sure that the location provides enough disk
+    /// space). Default: './output_' (will lead to ./output_0, ./output_1 etc.)
+    ///
+    /// reduce_out_prefix: Path prefix for output files produced by the reduce phase, for example
+    /// '/home/user/processing/output_'. (Note: Make sure that the location provides enough
+    /// disk space). Default: './map_intermediate_' (will lead to ./map_intermediate_0.0 etc.)
+    ///
+    pub fn set_file_locations(mut self,
+                              map_out_prefix: String,
+                              reduce_out_prefix: String)
+                              -> MRParameters {
+        self.map_output_location = map_out_prefix;
+        self.reduce_output_shard_prefix = reduce_out_prefix;
         self
     }