Mercurial > lbo > hg > localmr
changeset 108:27afb9528618
Move SinkGenerator and utils around; make file name generation saner
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Wed, 15 Jun 2016 19:55:04 +0200 |
parents | 500571302783 |
children | d676299a0a2d |
files | src/controller.rs src/formats/lines.rs src/formats/util.rs src/formats/writelog.rs src/phases/map.rs src/phases/mod.rs src/phases/output.rs src/shard_merge.rs |
diffstat | 8 files changed, 60 insertions(+), 62 deletions(-) [+] |
line wrap: on
line diff
--- a/src/controller.rs Wed Jun 15 19:27:29 2016 +0200 +++ b/src/controller.rs Wed Jun 15 19:55:04 2016 +0200 @@ -1,7 +1,7 @@ //! Controls the execution of a mapreduce instance. -use formats::util::{SinkGenerator, RecordReadIterator}; -use formats::writelog::{WriteLogGenerator, WriteLogReader}; +use phases::output::{SinkGenerator, open_reduce_inputs, get_reduce_output_name}; +use formats::writelog::WriteLogGenerator; use input_cache::InputCache; use phases::map::MapPartition; use mapreducer::{Mapper, Reducer, Sharder}; @@ -24,30 +24,6 @@ map_partitions_run: usize, } -/// Calculates the name of a reduce output shard from the parameters. -fn get_reduce_output_name(params: &MRParameters) -> String { - use std::fmt; - let mut name = String::new(); - name.push_str(&params.reduce_output_shard_prefix[..]); - name.push_str(&fmt::format(format_args!("{}", params.shard_id))[..]); - name -} - -fn open_reduce_inputs(params: &MRParameters, - partitions: usize, - shard: usize) - -> Vec<RecordReadIterator<WriteLogReader>> { - use std::fmt; - let mut inputs = Vec::new(); - - for part in 0..partitions { - let name = fmt::format(format_args!("{}{}.{}", params.map_output_location, part, shard)); - let wlg_reader = WriteLogReader::new_from_file(&name).unwrap(); - inputs.push(RecordReadIterator::new(wlg_reader)); - } - inputs -} - impl<M: Mapper, R: Reducer, S: Sharder> MRController<M, R, S> { /// Create a new mapreduce instance and execute it immediately. 
@@ -120,7 +96,6 @@ } fn read_map_input<In: Iterator<Item = Record>>(it: &mut In, approx_bytes: usize) -> InputCache { - let inp_cache = InputCache::from_iter(8192, approx_bytes, it); inp_cache } @@ -137,7 +112,7 @@ let output = outp.clone(); scope.execute(move || { - let inputs = open_reduce_inputs(&params, map_partitions, i); + let inputs = open_reduce_inputs(&params.map_output_location, map_partitions, i); let output = output.new_output(&get_reduce_output_name(&params)); let reduce_part = ReducePartition::new(r, params, inputs, output); reduce_part._run();
--- a/src/formats/lines.rs Wed Jun 15 19:27:29 2016 +0200 +++ b/src/formats/lines.rs Wed Jun 15 19:55:04 2016 +0200 @@ -3,7 +3,7 @@ //! using the RecordIterator from formats::util, the necessary key/value //! iterator can be implemented. -use formats::util; +use phases::output::SinkGenerator; use std::fs; use std::io; use std::io::{Read, BufRead}; @@ -102,10 +102,7 @@ /// and creates text files based on it. #[allow(dead_code)] #[derive(Clone)] -pub struct LinesSinkGenerator { - // bogus field - i: i32, -} +pub struct LinesSinkGenerator; unsafe impl Send for LinesSinkGenerator {} @@ -113,11 +110,11 @@ /// Use either a path like `/a/b/c/` to generate files in a directory /// or `/a/b/c/file_prefix_` to create files with that prefix. pub fn new_to_files() -> LinesSinkGenerator { - LinesSinkGenerator { i: 0 } + LinesSinkGenerator { } } } -impl util::SinkGenerator for LinesSinkGenerator { +impl SinkGenerator for LinesSinkGenerator { type Sink = LinesWriter<fs::File>; fn new_output(&self, p: &String) -> Self::Sink { let f = fs::OpenOptions::new().write(true).truncate(true).create(true).open(p);
--- a/src/formats/util.rs Wed Jun 15 19:27:29 2016 +0200 +++ b/src/formats/util.rs Wed Jun 15 19:55:04 2016 +0200 @@ -3,7 +3,6 @@ use record_types::Record; use std::fmt; -use std::io; /// Transforms an iterator<string> into an iterator<Record>. It yields /// records with the key being the position of the current record, starting with @@ -68,17 +67,3 @@ } } } - -/// A type implementing SinkGenerator is used at the end of the reducer -/// phase to write the output. Given a name, new() should return a new object -/// that can be used to write the output of a reduce partition. -/// Values are always written as a whole to the writer. -/// -/// SinkGenerator types are used in general to determine the format of outputs; existing options -/// are plain text files (LinesSinkGenerator) or length-prefixed binary files (WriteLogGenerator). -pub trait SinkGenerator: Send + Clone { - type Sink: io::Write; - /// Return a new output identified by name. The existing sink generators use `name` to open - /// files with that name (or path). - fn new_output(&self, name: &String) -> Self::Sink; -}
--- a/src/formats/writelog.rs Wed Jun 15 19:27:29 2016 +0200 +++ b/src/formats/writelog.rs Wed Jun 15 19:55:04 2016 +0200 @@ -10,7 +10,7 @@ use std::vec; use std::string; -use formats::util::SinkGenerator; +use phases::output::SinkGenerator; /// A length-prefixed record stream named for the original use case, /// which was to write a log of all write operations to a database.
--- a/src/phases/map.rs Wed Jun 15 19:27:29 2016 +0200 +++ b/src/phases/map.rs Wed Jun 15 19:55:04 2016 +0200 @@ -4,10 +4,9 @@ #![allow(dead_code)] use std::collections::BTreeMap; -use std::fmt; use std::io::Write; -use formats::util::SinkGenerator; +use phases::output::SinkGenerator; use mapreducer::{Mapper, Sharder}; use parameters::MRParameters; use record_types::{Record, MEmitter}; @@ -104,11 +103,9 @@ let mut outputs = Vec::new(); for i in 0..self.params.reducers { - let out = self.sink.new_output( - &fmt::format(format_args!("{}{}.{}", - self.params.map_output_location, - self.params.shard_id, - i))); + let out = self.sink.new_map_output(&self.params.map_output_location, + self.params.shard_id, + i); outputs.push(out); } assert_eq!(outputs.len(), self.params.reducers);
--- a/src/phases/mod.rs Wed Jun 15 19:27:29 2016 +0200 +++ b/src/phases/mod.rs Wed Jun 15 19:55:04 2016 +0200 @@ -1,2 +1,4 @@ pub mod map; pub mod reduce; + +pub mod output;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/phases/output.rs Wed Jun 15 19:55:04 2016 +0200 @@ -0,0 +1,46 @@ +use std::io; +use formats::util::RecordReadIterator; +use formats::writelog::WriteLogReader; +use parameters::MRParameters; + +fn map_output_name(base: &String, mapper: usize, shard: usize) -> String { + format!("{}-{}.{}", base, mapper, shard) +} + +/// A type implementing SinkGenerator is used at the end of the reducer +/// phase to write the output. Given a name, new() should return a new object +/// that can be used to write the output of a reduce partition. +/// Values are always written as a whole to the writer. +/// +/// SinkGenerator types are used in general to determine the format of outputs; existing options +/// are plain text files (LinesSinkGenerator) or length-prefixed binary files (WriteLogGenerator). +pub trait SinkGenerator: Send + Clone { + type Sink: io::Write; + /// Return a new intermediary file handle destined for reduce shard `shard` and requested by + /// map shard `mapper`. + fn new_map_output(&self, location: &String, mapper: usize, shard: usize) -> Self::Sink { + self.new_output(&map_output_name(location, mapper, shard)) + } + + /// Return a new file handle for `location`. + fn new_output(&self, location: &String) -> Self::Sink; +} + +pub fn open_reduce_inputs(location: &String, + partitions: usize, + shard: usize) + -> Vec<RecordReadIterator<WriteLogReader>> { + let mut inputs = Vec::new(); + + for part in 0..partitions { + let name = map_output_name(location, part, shard); + let wlg_reader = WriteLogReader::new_from_file(&name).unwrap(); + inputs.push(RecordReadIterator::new(wlg_reader)); + } + inputs +} + +/// Calculates the name of a reduce output shard from the parameters. +pub fn get_reduce_output_name(params: &MRParameters) -> String { + format!("{}{}", params.reduce_output_shard_prefix, params.shard_id) +}
--- a/src/shard_merge.rs Wed Jun 15 19:27:29 2016 +0200 +++ b/src/shard_merge.rs Wed Jun 15 19:55:04 2016 +0200 @@ -69,10 +69,6 @@ right: Box::new(iter::empty()), left_peeked: None, right_peeked: None, - // BUG: This should not be altered when used with Map phase output. - // The map phase uses a BTreeMap in order to sort the output, and the BTM - // only uses the standard Ord implementation for strings. Should the requirements - // change, we can work around that. comparer: sort::default_generic_compare, } }