changeset 99:e6b0d8ebe0f1

Implement dedicated type for caching mapping inputs
author Lewin Bormann <lbo@spheniscida.de>
date Mon, 08 Feb 2016 20:36:55 +0000
parents 97b70b7203a2
children 50ab6ea515b1
files src/controller.rs src/input_cache.rs src/lib.rs
diffstat 3 files changed, 86 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/src/controller.rs	Sun Feb 07 19:26:21 2016 +0000
+++ b/src/controller.rs	Mon Feb 08 20:36:55 2016 +0000
@@ -2,13 +2,13 @@
 
 use formats::util::{SinkGenerator, RecordReadIterator};
 use formats::writelog::{WriteLogGenerator, WriteLogReader};
+use input_cache::InputCache;
 use map::MapPartition;
 use mapreducer::MapReducer;
 use parameters::MRParameters;
 use record_types::Record;
 use reduce::ReducePartition;
 
-use std::collections::LinkedList;
 use std::sync::mpsc::sync_channel;
 
 extern crate scoped_threadpool;
@@ -63,30 +63,21 @@
         controller.clean_up();
     }
 
-    fn map_runner(mr: MR, params: MRParameters, inp: LinkedList<Record>) {
+    fn map_runner(mr: MR, params: MRParameters, inp: InputCache) {
         if inp.len() == 0 {
             return;
         }
         let intermed_out = WriteLogGenerator::new();
-        let map_part = MapPartition::_new(params, inp.into_iter(), mr, intermed_out);
+        let map_part = MapPartition::_new(params, inp, mr, intermed_out);
         map_part._run();
     }
 
     fn read_map_input<In: Iterator<Item = Record>>(it: &mut In,
                                                    approx_bytes: usize)
-                                                   -> LinkedList<Record> {
-        let mut ll = LinkedList::new();
-        let mut bytes_read: usize = 0;
+                                                   -> InputCache {
 
-        for r in it {
-            bytes_read += r.key.len() + r.value.len() + 4; // Heuristics :P
-            ll.push_back(r);
-
-            if bytes_read > approx_bytes {
-                break;
-            }
-        }
-        ll
+        let inp_cache = InputCache::from_iter(8192, approx_bytes, it);
+        inp_cache
     }
 
     fn run_map<In: Iterator<Item = Record>>(&mut self, mut input: In) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/input_cache.rs	Mon Feb 08 20:36:55 2016 +0000
@@ -0,0 +1,75 @@
+use std::collections::linked_list;
+use std::collections::LinkedList;
+use std::vec;
+
+use record_types::Record;
+
+/// Holds inputs, e.g. to the Map phase, in memory.
+/// Specialty: Holding large amounts in memory in a way that is both efficient to store and
+/// efficient to iterate.
+pub struct InputCache {
+    chunks_iter: linked_list::IntoIter<Vec<Record>>,
+    chunk_iter: vec::IntoIter<Record>,
+    len: usize,
+}
+
+impl InputCache {
+    pub fn from_iter<It: IntoIterator<Item = Record>>(chunk_length: usize, max_bytes: usize, it: It) -> Self {
+        let mut chunklist = LinkedList::new();
+        let mut chunk = Vec::with_capacity(chunk_length);
+
+        let mut i: usize = 0;
+        let mut complete_length: usize = 0;
+        let mut bytes_read: usize = 0;
+
+        for v in it {
+            i += 1;
+            complete_length += 1;
+            bytes_read += v.key.len() + v.value.len();
+
+            chunk.push(v);
+
+            if i >= chunk_length {
+                chunklist.push_back(chunk);
+                chunk = Vec::with_capacity(chunk_length);
+                i = 0;
+            }
+            if bytes_read >= max_bytes {
+                break;
+            }
+        }
+
+        if chunk.len() > 0 {
+            chunklist.push_back(chunk);
+        }
+
+        if chunklist.len() == 0 {
+            InputCache { len: 0, chunks_iter: LinkedList::new().into_iter(), chunk_iter: Vec::new().into_iter() }
+        } else {
+            let first_chunk_iterator = chunklist.pop_front().unwrap().into_iter();
+            InputCache { len: complete_length, chunks_iter: chunklist.into_iter(), chunk_iter: first_chunk_iterator }
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+}
+
+impl Iterator for InputCache {
+    type Item = Record;
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.chunk_iter.next() {
+            None => (),
+            Some(v) => return Some(v),
+        }
+        match self.chunks_iter.next() {
+            None => (),
+            Some(chunk) => {
+                self.chunk_iter = chunk.into_iter();
+                return self.chunk_iter.next()
+            },
+        }
+        None
+    }
+}
--- a/src/lib.rs	Sun Feb 07 19:26:21 2016 +0000
+++ b/src/lib.rs	Mon Feb 08 20:36:55 2016 +0000
@@ -5,13 +5,14 @@
 pub mod closure_mr;
 pub mod controller;
 pub mod formats;
-pub mod map;
+pub mod input_cache;
+mod map;
 pub mod mapreducer;
 pub mod parameters;
 pub mod record_types;
-pub mod reduce;
-pub mod shard_merge;
-pub mod sort;
+mod reduce;
+mod shard_merge;
+mod sort;
 
 #[test]
 fn it_works() {}