Mercurial > lbo > hg > localmr
changeset 99:e6b0d8ebe0f1
Implement dedicated type for caching mapping inputs
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Mon, 08 Feb 2016 20:36:55 +0000 |
parents | 97b70b7203a2 |
children | 50ab6ea515b1 |
files | src/controller.rs src/input_cache.rs src/lib.rs |
diffstat | 3 files changed, 86 insertions(+), 19 deletions(-) [+] |
line wrap: on
line diff
```diff
--- a/src/controller.rs Sun Feb 07 19:26:21 2016 +0000
+++ b/src/controller.rs Mon Feb 08 20:36:55 2016 +0000
@@ -2,13 +2,13 @@
 
 use formats::util::{SinkGenerator, RecordReadIterator};
 use formats::writelog::{WriteLogGenerator, WriteLogReader};
+use input_cache::InputCache;
 use map::MapPartition;
 use mapreducer::MapReducer;
 use parameters::MRParameters;
 use record_types::Record;
 use reduce::ReducePartition;
 
-use std::collections::LinkedList;
 use std::sync::mpsc::sync_channel;
 
 extern crate scoped_threadpool;
@@ -63,30 +63,21 @@
         controller.clean_up();
     }
 
-    fn map_runner(mr: MR, params: MRParameters, inp: LinkedList<Record>) {
+    fn map_runner(mr: MR, params: MRParameters, inp: InputCache) {
         if inp.len() == 0 {
             return;
         }
 
         let intermed_out = WriteLogGenerator::new();
-        let map_part = MapPartition::_new(params, inp.into_iter(), mr, intermed_out);
+        let map_part = MapPartition::_new(params, inp, mr, intermed_out);
         map_part._run();
     }
 
     fn read_map_input<In: Iterator<Item = Record>>(it: &mut In, approx_bytes: usize)
-                                                   -> LinkedList<Record> {
-        let mut ll = LinkedList::new();
-        let mut bytes_read: usize = 0;
+                                                   -> InputCache {
 
-        for r in it {
-            bytes_read += r.key.len() + r.value.len() + 4; // Heuristics :P
-            ll.push_back(r);
-
-            if bytes_read > approx_bytes {
-                break;
-            }
-        }
-        ll
+        let inp_cache = InputCache::from_iter(8192, approx_bytes, it);
+        inp_cache
     }
 
     fn run_map<In: Iterator<Item = Record>>(&mut self, mut input: In) {
```
```diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/input_cache.rs Mon Feb 08 20:36:55 2016 +0000
@@ -0,0 +1,75 @@
+use std::collections::linked_list;
+use std::collections::LinkedList;
+use std::vec;
+
+use record_types::Record;
+
+/// Holds inputs, e.g. to the Map phase, in memory.
+/// Specialty: Holding large amounts in memory in a way that is both efficient to store and
+/// efficient to iterate.
+pub struct InputCache {
+    chunks_iter: linked_list::IntoIter<Vec<Record>>,
+    chunk_iter: vec::IntoIter<Record>,
+    len: usize,
+}
+
+impl InputCache {
+    pub fn from_iter<It: IntoIterator<Item = Record>>(chunk_length: usize, max_bytes: usize, it: It) -> Self {
+        let mut chunklist = LinkedList::new();
+        let mut chunk = Vec::with_capacity(chunk_length);
+
+        let mut i: usize = 0;
+        let mut complete_length: usize = 0;
+        let mut bytes_read: usize = 0;
+
+        for v in it {
+            i += 1;
+            complete_length += 1;
+            bytes_read += v.key.len() + v.value.len();
+
+            chunk.push(v);
+
+            if i >= chunk_length {
+                chunklist.push_back(chunk);
+                chunk = Vec::with_capacity(chunk_length);
+                i = 0;
+            }
+            if bytes_read >= max_bytes {
+                break;
+            }
+        }
+
+        if chunk.len() > 0 {
+            chunklist.push_back(chunk);
+        }
+
+        if chunklist.len() == 0 {
+            InputCache { len: 0, chunks_iter: LinkedList::new().into_iter(), chunk_iter: Vec::new().into_iter() }
+        } else {
+            let first_chunk_iterator = chunklist.pop_front().unwrap().into_iter();
+            InputCache { len: complete_length, chunks_iter: chunklist.into_iter(), chunk_iter: first_chunk_iterator }
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+}
+
+impl Iterator for InputCache {
+    type Item = Record;
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.chunk_iter.next() {
+            None => (),
+            Some(v) => return Some(v),
+        }
+        match self.chunks_iter.next() {
+            None => (),
+            Some(chunk) => {
+                self.chunk_iter = chunk.into_iter();
+                return self.chunk_iter.next()
+            },
+        }
+        None
+    }
+}
```
```diff
--- a/src/lib.rs Sun Feb 07 19:26:21 2016 +0000
+++ b/src/lib.rs Mon Feb 08 20:36:55 2016 +0000
@@ -5,13 +5,14 @@
 pub mod closure_mr;
 pub mod controller;
 pub mod formats;
-pub mod map;
+pub mod input_cache;
+mod map;
 pub mod mapreducer;
 pub mod parameters;
 pub mod record_types;
-pub mod reduce;
-pub mod shard_merge;
-pub mod sort;
+mod reduce;
+mod shard_merge;
+mod sort;
 
 #[test]
 fn it_works() {}
```