changeset 1:9e4aafb22d74

Add writelog code (copied from rustis)
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 30 Jan 2016 16:47:20 +0000
parents f1edf1791196
children c79a03c07e57
files src/formats/mod.rs src/formats/writelog.rs src/lib.rs
diffstat 3 files changed, 361 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/formats/mod.rs	Sat Jan 30 16:47:20 2016 +0000
@@ -0,0 +1,5 @@
+//! Contains code for on-disk data structures and file formats.
+
+// TODO: Write input module for text files
+//
+pub mod writelog;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/formats/writelog.rs	Sat Jan 30 16:47:20 2016 +0000
@@ -0,0 +1,354 @@
+#![allow(dead_code)]
+
+use std::io::{Result, Write, Read};
+use std::boxed::Box;
+use std::io;
+use std::fs;
+use std::vec;
+
+/// A length-prefixed record stream named for the original use case,
+/// which was to write a log of all write operations to a database.
+///
+/// # WriteLog
+/// 
+/// WriteLog is a persistent data structure designed to be written to disk
+/// that is a sequence of bytestring.
+/// It can be read back in relatively efficiently and yields the same byte
+/// strings; on disk, it is represented as records prefixed by 4 byte
+/// big-endian length prefixes:
+/// 
+///      llllbbbbbbllllbbllllbbbbbbbbb...
+/// 
+/// Where l is a length byte and b are bytes of a bytestring.
+/// 
+/// There is a special case of WriteLogs: The length-prefixing can be turned
+/// off in order to yield a better efficiency when encoding PCK files. Those
+/// files are indexed by IDX files describing offset and length of single entries,
+/// which is why we don't need length prefixes here.
+///
+pub struct WriteLogWriter {
+    dest: Box<Write>,
+
+    current_length: u64,
+    records_written: u32,
+}
+
+fn encode_u32(val: u32) -> [u8; 4] {
+    let mut buf: [u8; 4] = [0; 4];
+
+    for i in 0..4 {
+        buf[3 - i] = (val >> 8 * i) as u8;
+    }
+
+    buf
+}
+
+fn decode_u32(buf: [u8; 4]) -> u32 {
+    let mut val: u32 = 0;
+
+    for i in 0..4 {
+        val |= (buf[3 - i] as u32) << i * 8;
+    }
+
+    val
+}
+
+impl WriteLogWriter {
+    /// Return a new WriteLog that writes to dest
+    pub fn new(dest: Box<Write>) -> WriteLogWriter {
+        WriteLogWriter {
+            dest: dest,
+            current_length: 0,
+            records_written: 0,
+        }
+    }
+
+    /// Opens a WriteLog for writing. Truncates a file if append == false.
+    pub fn new_to_file(file: &String, append: bool) -> io::Result<WriteLogWriter> {
+        fs::OpenOptions::new()
+            .create(true)
+            .write(true)
+            .append(append)
+            .truncate(!append)
+            .open(file)
+            .map(move |f| WriteLogWriter::new(Box::new(f)))
+    }
+
+    /// Return how many (bytes,records) have been written.
+    pub fn get_stats(&self) -> (u64, u32) {
+        (self.current_length, self.records_written)
+    }
+}
+impl Write for WriteLogWriter {
+    fn write(&mut self, buf: &[u8]) -> Result<usize> {
+        // BUG: May not account the length in a correct way if the length prefix
+        // is written, but not the record.
+        let result = self.dest
+                         .write(&encode_u32(buf.len() as u32)[0..4])
+                         .and(self.dest.write(buf));
+        match result {
+            Err(_) => result,
+            Ok(_) => {
+                self.current_length += 4 + buf.len() as u64;
+                self.records_written += 1;
+                result
+            }
+        }
+    }
+
+    fn flush(&mut self) -> Result<()> {
+        self.dest.flush()
+    }
+}
+
+/// A Reader for WriteLog files. (more information on WriteLog files is to
+/// be found above at WriteLogWriter).
+pub struct WriteLogReader {
+    src: Box<Read>,
+    records_read: u32,
+    bytes_read: usize,
+}
+
+impl WriteLogReader {
+    pub fn new(src: Box<Read>) -> WriteLogReader {
+        WriteLogReader {
+            src: src,
+            records_read: 0,
+            bytes_read: 0,
+        }
+    }
+
+    pub fn new_from_file(file: &String) -> io::Result<WriteLogReader> {
+        fs::OpenOptions::new()
+            .read(true)
+            .open(file)
+            .map(move |f| WriteLogReader::new(Box::new(f)))
+    }
+
+    pub fn get_stats(&self) -> (u32, usize) {
+        (self.records_read, self.bytes_read)
+    }
+
+    // Inlining saves us up to 400ns per record (1600ns vs 2000ns)
+    #[inline]
+    fn read_bytes(&mut self, buf: &mut [u8], len: usize) -> io::Result<usize> {
+        let mut off = 0;
+        loop {
+            match self.src.read(&mut buf[off..len]) {
+                Err(e) => return Err(e),
+                Ok(s) => {
+                    if s == 0 {
+                        return Err(io::Error::new(io::ErrorKind::InvalidData,
+                                                  "Could not read enough data"));
+                    } else if off + s < len {
+                        off += s;
+                    } else {
+                        self.bytes_read += s;
+                        return Ok(off + s);
+                    }
+                }
+            }
+        }
+    }
+
+    /// Reads as many bytes as necessary into a vector and returns it.
+    /// This can of course take up much memory.
+    pub fn read_vec(&mut self) -> io::Result<vec::Vec<u8>> {
+        let mut lengthbuf = [0; 4];
+
+        let mut res = self.read_bytes(&mut lengthbuf, 4);
+
+        match res {
+            Err(e) => return Err(e),
+            Ok(_) => (),
+        }
+
+        let length = decode_u32(lengthbuf) as usize;
+        let mut buffer = vec::Vec::with_capacity(length);
+        buffer.resize(length, 0);
+
+        res = self.read_bytes(&mut buffer[..], length);
+
+        match res {
+            Err(e) => Err(e),
+            Ok(_) => {
+                self.records_read += 1;
+                Ok(buffer)
+            }
+        }
+    }
+}
+
+impl Read for WriteLogReader {
+    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+        let mut lengthbuf = [0; 4];
+
+        let mut res = self.read_bytes(&mut lengthbuf, 4);
+
+        match res {
+            Err(_) => return res,
+            Ok(_) => (),
+        }
+
+        let mut length = decode_u32(lengthbuf) as usize;
+
+        if dst.len() < length {
+            length = dst.len();
+        }
+
+        res = self.read_bytes(dst, length);
+
+        match res {
+            Err(_) => res,
+            Ok(_) => {
+                self.records_read += 1;
+                res
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::{encode_u32, decode_u32};
+    use super::{WriteLogWriter, WriteLogReader};
+    use std::vec;
+    use std::io::{Read, Write};
+    use std::fs;
+    use std::string;
+
+    #[test]
+    fn test_u32_encoder() {
+        let testvals = [0, 1, 2, 31, 199, 100000, 111111, 3000000, 4100000000];
+
+        for val in testvals.into_iter() {
+            assert_eq!(decode_u32(encode_u32(*val)), *val);
+        }
+    }
+
+    #[test]
+    fn test_write() {
+        let buf1: vec::Vec<u8> = "abc".bytes().collect();
+        let buf2: vec::Vec<u8> = "def".bytes().collect();
+        let dst = vec::Vec::new();
+        let mut w = WriteLogWriter::new(Box::new(dst));
+
+        let _ = w.write(&buf1);
+        let _ = w.write(&buf2);
+
+        let (bytes, _) = w.get_stats();
+        assert_eq!(bytes, 2 * (4 + 3));
+    }
+
+    #[test]
+    fn test_write_read() {
+        let filename = "writelog_test.wlg";
+        {
+            match fs::OpenOptions::new().write(true).create(true).open(filename) {
+                Err(e) => panic!("{}", e),
+                Ok(f) => {
+                    let mut w = WriteLogWriter::new(Box::new(f));
+                    let buf1: vec::Vec<u8> = "abc".bytes().collect();
+                    let buf2: vec::Vec<u8> = "def".bytes().collect();
+
+                    let _ = w.write(&buf1);
+                    let _ = w.write(&buf2);
+
+                    let (bytes, _) = w.get_stats();
+                    assert_eq!(bytes, 2 * (4 + 3));
+                }
+            }
+        }
+        {
+            match fs::OpenOptions::new().read(true).open(filename) {
+                Err(e) => panic!("{}", e),
+                Ok(f) => {
+                    let mut r = WriteLogReader::new(Box::new(f));
+                    let mut buf = [0; 16];
+
+                    let res = r.read(&mut buf);
+                    match res {
+                        Err(e) => panic!("{}", e),
+                        Ok(_) => assert_eq!(string::String::from_utf8_lossy(&buf[0..3]), "abc"),
+                    }
+
+                    let res2 = r.read(&mut buf);
+                    match res2 {
+                        Err(e) => panic!("{}", e),
+                        Ok(_) => assert_eq!(string::String::from_utf8_lossy(&buf[0..3]), "def"),
+                    }
+                }
+            }
+        }
+    }
+
+    extern crate time;
+    use self::time::PreciseTime;
+
+    const N_ENTRIES: u32 = 1000000;
+
+    fn bench_a_writing() {
+        let buf: vec::Vec<u8> = "aaabbbcccdddeeefffggghhhiiijjjkkklllmmmnnnoooppp"
+                                    .bytes()
+                                    .collect();
+
+        match WriteLogWriter::new_to_file(&String::from("bench_file.wlg"), false) {
+            Err(e) => panic!("{}", e),
+            Ok(ref mut writer) => {
+                let start = PreciseTime::now();
+                let mut j = 0;
+
+                for _ in 0..N_ENTRIES {
+                    let _ = writer.write(&buf);
+                    j += 1;
+                }
+                let end = PreciseTime::now();
+                println!("Took {} total; {} per record.",
+                         start.to(end),
+                         start.to(end) / N_ENTRIES as i32);
+                assert_eq!(j, N_ENTRIES);
+
+                let (bytes, _) = writer.get_stats();
+                assert_eq!(bytes, (N_ENTRIES * 3 * 16 + N_ENTRIES * 4) as u64);
+            }
+        }
+    }
+
+    #[test]
+    #[allow(unreachable_code)]
+    fn bench_b_reading() {
+        //! Uses the data written by bench_a_writing().
+        return;
+        bench_a_writing();
+
+        match WriteLogReader::new_from_file(&String::from("bench_file.wlg")) {
+            Err(e) => panic!("{}", e),
+            Ok(ref mut reader) => {
+                let mut buf: [u8; 16 * 4] = [0; 16 * 4];
+                let mut i = 0;
+
+                let start = PreciseTime::now();
+                loop {
+                    match reader.read(&mut buf) {
+                        Err(e) => {
+                            println!("{}", e);
+                            break;
+                        }
+                        Ok(len) => {
+                            i += 1;
+                            assert_eq!(len, 16 * 3);
+                        }
+                    }
+                }
+                let end = PreciseTime::now();
+                println!("Took {} total; {} per record.",
+                         start.to(end),
+                         start.to(end) / N_ENTRIES as i32);
+                assert_eq!(i, N_ENTRIES);
+                assert_eq!(reader.get_stats(),
+                           (N_ENTRIES, (N_ENTRIES * 4 + N_ENTRIES * 3 * 16) as usize));
+            }
+        }
+
+    }
+}
--- a/src/lib.rs	Sat Jan 30 16:26:01 2016 +0000
+++ b/src/lib.rs	Sat Jan 30 16:47:20 2016 +0000
@@ -1,3 +1,5 @@
+pub mod formats;
+
 #[test]
 fn it_works() {
 }