changeset 66:9d7c1468f1b2

Started implementation of reduce phase
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 06 Feb 2016 00:08:38 +0000
parents 07095d20ba5c
children 5bfe21949e22
files src/lib.rs src/reduce.rs src/shard_merge.rs
diffstat 3 files changed, 99 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib.rs	Sat Feb 06 00:08:27 2016 +0000
+++ b/src/lib.rs	Sat Feb 06 00:08:38 2016 +0000
@@ -8,6 +8,7 @@
 pub mod mapreducer;
 pub mod parameters;
 pub mod record_types;
+pub mod reduce;
 pub mod shard_merge;
 pub mod sort;
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/reduce.rs	Sat Feb 06 00:08:38 2016 +0000
@@ -0,0 +1,97 @@
+//! Implements the Reduce phase.
+//!
+
+use mapreducer::MapReducer;
+use parameters::MRParameters;
+use formats::util::MRSinkGenerator;
+use record_types::{Record, MultiRecord, REmitter};
+use shard_merge::ShardMergeIterator;
+
+struct ReducePartition<MR: MapReducer, InputIt: Iterator<Item = Record>, SinkGen: MRSinkGenerator> {
+    mr: MR,
+    params: MRParameters,
+    // Maybe we want to genericize this to an Iterator<Item=Read> or so? This defers opening
+    // the files to the reduce shard itself.
+    srcfiles: Vec<InputIt>,
+    dstfilegen: SinkGen,
+}
+
+impl<MR: MapReducer, InputIt: Iterator<Item=Record>, SinkGen: MRSinkGenerator> ReducePartition<MR, InputIt, SinkGen> {
+/// Create a new Reduce partition for the given MR; source and destination I/O.
+/// mr is the map/reduce functions.
+/// params is generic MR parameters as well as some applying directly to this reduce partition.
+/// srcfiles is a set of Iterator<Item=Record>s. Those are usually reading from the map phase's
+/// outputs.
+/// dstfiles is a SinkGen (as known from the mapping phase) that is used to create the output
+/// file (there is one output file per reduce partition, currently).
+    pub fn new(mr: MR, params: MRParameters, srcfiles: Vec<InputIt>, dstfiles: SinkGen) -> ReducePartition<MR, InputIt, SinkGen> {
+        ReducePartition { mr: mr, params: params, srcfiles: srcfiles, dstfilegen: dstfiles }
+    }
+
+/// Run the Reduce partition.
+    pub fn _run(mut self) {
+        let mut sorted_input = self.open_sorted_input();
+// reduce input and write results.
+    }
+
+/// Create an iterator that merges all input sources. Leaves self.srcfiles empty.
+    fn open_sorted_input(&mut self) -> ShardMergeIterator<Record> {
+        let mut inputs = Vec::new();
+        inputs.append(&mut self.srcfiles);
+        let mut it = inputs.into_iter();
+
+        ShardMergeIterator::build(&mut it)
+    }
+
+}
+
+use std::iter::Peekable;
+
+/// Iterator adapter: Converts an Iterator<Item=Record> into an Iterator<Item=MultiRecord> by
+/// grouping subsequent records with identical key.
+/// The original iterator must yield records in sorted order (or at least in an order where
+/// identical items are adjacent).
+struct RecordsToMultiRecords<It: Iterator<Item = Record>> {
+    it: Peekable<It>,
+    /// Efficiency knob: How big groups of records are expected to be. Default is 1.
+    expected_group_size: usize,
+}
+
+impl<It: Iterator<Item = Record>> RecordsToMultiRecords<It> {
+    fn new(it: It, egs: usize) -> RecordsToMultiRecords<It> {
+        RecordsToMultiRecords {
+            it: it.peekable(),
+            expected_group_size: egs,
+        }
+    }
+}
+
+impl<It: Iterator<Item = Record>> Iterator for RecordsToMultiRecords<It> {
+    type Item = MultiRecord;
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut collection = Vec::with_capacity(self.expected_group_size);
+        let key: String;
+
+        match self.it.next() {
+            None => return None,
+            Some(r) => {
+                key = r.key;
+                collection.push(r.value)
+            }
+        }
+
+        loop {
+            match self.it.peek() {
+                None => break,
+                Some(r) => {
+                    if r.key != key {
+                        break;
+                    }
+                }
+            }
+            collection.push(self.it.next().unwrap().value);
+        }
+
+        return Some(MultiRecord::new(key, collection));
+    }
+}
--- a/src/shard_merge.rs	Sat Feb 06 00:08:27 2016 +0000
+++ b/src/shard_merge.rs	Sat Feb 06 00:08:38 2016 +0000
@@ -229,7 +229,7 @@
     use sort;
 
     // Slow test!
-    //#[test]
+    // #[test]
     fn test_merge_large_files() {
         let mut files = Vec::with_capacity(11);