changeset 92:0a6149c1c052

Use dictionary sort order in mapping phase
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 07 Feb 2016 16:49:26 +0000
parents f451bcd4cecd
children c176114fad50
files src/map.rs src/reduce.rs src/sort.rs
diffstat 3 files changed, 45 insertions(+), 10 deletions(-) [+]
line wrap: on
line diff
--- a/src/map.rs	Sun Feb 07 16:12:59 2016 +0000
+++ b/src/map.rs	Sun Feb 07 16:49:26 2016 +0000
@@ -11,6 +11,7 @@
 use mapreducer::MapReducer;
 use parameters::MRParameters;
 use record_types::{Record, MEmitter};
+use sort::DictComparableString;
 
 /// This is the base of the mapping phase. It contains an input
 /// and intermediary input and output forms.
@@ -21,8 +22,8 @@
     params: MRParameters,
     input: MapInput,
     sink: SinkGen,
-    sorted_input: BTreeMap<String, String>,
-    sorted_output: BTreeMap<String, Vec<String>>,
+    sorted_input: BTreeMap<DictComparableString, String>,
+    sorted_output: BTreeMap<DictComparableString, Vec<String>>,
 }
 
 impl<MR: MapReducer, MapInput: Iterator<Item=Record>, SinkGen: SinkGenerator> MapPartition<MR, MapInput, SinkGen> {
@@ -53,7 +54,7 @@
             match self.input.next() {
                 None => break,
                 Some(record) => {
-                    self.sorted_input.insert(record.key, record.value);
+                    self.sorted_input.insert(DictComparableString::DCS(record.key), record.value);
                 }
             }
         }
@@ -77,7 +78,7 @@
                 let mut e = MEmitter::new();
                 self.mr.map(&mut e,
                             Record {
-                                key: k.clone(),
+                                key: k.clone().unwrap(),
                                 value: val,
                             });
                 self.insert_result(e);
@@ -109,10 +110,10 @@
         let mut outputs = self.setup_output();
 
         for (k, vs) in self.sorted_output.iter() {
-            let shard = self.mr.shard(self.params.reducers, k);
+            let shard = self.mr.shard(self.params.reducers, k.as_ref());
 
             for v in vs {
-                let r1 = outputs[shard].write(k.as_bytes());
+                let r1 = outputs[shard].write(k.as_ref().as_bytes());
                 match r1 {
                     Err(e) => panic!("couldn't write map output: {}", e),
                     Ok(_) => (),
@@ -130,16 +131,16 @@
         for r in emitter._get() {
             let e;
             {
-                e = self.sorted_output.remove(&r.key);
+                e = self.sorted_output.remove(&DictComparableString::wrap(r.key.clone()));
             }
 
             match e {
                 None => {
-                    self.sorted_output.insert(r.key, vec![r.value]);
+                    self.sorted_output.insert(DictComparableString::wrap(r.key.clone()), vec![r.value]);
                 }
                 Some(mut v) => {
                     v.push(r.value);
-                    self.sorted_output.insert(r.key, v);
+                    self.sorted_output.insert(DictComparableString::wrap(r.key.clone()), v);
                 }
             }
         }
--- a/src/reduce.rs	Sun Feb 07 16:12:59 2016 +0000
+++ b/src/reduce.rs	Sun Feb 07 16:49:26 2016 +0000
@@ -210,7 +210,7 @@
         let srcs = vec![get_records().into_iter()];
         let dst = LinesSinkGenerator::new_to_files();
 
-        let r = ReducePartition::new(mr, params, srcs, dst.new_output(&String::from("0")));
+        let r = ReducePartition::new(mr, params, srcs, dst.new_output(&String::from("testdata/result_0")));
         r._run();
     }
 }
--- a/src/sort.rs	Sun Feb 07 16:12:59 2016 +0000
+++ b/src/sort.rs	Sun Feb 07 16:49:26 2016 +0000
@@ -83,6 +83,40 @@
     a.to_ascii_lowercase().cmp(&b.to_ascii_lowercase())
 }
 
+/// A wrapped string that uses a dictionary string comparison as Ord implementation.
+#[derive(PartialEq, Eq, Clone)]
+pub enum DictComparableString {
+    DCS(String),
+}
+
+impl DictComparableString {
+    pub fn wrap(s: String) -> DictComparableString {
+        DictComparableString::DCS(s)
+    }
+    pub fn unwrap(self) -> String {
+        let DictComparableString::DCS(s) = self;
+        s
+    }
+    pub fn as_ref(&self) -> &String {
+        let &DictComparableString::DCS(ref s) = self;
+        s
+    }
+}
+
+impl PartialOrd for DictComparableString {
+    fn partial_cmp(&self, other: &DictComparableString) -> Option<Ordering> {
+        let (&DictComparableString::DCS(ref a), &DictComparableString::DCS(ref b)) = (self, other);
+        Some(dict_string_compare(a, b))
+    }
+}
+
+impl Ord for DictComparableString {
+    fn cmp(&self, other: &DictComparableString) -> Ordering {
+        let (&DictComparableString::DCS(ref a), &DictComparableString::DCS(ref b)) = (self, other);
+        dict_string_compare(a, b)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;