Mercurial > lbo > hg > localmr
changeset 92:0a6149c1c052
Use dictionary sort order in mapping phase
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 07 Feb 2016 16:49:26 +0000 |
parents | f451bcd4cecd |
children | c176114fad50 |
files | src/map.rs src/reduce.rs src/sort.rs |
diffstat | 3 files changed, 45 insertions(+), 10 deletions(-) [+] |
line wrap: on
line diff
```diff
--- a/src/map.rs Sun Feb 07 16:12:59 2016 +0000
+++ b/src/map.rs Sun Feb 07 16:49:26 2016 +0000
@@ -11,6 +11,7 @@
 use mapreducer::MapReducer;
 use parameters::MRParameters;
 use record_types::{Record, MEmitter};
+use sort::DictComparableString;
 
 /// This is the base of the mapping phase. It contains an input
 /// and intermediary input and output forms.
@@ -21,8 +22,8 @@
     params: MRParameters,
     input: MapInput,
     sink: SinkGen,
-    sorted_input: BTreeMap<String, String>,
-    sorted_output: BTreeMap<String, Vec<String>>,
+    sorted_input: BTreeMap<DictComparableString, String>,
+    sorted_output: BTreeMap<DictComparableString, Vec<String>>,
 }
 
 impl<MR: MapReducer, MapInput: Iterator<Item=Record>, SinkGen: SinkGenerator> MapPartition<MR, MapInput, SinkGen> {
@@ -53,7 +54,7 @@
             match self.input.next() {
                 None => break,
                 Some(record) => {
-                    self.sorted_input.insert(record.key, record.value);
+                    self.sorted_input.insert(DictComparableString::DCS(record.key), record.value);
                 }
             }
         }
@@ -77,7 +78,7 @@
             let mut e = MEmitter::new();
             self.mr.map(&mut e,
                         Record {
-                            key: k.clone(),
+                            key: k.clone().unwrap(),
                             value: val,
                         });
             self.insert_result(e);
@@ -109,10 +110,10 @@
         let mut outputs = self.setup_output();
 
         for (k, vs) in self.sorted_output.iter() {
-            let shard = self.mr.shard(self.params.reducers, k);
+            let shard = self.mr.shard(self.params.reducers, k.as_ref());
 
             for v in vs {
-                let r1 = outputs[shard].write(k.as_bytes());
+                let r1 = outputs[shard].write(k.as_ref().as_bytes());
                 match r1 {
                     Err(e) => panic!("couldn't write map output: {}", e),
                     Ok(_) => (),
@@ -130,16 +131,16 @@
         for r in emitter._get() {
             let e;
             {
-                e = self.sorted_output.remove(&r.key);
+                e = self.sorted_output.remove(&DictComparableString::wrap(r.key.clone()));
             }
 
             match e {
                 None => {
-                    self.sorted_output.insert(r.key, vec![r.value]);
+                    self.sorted_output.insert(DictComparableString::wrap(r.key.clone()), vec![r.value]);
                 }
                 Some(mut v) => {
                     v.push(r.value);
-                    self.sorted_output.insert(r.key, v);
+                    self.sorted_output.insert(DictComparableString::wrap(r.key.clone()), v);
                 }
             }
         }
```
```diff
--- a/src/reduce.rs Sun Feb 07 16:12:59 2016 +0000
+++ b/src/reduce.rs Sun Feb 07 16:49:26 2016 +0000
@@ -210,7 +210,7 @@
         let srcs = vec![get_records().into_iter()];
         let dst = LinesSinkGenerator::new_to_files();
 
-        let r = ReducePartition::new(mr, params, srcs, dst.new_output(&String::from("0")));
+        let r = ReducePartition::new(mr, params, srcs, dst.new_output(&String::from("testdata/result_0")));
         r._run();
     }
 }
```
```diff
--- a/src/sort.rs Sun Feb 07 16:12:59 2016 +0000
+++ b/src/sort.rs Sun Feb 07 16:49:26 2016 +0000
@@ -83,6 +83,40 @@
     a.to_ascii_lowercase().cmp(&b.to_ascii_lowercase())
 }
 
+/// A wrapped string that uses a dictionary string comparison as Ord implementation.
+#[derive(PartialEq, Eq, Clone)]
+pub enum DictComparableString {
+    DCS(String),
+}
+
+impl DictComparableString {
+    pub fn wrap(s: String) -> DictComparableString {
+        DictComparableString::DCS(s)
+    }
+    pub fn unwrap(self) -> String {
+        let DictComparableString::DCS(s) = self;
+        s
+    }
+    pub fn as_ref(&self) -> &String {
+        let &DictComparableString::DCS(ref s) = self;
+        s
+    }
+}
+
+impl PartialOrd for DictComparableString {
+    fn partial_cmp(&self, other: &DictComparableString) -> Option<Ordering> {
+        let (&DictComparableString::DCS(ref a), &DictComparableString::DCS(ref b)) = (self, other);
+        Some(dict_string_compare(a, b))
+    }
+}
+
+impl Ord for DictComparableString {
+    fn cmp(&self, other: &DictComparableString) -> Ordering {
+        let (&DictComparableString::DCS(ref a), &DictComparableString::DCS(ref b)) = (self, other);
+        dict_string_compare(a, b)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
```