view src/map.rs @ 29:f11aeab31e37

Fix compiler warnings
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 31 Jan 2016 17:05:14 +0000
parents 67ce650d5de5
children 03ea3c83f499
line wrap: on
line source

//! Implements the mapping phase.
//!

use std::io::Write;
use std::collections::{LinkedList, BTreeMap};
use mapreducer::{Record, MapReducer, MEmitter};

/// The input chunk handed to a `MapPartition`: a linked list of records,
/// which the partition pops from the front when sorting its input.
type MapInput = LinkedList<Record>;

/// This is the base of the mapping phase. It holds the raw input chunk, a
/// key-sorted form of that input, the key-grouped results emitted by the map
/// function, and the writer the results are finally written to.
/// Mapper threads run on this. Every mapper thread has one MapPartition
/// instance per input chunk.
struct MapPartition<MR: MapReducer> {
    // The map/reduce implementation; its `map` method is called once per
    // input record.
    mr: MR,
    // Unsorted input records; emptied when the input is sorted.
    input: MapInput,
    // Destination for the serialized map output.
    output: Box<Write>,
    // Input records keyed by record key. Being a BTreeMap, it holds at most
    // one value per key and iterates in key order.
    sorted_input: BTreeMap<String, String>,
    // Emitted values grouped by emitted key, iterated in key order.
    sorted_output: BTreeMap<String, Vec<String>>,
}

impl<MR: MapReducer> MapPartition<MR> {
    /// Creates a partition over `input` that maps records with `mr` and
    /// writes the results to `output`. Both intermediate maps start empty.
    pub fn _new(input: MapInput, mr: MR, output: Box<Write>) -> MapPartition<MR> {
        MapPartition {
            mr: mr,
            input: input,
            output: output,
            sorted_input: BTreeMap::new(),
            sorted_output: BTreeMap::new(),
        }
    }

    /// Runs the whole partition: sorts the input, maps every record, then
    /// writes the grouped results to the output. Consumes the partition.
    ///
    /// # Panics
    ///
    /// Panics if writing to the output fails.
    pub fn _run(mut self) {
        self.sort_input();
        self.do_map();
        self.write_output();
    }

    /// Sorts input into the sorted_input map, moving the records on the way
    /// (so no copying happens and memory consumption stays low-ish).
    ///
    /// Note: because `sorted_input` holds one value per key, a later record
    /// replaces an earlier record that has the same key.
    fn sort_input(&mut self) {
        while let Some(record) = self.input.pop_front() {
            self.sorted_input.insert(record.key, record.value);
        }
    }

    /// Executes the mapping phase.
    ///
    /// Keys are copied out of `sorted_input` in batches so that entries can
    /// be removed from the map while it is being worked through; each entry
    /// is removed right before it is handed to the map function. The method
    /// returns once `sorted_input` is empty.
    fn do_map(&mut self) {
        // TODO: Make this configurable
        let key_buffer_size: usize = 256;
        let mut key_buffer = Vec::with_capacity(key_buffer_size);

        loop {
            key_buffer.extend(self.sorted_input.keys().take(key_buffer_size).cloned());

            // An empty batch means every record has been mapped. Without
            // this check the loop would spin forever on an empty map.
            if key_buffer.is_empty() {
                break;
            }

            // Draining hands out the buffered keys by value, so each key can
            // be moved into its record, and leaves the buffer empty (with its
            // capacity intact) for the next batch.
            for k in key_buffer.drain(..) {
                let val = match self.sorted_input.remove(&k) {
                    Some(v) => v,
                    // The key was read from the map just above, so this is
                    // not expected; skip defensively rather than panic.
                    None => continue,
                };
                let mut e = MEmitter::new();
                self.mr.map(&mut e,
                            Record {
                                key: k,
                                value: val,
                            });
                self.insert_result(e);
            }
        }
    }

    /// Writes every emitted (key, value) pair to the output in key order.
    ///
    /// Each pair is written as the key's bytes immediately followed by the
    /// value's bytes; no separator is added here.
    ///
    /// # Panics
    ///
    /// Panics if the output reports a write error.
    fn write_output(&mut self) {
        for (k, vs) in self.sorted_output.iter() {
            for v in vs {
                // `write_all` keeps writing until the whole buffer is out;
                // plain `write` may stop early and silently truncate output.
                if let Err(e) = self.output.write_all(k.as_bytes()) {
                    panic!("couldn't write map output: {}", e);
                }
                if let Err(e) = self.output.write_all(v.as_bytes()) {
                    panic!("couldn't write map output: {}", e);
                }
            }
        }
    }

    /// Merges the records collected by `emitter` into `sorted_output`,
    /// appending each value to the list stored under its key (a new list is
    /// created the first time a key is seen).
    fn insert_result(&mut self, emitter: MEmitter) {
        for r in emitter._get() {
            self.sorted_output
                .entry(r.key)
                .or_insert_with(Vec::new)
                .push(r.value);
        }
    }
}