changeset 69:ddd39d8ba118

Add some tests for the grouping iterator
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 06 Feb 2016 17:08:16 +0000
parents edbfc8c3ff56
children a9f8ff5da6e8
files src/reduce.rs
diffstat 1 files changed, 77 insertions(+), 22 deletions(-) [+]
line wrap: on
line diff
--- a/src/reduce.rs	Sat Feb 06 17:07:59 2016 +0000
+++ b/src/reduce.rs	Sat Feb 06 17:08:16 2016 +0000
@@ -17,24 +17,24 @@
 }
 
 impl<MR: MapReducer, InputIt: Iterator<Item=Record>, SinkGen: MRSinkGenerator> ReducePartition<MR, InputIt, SinkGen> {
-/// Create a new Reduce partition for the given MR; source and destination I/O.
-/// mr is the map/reduce functions.
-/// params is generic MR parameters as well as some applying directly to this reduce partition.
-/// srcfiles is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's
-/// outputs.
-/// dstfiles is a SinkGen (as known from the mapping phase) that is used to create the output
-/// file (there is one output file per reduce partition, currently).
+    /// Create a new Reduce partition for the given MR, together with its source and destination I/O.
+    /// `mr` supplies the map/reduce functions.
+    /// `params` holds the generic MR parameters as well as some that apply directly to this reduce partition.
+    /// `srcfiles` is a set of `Iterator<Item=Record>`s. Those usually read from the map phase's
+    /// outputs.
+    /// `dstfiles` is a SinkGen (as known from the mapping phase) that is used to create the output
+    /// file (currently there is one output file per reduce partition).
     pub fn new(mr: MR, params: MRParameters, srcfiles: Vec<InputIt>, dstfiles: SinkGen) -> ReducePartition<MR, InputIt, SinkGen> {
         ReducePartition { mr: mr, params: params, srcfiles: srcfiles, dstfilegen: dstfiles }
     }
 
-/// Run the Reduce partition.
+    /// Run the Reduce partition. So far this only opens the merged input; the reduce step itself is still to be done.
     pub fn _run(mut self) {
-        let mut sorted_input = self.open_sorted_input();
-// reduce input and write results.
+        let sorted_input = self.open_sorted_input();
+        // TODO: reduce input and write results.
     }
 
-/// Create an iterator that merges all input sources. Leaves self.srcfiles empty.
+    /// Create an iterator that merges all input sources. Leaves self.srcfiles empty.
     fn open_sorted_input(&mut self) -> ShardMergeIterator<Record> {
         let mut inputs = Vec::new();
         inputs.append(&mut self.srcfiles);
@@ -51,17 +51,16 @@
 /// grouping subsequent records with identical key.
 /// The original iterator must yield records in sorted order (or at least in an order where
 /// identical items are adjacent).
-struct RecordsToMultiRecords<It: Iterator<Item = Record>> {
+pub struct RecordsToMultiRecords<It: Iterator<Item = Record>> {
     it: Peekable<It>,
-    /// Efficiency knob: How big groups of records are expected to be. Default is 1.
-    expected_group_size: usize,
+    settings: MRParameters, // read for reduce_group_prealloc_size and reduce_group_insensitive
 }
 
 impl<It: Iterator<Item = Record>> RecordsToMultiRecords<It> {
-    fn new(it: It, egs: usize) -> RecordsToMultiRecords<It> {
+    fn new(it: It, settings: MRParameters) -> RecordsToMultiRecords<It> { // `it` is made peekable so grouping can look one record ahead
         RecordsToMultiRecords {
             it: it.peekable(),
-            expected_group_size: egs,
+            settings: settings,
         }
     }
 }
@@ -69,29 +68,85 @@
 impl<It: Iterator<Item = Record>> Iterator for RecordsToMultiRecords<It> {
     type Item = MultiRecord;
     fn next(&mut self) -> Option<Self::Item> {
-        let mut collection = Vec::with_capacity(self.expected_group_size);
+        use std::ascii::AsciiExt;
+        let mut collection = Vec::with_capacity(self.settings.reduce_group_prealloc_size);
         let key: String;
-
         match self.it.next() {
             None => return None,
             Some(r) => {
-                key = r.key;
+                if self.settings.reduce_group_insensitive {
+                    key = r.key[..].to_ascii_lowercase(); // the group is keyed by the lowercased form
+                } else {
+                    key = r.key
+                }
                 collection.push(r.value)
             }
         }
-
         loop {
             match self.it.peek() {
                 None => break,
                 Some(r) => {
-                    if r.key != key {
+                    // In insensitive mode `key` is already lowercased, so compare without allocating.
+                    let insensitive = self.settings.reduce_group_insensitive;
+                    if (insensitive && !r.key[..].eq_ignore_ascii_case(&key[..])) || (!insensitive && r.key != key) {
                         break;
                     }
                 }
             }
             collection.push(self.it.next().unwrap().value);
         }
-
         return Some(MultiRecord::new(key, collection));
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use parameters::MRParameters;
+    use record_types::*;
+    use std::vec;
+
+    #[test]
+    fn test_grouping_iterator() {
+        let records = vec![
+            mk_rcrd("aaa", "def"),
+            mk_rcrd("abb", "111"),
+            mk_rcrd("Abb", "112"),
+            mk_rcrd("abbb", "113"),
+            mk_rcrd("abc", "xyz"),
+            mk_rcrd("xyz", "___"),
+            mk_rcrd("xyz", "__foo"),
+            mk_rcrd("xyz", "---")];
+        let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> = RecordsToMultiRecords::new(records.into_iter(), MRParameters::new().set_reduce_group_opts(2, true));
+
+        let lengths = vec![1, 2, 1, 1, 3];
+
+        // Compare the whole sequence of group sizes, so that a missing or
+        // surplus group fails the test instead of going unnoticed.
+        let sizes: Vec<usize> = group_it.map(|multirec| multirec.into_iter().count())
+                                        .collect();
+        assert_eq!(sizes, lengths);
+    }
+
+    #[test]
+    fn test_grouping_iterator_sensitive() {
+        let records = vec![
+            mk_rcrd("aaa", "def"),
+            mk_rcrd("abb", "111"),
+            mk_rcrd("Abb", "112"),
+            mk_rcrd("abbb", "113"),
+            mk_rcrd("abc", "xyz"),
+            mk_rcrd("xyz", "___"),
+            mk_rcrd("xyz", "__foo"),
+            mk_rcrd("xyz", "---")];
+        let group_it: RecordsToMultiRecords<vec::IntoIter<Record>> = RecordsToMultiRecords::new(records.into_iter(), MRParameters::new().set_reduce_group_opts(2, false));
+
+        let lengths = vec![1, 1, 1, 1, 1, 3];
+
+        // Compare the whole sequence of group sizes, so that a missing or
+        // surplus group fails the test instead of going unnoticed.
+        let sizes: Vec<usize> = group_it.map(|multirec| multirec.into_iter().count())
+                                        .collect();
+        assert_eq!(sizes, lengths);
+    }
+}