Mercurial > lbo > hg > localmr
changeset 84:4f24fe68d67d
Fix file location parameters.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 07 Feb 2016 10:24:57 +0000 |
parents | 3f5766b3a0f6 |
children | 06e3c46db370 |
files | src/map.rs src/parameters.rs |
diffstat | 2 files changed, 26 insertions(+), 10 deletions(-) [+] |
line wrap: on
line diff
--- a/src/map.rs Sun Feb 07 10:09:17 2016 +0000 +++ b/src/map.rs Sun Feb 07 10:24:57 2016 +0000 @@ -95,9 +95,10 @@ let mut outputs = Vec::new(); for i in 0..self.params.reducers { - let out = self.sink.new_output(&fmt::format(format_args!("mapout_{}.{}", - self.params.shard_id, - i))); + let out = self.sink.new_output(&fmt::format(format_args!("{}{}.{}", + self.params.map_output_location, + i, + self.params.shard_id))); outputs.push(out); } assert_eq!(outputs.len(), self.params.reducers); @@ -184,7 +185,7 @@ fn get_output() -> LinesSinkGenerator { - LinesSinkGenerator::new_to_files(&String::from("test_map_")) + LinesSinkGenerator::new_to_files(&String::from("./")) } #[test] @@ -193,7 +194,10 @@ use std::fs; let reducers = 3; - let mp = MapPartition::_new(MRParameters::new().set_concurrency(4, reducers), + let mp = MapPartition::_new(MRParameters::new() + .set_concurrency(4, reducers) + .set_file_locations(String::from("testdata/map_im_"), + String::from("testdata/result_")), get_input().into_iter(), get_mr(), get_output()); @@ -201,7 +205,7 @@ for i in 0..reducers { let filename = format(format_args!("test_map_mapout_0.{}", i)); - let _ = fs::remove_file(filename); + // let _ = fs::remove_file(filename); } } }
--- a/src/parameters.rs Sun Feb 07 10:09:17 2016 +0000 +++ b/src/parameters.rs Sun Feb 07 10:24:57 2016 +0000 @@ -13,6 +13,7 @@ pub reduce_group_prealloc_size: usize, pub reduce_group_insensitive: bool, + pub map_output_location: String, pub reduce_output_shard_prefix: String, // Internal parameters @@ -28,6 +29,7 @@ map_partition_size: 100 * 1024 * 1024, reduce_group_prealloc_size: 1, reduce_group_insensitive: false, + map_output_location: String::from("map_intermediate_"), reduce_output_shard_prefix: String::from("output_"), shard_id: 0, } @@ -85,10 +87,20 @@ self } - /// Prefix for output files produced by the reduce phase. - /// Default: output_ (the id of the reduce shard will be appended to that string) - pub fn set_out_name(mut self, prefix: String) -> MRParameters { - self.reduce_output_shard_prefix = prefix; + /// map_out_prefix: A location that can be used for intermediate map outputs. For example, + /// '/home/user/processing/tmp/'. (Note: Make sure that the location provides enough disk + /// space). Default: './output_' (will lead to ./output_0, ./output_1 etc.) + /// + /// reduce_out_prefix: Path prefix for output files produced by the reduce phase, for example + /// '/home/user/processing/output_'. (Note: Make sure that the location provides enough + /// disk space). Default: './map_intermediate_' (will lead to ./map_intermediate_0.0 etc.) + /// + pub fn set_file_locations(mut self, + map_out_prefix: String, + reduce_out_prefix: String) + -> MRParameters { + self.map_output_location = map_out_prefix; + self.reduce_output_shard_prefix = reduce_out_prefix; self }