changeset 82:0557a5c0d28d

Add test for ReducePartition
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 07 Feb 2016 09:53:03 +0000
parents 36ca534fa940
children 3f5766b3a0f6
files src/reduce.rs
diffstat 1 files changed, 35 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/src/reduce.rs	Sun Feb 07 09:52:50 2016 +0000
+++ b/src/reduce.rs	Sun Feb 07 09:53:03 2016 +0000
@@ -63,7 +63,10 @@
             self.mr.reduce(&mut emitter, multirec);
 
             for result in emitter._get().into_iter() {
-                outp.write(result.as_bytes());
+                match outp.write(result.as_bytes()) {
+                    Err(e) => println!("WARN: While reducing shard #{}: {}", self.params.shard_id, e),
+                    Ok(_) => ()
+                }
             }
         }
     }
@@ -125,6 +128,9 @@
 #[cfg(test)]
 mod tests {
     use super::*;
+
+    use closure_mr::ClosureMapReducer;
+    use formats::lines::LinesSinkGenerator;
     use parameters::MRParameters;
     use record_types::*;
 
@@ -172,4 +178,32 @@
             i += 1;
         }
     }
+
+    fn test_reducer(e: &mut REmitter, recs: MultiRecord) {
+        use std::fmt::Write;
+        use std::borrow::Borrow;
+
+        let mut out = String::with_capacity(32);
+        let _ = out.write_fmt(format_args!("{}:", recs.key()));
+
+        for val in recs {
+            let _ = out.write_str(" ");
+            let _ = out.write_str(val.borrow());
+        }
+
+        e.emit(out);
+    }
+
+    fn fake_mapper(_: &mut MEmitter, _: Record) {}
+
+    #[test]
+    fn test_reduce() {
+        let mr = ClosureMapReducer::new(fake_mapper, test_reducer);
+        let params = MRParameters::new().set_shard_id(42).set_reduce_group_opts(1, true);
+        let srcs = vec![get_records().into_iter()];
+        let dst = LinesSinkGenerator::new_to_files(&String::from("testdata/reduce_out_"));
+
+        let r = ReducePartition::new(mr, params, srcs, dst);
+        r._run();
+    }
 }