Mercurial > lbo > hg > localmr
changeset 22:63d1a6b2dfda
Make WriteLogWriter generic over the sink used.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 31 Jan 2016 14:11:06 +0000 |
parents | 10e930e0f531 |
children | 0620fcdadb9b |
files | src/formats/writelog.rs |
diffstat | 1 files changed, 13 insertions(+), 10 deletions(-) [+] |
line wrap: on
line diff
--- a/src/formats/writelog.rs Sun Jan 31 14:05:37 2016 +0000 +++ b/src/formats/writelog.rs Sun Jan 31 14:11:06 2016 +0000 @@ -32,8 +32,8 @@ /// files are indexed by IDX files describing offset and length of single entries, /// which is why we don't need length prefixes here. /// -pub struct WriteLogWriter { - dest: Box<Write>, +pub struct WriteLogWriter<Sink: Write + Sized> { + dest: Sink, current_length: u64, records_written: u32, @@ -59,9 +59,9 @@ val } -impl WriteLogWriter { +impl<Sink: Write + Sized> WriteLogWriter<Sink> { /// Return a new WriteLog that writes to dest - pub fn new(dest: Box<Write>) -> WriteLogWriter { + pub fn new(dest: Sink) -> WriteLogWriter<Sink> { WriteLogWriter { dest: dest, current_length: 0, @@ -70,14 +70,14 @@ } /// Opens a WriteLog for writing. Truncates a file if append == false. - pub fn new_to_file(file: &String, append: bool) -> io::Result<WriteLogWriter> { + pub fn new_to_file(file: &String, append: bool) -> io::Result<WriteLogWriter<fs::File>> { fs::OpenOptions::new() .create(true) .write(true) .append(append) .truncate(!append) .open(file) - .map(move |f| WriteLogWriter::new(Box::new(f))) + .map(move |f| WriteLogWriter::new(f)) } /// Return how many (bytes,records) have been written. @@ -85,7 +85,7 @@ (self.current_length, self.records_written) } } -impl Write for WriteLogWriter { +impl<Sink: Write + Sized> Write for WriteLogWriter<Sink> { fn write(&mut self, buf: &[u8]) -> Result<usize> { // BUG: May not account the length in a correct way if the length prefix // is written, but not the record. @@ -107,6 +107,9 @@ } } +/// Like LinesSinkGenerator, opens new WriteLog sinks based on a base path. +/// The framework-supplied suffices are appended directly to the base path given +/// to new(). (E.g. base = `/a/b/c`, input = `_shard1` => `/a/b/c_shard1`) pub struct WriteLogGenerator { base: String, } @@ -118,11 +121,11 @@ } impl MRSinkGenerator for WriteLogGenerator { - type Sink = WriteLogWriter; + type Sink = WriteLogWriter<fs::File>; fn new_output(&mut self, suffix: &String) -> Self::Sink { let mut path = self.base.clone(); path.push_str(&suffix[..]); - let writer = WriteLogWriter::new_to_file(&path, false); + let writer = WriteLogWriter::<fs::File>::new_to_file(&path, false); match writer { Err(e) => panic!("Could not open {}: {}", path, e), Ok(w) => w @@ -384,7 +387,7 @@ .bytes() .collect(); - match WriteLogWriter::new_to_file(&String::from("bench_file.wlg"), false) { + match WriteLogWriter::<fs::File>::new_to_file(&String::from("bench_file.wlg"), false) { Err(e) => panic!("{}", e), Ok(ref mut writer) => { let start = PreciseTime::now();