changeset 71:2a58864abf08

Implement actual reducing
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 06 Feb 2016 18:37:22 +0000
parents a9f8ff5da6e8
children 98b6d1e37f6a
files src/reduce.rs
diffstat 1 files changed, 35 insertions(+), 20 deletions(-) [+]
line wrap: on
line diff
--- a/src/reduce.rs	Sat Feb 06 18:37:00 2016 +0000
+++ b/src/reduce.rs	Sat Feb 06 18:37:22 2016 +0000
@@ -30,19 +30,38 @@
 
     /// Run the Reduce partition.
     pub fn _run(mut self) {
-        let sorted_input = self.open_sorted_input();
-        // reduce input and write results.
-    }
-
-    /// Create an iterator that merges all input sources. Leaves self.srcfiles empty.
-    fn open_sorted_input(&mut self) -> ShardMergeIterator<Record> {
         let mut inputs = Vec::new();
         inputs.append(&mut self.srcfiles);
         let mut it = inputs.into_iter();
 
-        ShardMergeIterator::build(&mut it)
+        let params = self.params.clone();
+
+        self.reduce(RecordsToMultiRecords::new(ShardMergeIterator::build(&mut it), params))
+    }
+
+    fn get_output_name(&self) -> String {
+        use std::fmt;
+        let mut name = String::new();
+        name.push_str(&self.params.reduce_output_shard_prefix[..]);
+        name.push_str(&fmt::format(format_args!("{}", self.params.shard_id))[..]);
+        name
     }
 
+    fn reduce<RecIt: Iterator<Item=Record>>(mut self, inp: RecordsToMultiRecords<RecIt>) {
+        use std::io::Write;
+
+        let name = self.get_output_name();
+        let mut outp = self.dstfilegen.new_output(&name);
+
+        for multirec in inp {
+            let mut emitter = REmitter::new();
+            self.mr.reduce(&mut emitter, multirec);
+
+            for result in emitter._get().into_iter() {
+                outp.write(result.as_bytes());
+            }
+        }
+    }
 }
 
 use std::iter::Peekable;
@@ -106,9 +125,8 @@
     use record_types::*;
     use std::vec;
 
-    #[test]
-    fn test_grouping_iterator() {
-        let records = vec![
+    fn get_records() -> Vec<Record>{
+        vec![
             mk_rcrd("aaa", "def"),
             mk_rcrd("abb", "111"),
             mk_rcrd("Abb", "112"),
@@ -116,7 +134,12 @@
             mk_rcrd("abc", "xyz"),
             mk_rcrd("xyz", "___"),
             mk_rcrd("xyz", "__foo"),
-            mk_rcrd("xyz", "---")];
+            mk_rcrd("xyz", "---")]
+    }
+
+    #[test]
+    fn test_grouping_iterator() {
+        let records = get_records();
         let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> = RecordsToMultiRecords::new(records.into_iter(), MRParameters::new().set_reduce_group_opts(2, true));
 
         let lengths = vec![1, 2, 1, 1, 3];
@@ -130,15 +153,7 @@
 
     #[test]
     fn test_grouping_iterator_sensitive() {
-        let records = vec![
-            mk_rcrd("aaa", "def"),
-            mk_rcrd("abb", "111"),
-            mk_rcrd("Abb", "112"),
-            mk_rcrd("abbb", "113"),
-            mk_rcrd("abc", "xyz"),
-            mk_rcrd("xyz", "___"),
-            mk_rcrd("xyz", "__foo"),
-            mk_rcrd("xyz", "---")];
+        let records = get_records();
         let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> = RecordsToMultiRecords::new(records.into_iter(), MRParameters::new().set_reduce_group_opts(2, false));
 
         let lengths = vec![1, 1, 1, 1, 1, 3];