Mercurial > lbo > hg > localmr
changeset 82:0557a5c0d28d
Add test for ReducePartition
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 07 Feb 2016 09:53:03 +0000 |
parents | 36ca534fa940 |
children | 3f5766b3a0f6 |
files | src/reduce.rs |
diffstat | 1 files changed, 35 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/src/reduce.rs Sun Feb 07 09:52:50 2016 +0000 +++ b/src/reduce.rs Sun Feb 07 09:53:03 2016 +0000 @@ -63,7 +63,10 @@ self.mr.reduce(&mut emitter, multirec); for result in emitter._get().into_iter() { - outp.write(result.as_bytes()); + match outp.write(result.as_bytes()) { + Err(e) => println!("WARN: While reducing shard #{}: {}", self.params.shard_id, e), + Ok(_) => () + } } } } @@ -125,6 +128,9 @@ #[cfg(test)] mod tests { use super::*; + + use closure_mr::ClosureMapReducer; + use formats::lines::LinesSinkGenerator; use parameters::MRParameters; use record_types::*; @@ -172,4 +178,32 @@ i += 1; } } + + fn test_reducer(e: &mut REmitter, recs: MultiRecord) { + use std::fmt::Write; + use std::borrow::Borrow; + + let mut out = String::with_capacity(32); + let _ = out.write_fmt(format_args!("{}:", recs.key())); + + for val in recs { + let _ = out.write_str(" "); + let _ = out.write_str(val.borrow()); + } + + e.emit(out); + } + + fn fake_mapper(_: &mut MEmitter, _: Record) {} + + #[test] + fn test_reduce() { + let mr = ClosureMapReducer::new(fake_mapper, test_reducer); + let params = MRParameters::new().set_shard_id(42).set_reduce_group_opts(1, true); + let srcs = vec![get_records().into_iter()]; + let dst = LinesSinkGenerator::new_to_files(&String::from("testdata/reduce_out_")); + + let r = ReducePartition::new(mr, params, srcs, dst); + r._run(); + } }