view src/formats/util.rs @ 87:168e8777de73

Some refactoring
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 07 Feb 2016 15:14:22 +0000
parents ab7d19e012e0
children 27afb9528618
line wrap: on
line source

//! Various iterators/adapters used for input/output formats.


use record_types::Record;
use std::fmt;
use std::io;

/// Transforms an iterator<string> into an iterator<Record>. It yields
/// records with the key being the position of the current record, starting with
/// 1. Mainly used as input iterator in the mapping phase, from sources that only
/// yield values (no keys).
pub struct PosRecordIterator<I: Iterator<Item = String>> {
    i: I,
    counter: u64,
}

impl<I: Iterator<Item = String>> PosRecordIterator<I> {
    pub fn new(it: I) -> PosRecordIterator<I> {
        PosRecordIterator {
            i: it,
            counter: 0,
        }
    }
}

impl<I: Iterator<Item = String>> Iterator for PosRecordIterator<I> {
    type Item = Record;
    fn next(&mut self) -> Option<Record> {
        match self.i.next() {
            None => None,
            Some(val) => {
                self.counter += 1;
                Some(Record {
                    key: fmt::format(format_args!("{}", self.counter)),
                    value: val,
                })
            }
        }
    }
}

/// Adapts an iterator of `String`s into an iterator of `Record`s by pairing consecutive
/// values: the first value of each pair becomes the record's key, the second its value.
///
/// For the input `["k1", "v1", "k2", "v2"]` this yields `(k1, v1)` and `(k2, v2)`. If the input
/// has an odd number of elements, the final unpaired key is discarded.
pub struct RecordReadIterator<I> {
    /// The wrapped source of alternating keys and values.
    i: I,
}

impl<I: Iterator<Item = String>> RecordReadIterator<I> {
    /// Wraps `it`, which is expected to yield keys and values in alternating order.
    pub fn new(it: I) -> RecordReadIterator<I> {
        RecordReadIterator { i: it }
    }
}

impl<I: Iterator<Item = String>> Iterator for RecordReadIterator<I> {
    type Item = Record;

    fn next(&mut self) -> Option<Record> {
        // Only ask for a value once a key was actually produced. Polling an iterator again
        // after it returned `None` may resume it and silently consume an element.
        let k = match self.i.next() {
            Some(k) => k,
            None => return None,
        };
        // A trailing key with no value (odd-length input) yields no record.
        self.i.next().map(|v| Record { key: k, value: v })
    }

    // Every record consumes two input values, so both bounds are halved (rounding down).
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (lower, upper) = self.i.size_hint();
        (lower / 2, upper.map(|n| n / 2))
    }
}

/// Factory for the writers that receive the output of the reduce phase.
///
/// For each reduce partition, `new_output` is called with a name and must return a fresh
/// object the partition's output can be written to. Values are always written to that
/// writer as a whole.
///
/// Implementations generally decide the format of the job's outputs. The existing ones
/// produce plain text files (`LinesSinkGenerator`) or length-prefixed binary files
/// (`WriteLogGenerator`).
pub trait SinkGenerator
where
    Self: Send + Clone,
{
    /// The writer type handed out by `new_output`.
    type Sink: io::Write;

    /// Creates a new output identified by `name`.
    ///
    /// The existing sink generators treat `name` as the name (or path) of the file to open.
    // `&String` (rather than `&str`) is kept so that existing implementors keep compiling.
    fn new_output(&self, name: &String) -> Self::Sink;
}