Mercurial > lbo > hg > localmr
changeset 85:06e3c46db370
Update existing sink generators to not take a base path.
The base path is supplied in the MRParameters struct given to every MapReduce instance.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 07 Feb 2016 10:47:21 +0000 |
parents | 4f24fe68d67d |
children | 4a111475534f |
files | src/formats/lines.rs src/formats/writelog.rs src/map.rs src/reduce.rs |
diffstat | 4 files changed, 31 insertions(+), 29 deletions(-) [+] |
line wrap: on
line diff
--- a/src/formats/lines.rs Sun Feb 07 10:24:57 2016 +0000 +++ b/src/formats/lines.rs Sun Feb 07 10:47:21 2016 +0000 @@ -100,26 +100,26 @@ /// An MRSinkGenerator type that uses a simple path as base /// and creates text files based on it. +#[allow(dead_code)] pub struct LinesSinkGenerator { - basepath: String, + // bogus field + i: i32, } impl LinesSinkGenerator { /// Use either a path like `/a/b/c/` to generate files in a directory /// or `/a/b/c/file_prefix_` to create files with that prefix. - pub fn new_to_files(path: &String) -> LinesSinkGenerator { - LinesSinkGenerator { basepath: path.clone() } + pub fn new_to_files() -> LinesSinkGenerator { + LinesSinkGenerator { i: 0 } } } impl util::MRSinkGenerator for LinesSinkGenerator { type Sink = LinesWriter<fs::File>; - fn new_output(&mut self, name: &String) -> Self::Sink { - let mut path = self.basepath.clone(); - path.push_str(&name[..]); - let f = fs::OpenOptions::new().write(true).truncate(true).create(true).open(path); + fn new_output(&mut self, p: &String) -> Self::Sink { + let f = fs::OpenOptions::new().write(true).truncate(true).create(true).open(p); match f { - Err(e) => panic!("Couldn't open lines output file {}: {}", name, e), + Err(e) => panic!("Couldn't open lines output file {}: {}", p, e), Ok(f) => return LinesWriter { file: f }, } } @@ -168,8 +168,8 @@ #[test] fn test_write_lines() { let line = String::from("abc def hello world"); - let mut gen = lines::LinesSinkGenerator::new_to_files(&String::from("test_output_")); - let mut f = gen.new_output(&String::from("1")); + let mut gen = lines::LinesSinkGenerator::new_to_files(); + let mut f = gen.new_output(&String::from("testdata/writelines_1")); for _ in 0..10 { let _ = f.write(line.as_bytes()); @@ -178,13 +178,13 @@ { assert_eq!(fs::OpenOptions::new() .read(true) - .open("test_output_1") + .open("testdata/writelines_1") .unwrap() .metadata() .unwrap() .len(), 200); } - let _ = fs::remove_file("test_output_1"); + let _ = fs::remove_file("testdata/writelines_1"); } }
--- a/src/formats/writelog.rs Sun Feb 07 10:24:57 2016 +0000 +++ b/src/formats/writelog.rs Sun Feb 07 10:47:21 2016 +0000 @@ -105,25 +105,23 @@ } } -/// Like LinesSinkGenerator, opens new WriteLog sinks based on a base path. -/// The framework-supplied suffices are appended directly to the base path given -/// to new(). (E.g. base = `/a/b/c`, input = `_shard1` => `/a/b/c_shard1`) +/// Like LinesSinkGenerator, opens new WriteLogWriters that write +/// to files with the name given to new_output(). That name is in general based on the MRParameters +/// supplied to a mapreduce instance. pub struct WriteLogGenerator { - base: String, + i: i32, } impl WriteLogGenerator { - pub fn new(base: &String) -> WriteLogGenerator { - WriteLogGenerator { base: base.clone() } + pub fn new() -> WriteLogGenerator { + WriteLogGenerator { i: 0 } } } impl MRSinkGenerator for WriteLogGenerator { type Sink = WriteLogWriter<fs::File>; - fn new_output(&mut self, suffix: &String) -> Self::Sink { - let mut path = self.base.clone(); - path.push_str(&suffix[..]); - let writer = WriteLogWriter::<fs::File>::new_to_file(&path, false); + fn new_output(&mut self, path: &String) -> Self::Sink { + let writer = WriteLogWriter::<fs::File>::new_to_file(path, false); match writer { Err(e) => panic!("Could not open {}: {}", path, e), Ok(w) => w,
--- a/src/map.rs Sun Feb 07 10:24:57 2016 +0000 +++ b/src/map.rs Sun Feb 07 10:47:21 2016 +0000 @@ -185,13 +185,13 @@ fn get_output() -> LinesSinkGenerator { - LinesSinkGenerator::new_to_files(&String::from("./")) + LinesSinkGenerator::new_to_files() } #[test] fn test_map_partition() { - use std::fmt::format; - use std::fs; + // use std::fmt::format; + // use std::fs; let reducers = 3; let mp = MapPartition::_new(MRParameters::new() @@ -203,8 +203,8 @@ get_output()); mp._run(); - for i in 0..reducers { - let filename = format(format_args!("test_map_mapout_0.{}", i)); + for _ in 0..reducers { + // let filename = format(format_args!("testdata/map_im_{}", i)); // let _ = fs::remove_file(filename); } }
--- a/src/reduce.rs Sun Feb 07 10:24:57 2016 +0000 +++ b/src/reduce.rs Sun Feb 07 10:47:21 2016 +0000 @@ -199,9 +199,13 @@ #[test] fn test_reduce() { let mr = ClosureMapReducer::new(fake_mapper, test_reducer); - let params = MRParameters::new().set_shard_id(42).set_reduce_group_opts(1, true); + let params = MRParameters::new() + .set_shard_id(42) + .set_reduce_group_opts(1, true) + .set_file_locations(String::from("testdata/map_intermed_"), + String::from("testdata/result_")); let srcs = vec![get_records().into_iter()]; - let dst = LinesSinkGenerator::new_to_files(&String::from("testdata/reduce_out_")); + let dst = LinesSinkGenerator::new_to_files(); let r = ReducePartition::new(mr, params, srcs, dst); r._run();